Author: dsullivan
Date: Wed Sep  4 18:16:36 2013
New Revision: 1520089

URL: http://svn.apache.org/r1520089
Log:
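Subscriptions can now specify filters: a list of target display names that, together with the subscriber's last-updated time, is used to query activities (getActivitiesForFilters replaces getActivitiesForQuery).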

Modified:
    incubator/streams/branches/cassandra/streams-cassandra/pom.xml
    
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
    
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
    
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
    
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
    
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
    
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
    
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
    
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
    
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java

Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original)
+++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Wed Sep  4 
18:16:36 2013
@@ -64,7 +64,8 @@
                     <Import-Package>
                         
org.apache.rave.model,org.apache.rave.portal.model.impl,
                         com.datastax.driver.core, 
com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
-                        javax.persistence, org.apache.commons.logging, 
com.google.common.collect, org.codehaus.jackson.map
+                        javax.persistence, org.apache.commons.logging, 
com.google.common.collect, org.codehaus.jackson.map,
+                        org.apache.commons.lang
                     </Import-Package>
                 </instructions>
             </configuration>

Modified: 
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
 Wed Sep  4 18:16:36 2013
@@ -3,22 +3,23 @@ package org.apache.streams.cassandra.rep
 import com.datastax.driver.core.*;
 
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rave.model.ActivityStreamsEntry;
 import org.apache.rave.model.ActivityStreamsObject;
 import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
 import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 
 public class CassandraActivityStreamsRepository {
 
-    private final String KEYSPACE_NAME = "streams";
-    private final String TABLE_NAME = "activities";
+    private final String KEYSPACE_NAME = "keytest";
+    private final String TABLE_NAME = "coltest";
 
     private static final Log LOG = 
LogFactory.getLog(CassandraActivityStreamsRepository.class);
 
@@ -49,7 +50,7 @@ public class CassandraActivityStreamsRep
                     "target_url text, " +
                     "object_displayname text, " +
                     "object_id text, " +
-                    "PRIMARY KEY (id, published));");
+                    "PRIMARY KEY (id, target_displayname, published));");
         } catch (AlreadyExistsException ignored) {
         }
     }
@@ -70,8 +71,17 @@ public class CassandraActivityStreamsRep
         session.execute(sql);
     }
 
-    public List<ActivityStreamsEntry> getActivitiesForQuery(String cql) {
+    public List<ActivityStreamsEntry> getActivitiesForFilters(List<String> 
filters, Date lastUpdated) {
+        String cql = "SELECT * FROM " + TABLE_NAME + " WHERE ";
+        if(!filters.isEmpty()){
+            cql = cql + " target_displayname IN ('"+ StringUtils.join(filters, 
"','")+"') AND ";
+        }
+        cql = cql + "published > " + lastUpdated.getTime() + "LIMIT 10 ALLOW 
FILTERING";
+
+        //execute the cql query and store the results
         ResultSet set = session.execute(cql);
+
+        //iterate through the results and create a new ActivityStreamsEntry 
for every result returned
         List<ActivityStreamsEntry> results = new 
ArrayList<ActivityStreamsEntry>();
         for (Row row : set) {
             ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();

Modified: 
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
 Wed Sep  4 18:16:36 2013
@@ -1,5 +1,6 @@
 package org.apache.streams.cassandra.repository.impl;
 
+import com.google.common.collect.Lists;
 import org.apache.rave.model.ActivityStreamsEntry;
 import org.apache.rave.model.ActivityStreamsObject;
 import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
@@ -8,6 +9,7 @@ import org.apache.streams.cassandra.mode
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
@@ -29,10 +31,12 @@ public class CassandraActivityStreamsRep
         ActivityStreamsObject object = new ActivityStreamsObjectImpl();
 
         actor.setId("actorid1");
+        actor.setUrl("actorurl1");
         actor.setDisplayName("actorname1");
 
         target.setId("targetid1");
-        target.setDisplayName("targetname1");
+        target.setUrl("targeturl1");
+        target.setDisplayName("r501");
 
         object.setId("objectid1");
         object.setDisplayName("objectname1");
@@ -50,8 +54,10 @@ public class CassandraActivityStreamsRep
 
     @Test
     public void getActivity() {
-        String cql = "SELECT * FROM coltest WHERE published > '2010-10-10' 
LIMIT 1 ALLOW FILTERING";
-        List<ActivityStreamsEntry> results = 
repository.getActivitiesForQuery(cql);
+        String cql = "'r501'";
+        List<String> f = Arrays.asList(cql);
+        Date d = new Date(0);
+        //List<ActivityStreamsEntry> results = 
repository.getActivitiesForFilters(f,d);
     }
 
     @Test

Modified: 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java
 Wed Sep  4 18:16:36 2013
@@ -6,7 +6,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.streams.messaging.service.impl.CassandraActivityService;
 import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber;
 import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse;
-import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
 import org.springframework.scheduling.annotation.Scheduled;
 
 import java.util.*;
@@ -29,10 +28,7 @@ public class ActivityAggregator {
     public void distributeToSubscribers() {
         for (ActivityStreamsSubscriber subscriber : 
activityStreamsSubscriberWarehouse.getAllSubscribers()) {
             Set<String> activities = new HashSet<String>();
-            for (ActivityStreamsSubscriptionFilter filter: 
subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){
-                //send the query of each filter to the service to receive the 
activities of that filter
-                
activities.addAll(activityService.getActivitiesForQuery(filter.getQuery() + " 
WHERE published > " + subscriber.getLastUpdated().getTime() + " LIMIT 10 ALLOW 
FILTERING"));
-            }
+            
activities.addAll(activityService.getActivitiesForFilters(subscriber.getActivityStreamsSubscriberConfiguration().getFilters(),
 subscriber.getLastUpdated()));
             //TODO: an activity posted in between the cql query and setting 
the lastUpdated field will be lost
             subscriber.setLastUpdated(new Date());
             subscriber.receive(new ArrayList<String>(activities));

Modified: 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java
 Wed Sep  4 18:16:36 2013
@@ -2,11 +2,12 @@ package org.apache.streams.messaging.ser
 
 import org.apache.camel.Exchange;
 
+import java.util.Date;
 import java.util.List;
 
 public interface ActivityService {
 
-    void receiveExchange(Exchange exchange);
+    public void receiveExchange(Exchange exchange);
 
-    List<String> getActivitiesForQuery(String query);
+    public List<String> getActivitiesForFilters(List<String> filters, Date 
lastUpdated);
 }

Modified: 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java
 Wed Sep  4 18:16:36 2013
@@ -28,6 +28,7 @@ public class CassandraActivityService im
         
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
     }
 
+    @Override
     public void receiveExchange(Exchange exchange) {
 
         //receive the exchange as a list
@@ -52,8 +53,9 @@ public class CassandraActivityService im
         }
     }
 
-    public List<String> getActivitiesForQuery(String query) {
-        List<ActivityStreamsEntry> activityObjects = 
cassandraActivityStreamsRepository.getActivitiesForQuery(query);
+    @Override
+    public List<String> getActivitiesForFilters(List<String> filters, Date 
lastUpdated) {
+        List<ActivityStreamsEntry> activityObjects = 
cassandraActivityStreamsRepository.getActivitiesForFilters(filters, 
lastUpdated);
         return getJsonList(activityObjects);
     }
 

Modified: 
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java
 Wed Sep  4 18:16:36 2013
@@ -59,6 +59,6 @@ public class CassandraActivityServiceTes
         replay(e, e2, m);
 
         //cassandraActivityService.receiveExchange(e);
-        List<String> myTest = 
cassandraActivityService.getActivitiesForQuery("select * from coltest");
+        //List<String> myTest = 
cassandraActivityService.getActivitiesForQuery("select * from coltest");
     }
 }

Modified: 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java
 Wed Sep  4 18:16:36 2013
@@ -3,14 +3,16 @@ package org.apache.streams.osgi.componen
 import org.codehaus.jackson.annotate.JsonTypeInfo;
 
 import java.util.ArrayList;
+import java.util.List;
+
 @JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, 
property="@class")
 public interface ActivityStreamsSubscription {
 
-    public ArrayList<ActivityStreamsSubscriptionFilter> 
getActivityStreamsSubscriptionFilters();
-    public void 
setActivityStreamsSubscriptionFilters(ArrayList<ActivityStreamsSubscriptionFilter>
 filters);
+    public void setFilters(List<String> filters);
+    public List<String> getFilters();
 
-    public ArrayList<ActivityStreamsSubscriptionOutput> 
getActivityStreamsSubscriptionOutputs();
-    public void 
setActivityStreamsSubscriptionOutputs(ArrayList<ActivityStreamsSubscriptionOutput>
 outputs);
+    public List<ActivityStreamsSubscriptionOutput> 
getActivityStreamsSubscriptionOutputs();
+    public void 
setActivityStreamsSubscriptionOutputs(List<ActivityStreamsSubscriptionOutput> 
outputs);
 
     public String getAuthToken();
     public void setAuthToken(String token);

Modified: 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java
 Wed Sep  4 18:16:36 2013
@@ -2,8 +2,11 @@ package org.apache.streams.osgi.componen
 
 import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
 
+import java.util.List;
+
 public class ActivityStreamsSubscriptionCassandraFilterImpl implements 
ActivityStreamsSubscriptionFilter {
     private String query;
+    private List<String> filters;
 
     public ActivityStreamsSubscriptionCassandraFilterImpl(){}
 

Modified: 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java?rev=1520089&r1=1520088&r2=1520089&view=diff
==============================================================================
--- 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java
 (original)
+++ 
incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java
 Wed Sep  4 18:16:36 2013
@@ -2,45 +2,49 @@ package org.apache.streams.osgi.componen
 
 
 import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter;
 import 
org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionOutput;
 
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
+
 import java.util.ArrayList;
+import java.util.List;
 
 public class ActivityStreamsSubscriptionImpl implements 
ActivityStreamsSubscription {
 
-    private  ArrayList<ActivityStreamsSubscriptionFilter> 
activityStreamsSubscriptionFilters;
-    private ArrayList<ActivityStreamsSubscriptionOutput> 
activityStreamsSubscriptionOutputs;
-
-
+    @JsonDeserialize(as=ArrayList.class)
+    private List<String> filters;
 
+    @JsonDeserialize(as=ArrayList.class)
+    private List<ActivityStreamsSubscriptionOutput> outputs;
 
     private String authToken;
 
-    @Override
-    public ArrayList<ActivityStreamsSubscriptionFilter> 
getActivityStreamsSubscriptionFilters() {
-        return activityStreamsSubscriptionFilters;
+    public void setFilters(List<String> filters) {
+        this.filters = filters;
     }
 
     @Override
-    public void 
setActivityStreamsSubscriptionFilters(ArrayList<ActivityStreamsSubscriptionFilter>
 filters) {
-        this.activityStreamsSubscriptionFilters = filters;
+    public List<ActivityStreamsSubscriptionOutput> 
getActivityStreamsSubscriptionOutputs() {
+        return outputs;
     }
 
     @Override
-    public ArrayList<ActivityStreamsSubscriptionOutput> 
getActivityStreamsSubscriptionOutputs() {
-        return activityStreamsSubscriptionOutputs;
+    public void 
setActivityStreamsSubscriptionOutputs(List<ActivityStreamsSubscriptionOutput> 
outputs) {
+        this.outputs = outputs;
     }
 
     @Override
-    public void 
setActivityStreamsSubscriptionOutputs(ArrayList<ActivityStreamsSubscriptionOutput>
 outputs) {
-        this.activityStreamsSubscriptionOutputs = outputs;
+    public List<String> getFilters(){
+        return filters;
+
     }
 
+    @Override
     public String getAuthToken() {
         return authToken;
     }
 
+    @Override
     public void setAuthToken(String auth_token) {
         this.authToken = auth_token;
     }


Reply via email to