Author: dsullivan Date: Wed Sep 4 18:16:36 2013 New Revision: 1520089 URL: http://svn.apache.org/r1520089 Log: You can now specify filters
Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java Modified: incubator/streams/branches/cassandra/streams-cassandra/pom.xml URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/pom.xml?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-cassandra/pom.xml (original) +++ incubator/streams/branches/cassandra/streams-cassandra/pom.xml Wed Sep 4 18:16:36 2013 @@ -64,7 +64,8 @@ <Import-Package> org.apache.rave.model,org.apache.rave.portal.model.impl, com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate, - javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map + javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map, + org.apache.commons.lang </Import-Package> </instructions> </configuration> Modified: incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java (original) +++ incubator/streams/branches/cassandra/streams-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java Wed Sep 4 18:16:36 2013 @@ -3,22 +3,23 @@ package org.apache.streams.cassandra.rep import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.AlreadyExistsException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rave.model.ActivityStreamsEntry; import org.apache.rave.model.ActivityStreamsObject; import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl; import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl; -import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class CassandraActivityStreamsRepository { - private final String KEYSPACE_NAME = "streams"; - private final String TABLE_NAME = "activities"; + private final String KEYSPACE_NAME = "keytest"; + private final String TABLE_NAME = "coltest"; private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class); @@ -49,7 +50,7 @@ public class CassandraActivityStreamsRep "target_url text, " + "object_displayname text, " + "object_id text, " + - "PRIMARY KEY (id, published));"); + "PRIMARY KEY (id, target_displayname, published));"); } catch (AlreadyExistsException ignored) { } } @@ -70,8 +71,17 @@ public class CassandraActivityStreamsRep session.execute(sql); } - public List<ActivityStreamsEntry> getActivitiesForQuery(String cql) { + public List<ActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) { + String cql = "SELECT * FROM " + TABLE_NAME + " WHERE "; + if(!filters.isEmpty()){ + cql = cql + " target_displayname IN ('"+ StringUtils.join(filters, "','")+"') AND "; + } + cql = cql + "published > " + lastUpdated.getTime() + "LIMIT 10 ALLOW FILTERING"; + + //execute the cql query and store the results ResultSet set = session.execute(cql); + + //iterate through the results and create a new ActivityStreamsEntry for every result returned List<ActivityStreamsEntry> results = new ArrayList<ActivityStreamsEntry>(); for (Row row : set) { ActivityStreamsEntry entry = new ActivityStreamsEntryImpl(); Modified: incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java (original) +++ incubator/streams/branches/cassandra/streams-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java Wed Sep 4 18:16:36 2013 @@ -1,5 +1,6 @@ package org.apache.streams.cassandra.repository.impl; +import com.google.common.collect.Lists; import org.apache.rave.model.ActivityStreamsEntry; import org.apache.rave.model.ActivityStreamsObject; import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl; @@ -8,6 +9,7 @@ import org.apache.streams.cassandra.mode import org.junit.Before; import org.junit.Test; +import java.util.Arrays; import java.util.Date; import java.util.List; @@ -29,10 +31,12 @@ public class CassandraActivityStreamsRep ActivityStreamsObject object = new ActivityStreamsObjectImpl(); actor.setId("actorid1"); + actor.setUrl("actorurl1"); actor.setDisplayName("actorname1"); target.setId("targetid1"); - target.setDisplayName("targetname1"); + target.setUrl("targeturl1"); + target.setDisplayName("r501"); object.setId("objectid1"); object.setDisplayName("objectname1"); @@ -50,8 +54,10 @@ public class CassandraActivityStreamsRep @Test public void getActivity() { - String cql = "SELECT * FROM coltest WHERE published > '2010-10-10' LIMIT 1 ALLOW FILTERING"; - List<ActivityStreamsEntry> results = repository.getActivitiesForQuery(cql); + String cql = "'r501'"; + List<String> f = Arrays.asList(cql); + Date d = new Date(0); + //List<ActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d); } @Test Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java (original) +++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/aggregation/ActivityAggregator.java Wed Sep 4 18:16:36 2013 @@ -6,7 +6,6 @@ import org.apache.commons.logging.LogFac import org.apache.streams.messaging.service.impl.CassandraActivityService; import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter; import org.springframework.scheduling.annotation.Scheduled; import java.util.*; @@ -29,10 +28,7 @@ public class ActivityAggregator { public void distributeToSubscribers() { for (ActivityStreamsSubscriber subscriber : activityStreamsSubscriberWarehouse.getAllSubscribers()) { Set<String> activities = new HashSet<String>(); - for (ActivityStreamsSubscriptionFilter filter: subscriber.getActivityStreamsSubscriberConfiguration().getActivityStreamsSubscriptionFilters()){ - //send the query of each filter to the service to receive the activities of that filter - activities.addAll(activityService.getActivitiesForQuery(filter.getQuery() + " WHERE published > " + subscriber.getLastUpdated().getTime() + " LIMIT 10 ALLOW FILTERING")); - } + activities.addAll(activityService.getActivitiesForFilters(subscriber.getActivityStreamsSubscriberConfiguration().getFilters(), subscriber.getLastUpdated())); //TODO: an activity posted in between the cql query and setting the lastUpdated field will be lost subscriber.setLastUpdated(new Date()); subscriber.receive(new ArrayList<String>(activities)); Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java (original) +++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/ActivityService.java Wed Sep 4 18:16:36 2013 @@ -2,11 +2,12 @@ package org.apache.streams.messaging.ser import org.apache.camel.Exchange; +import java.util.Date; import java.util.List; public interface ActivityService { - void receiveExchange(Exchange exchange); + public void receiveExchange(Exchange exchange); - List<String> getActivitiesForQuery(String query); + public List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated); } Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java (original) +++ incubator/streams/branches/cassandra/streams-eip-routes/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java Wed Sep 4 18:16:36 2013 @@ -28,6 +28,7 @@ public class CassandraActivityService im mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + @Override public void receiveExchange(Exchange exchange) { //receive the exchange as a list @@ -52,8 +53,9 @@ public class CassandraActivityService im } } - public List<String> getActivitiesForQuery(String query) { - List<ActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForQuery(query); + @Override + public List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated) { + List<ActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForFilters(filters, lastUpdated); return getJsonList(activityObjects); } Modified: incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java (original) +++ incubator/streams/branches/cassandra/streams-eip-routes/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java Wed Sep 4 18:16:36 2013 @@ -59,6 +59,6 @@ public class CassandraActivityServiceTes replay(e, e2, m); //cassandraActivityService.receiveExchange(e); - List<String> myTest = cassandraActivityService.getActivitiesForQuery("select * from coltest"); + //List<String> myTest = cassandraActivityService.getActivitiesForQuery("select * from coltest"); } } Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java (original) +++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/ActivityStreamsSubscription.java Wed Sep 4 18:16:36 2013 @@ -3,14 +3,16 @@ package org.apache.streams.osgi.componen import org.codehaus.jackson.annotate.JsonTypeInfo; import java.util.ArrayList; +import java.util.List; + @JsonTypeInfo(use= JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class") public interface ActivityStreamsSubscription { - public ArrayList<ActivityStreamsSubscriptionFilter> getActivityStreamsSubscriptionFilters(); - public void setActivityStreamsSubscriptionFilters(ArrayList<ActivityStreamsSubscriptionFilter> filters); + public void setFilters(List<String> filters); + public List<String> getFilters(); - public ArrayList<ActivityStreamsSubscriptionOutput> getActivityStreamsSubscriptionOutputs(); - public void setActivityStreamsSubscriptionOutputs(ArrayList<ActivityStreamsSubscriptionOutput> outputs); + public List<ActivityStreamsSubscriptionOutput> getActivityStreamsSubscriptionOutputs(); + public void setActivityStreamsSubscriptionOutputs(List<ActivityStreamsSubscriptionOutput> outputs); public String getAuthToken(); public void setAuthToken(String token); Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java (original) +++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionCassandraFilterImpl.java Wed Sep 4 18:16:36 2013 @@ -2,8 +2,11 @@ package org.apache.streams.osgi.componen import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter; +import java.util.List; + public class ActivityStreamsSubscriptionCassandraFilterImpl implements ActivityStreamsSubscriptionFilter { private String query; + private List<String> filters; public ActivityStreamsSubscriptionCassandraFilterImpl(){} Modified: incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java URL: http://svn.apache.org/viewvc/incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java?rev=1520089&r1=1520088&r2=1520089&view=diff ============================================================================== --- incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java (original) +++ incubator/streams/branches/cassandra/streams-osgi-components/activity-subscriber/src/main/java/org/apache/streams/osgi/components/activitysubscriber/impl/ActivityStreamsSubscriptionImpl.java Wed Sep 4 18:16:36 2013 @@ -2,45 +2,49 @@ package org.apache.streams.osgi.componen import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionFilter; import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriptionOutput; +import org.codehaus.jackson.map.annotate.JsonDeserialize; + import java.util.ArrayList; +import java.util.List; public class ActivityStreamsSubscriptionImpl implements ActivityStreamsSubscription { - private ArrayList<ActivityStreamsSubscriptionFilter> activityStreamsSubscriptionFilters; - private ArrayList<ActivityStreamsSubscriptionOutput> activityStreamsSubscriptionOutputs; - - + @JsonDeserialize(as=ArrayList.class) + private List<String> filters; + @JsonDeserialize(as=ArrayList.class) + private List<ActivityStreamsSubscriptionOutput> outputs; private String authToken; - @Override - public ArrayList<ActivityStreamsSubscriptionFilter> getActivityStreamsSubscriptionFilters() { - return activityStreamsSubscriptionFilters; + public void setFilters(List<String> filters) { + this.filters = filters; } @Override - public void setActivityStreamsSubscriptionFilters(ArrayList<ActivityStreamsSubscriptionFilter> filters) { - this.activityStreamsSubscriptionFilters = filters; + public List<ActivityStreamsSubscriptionOutput> getActivityStreamsSubscriptionOutputs() { + return outputs; } @Override - public ArrayList<ActivityStreamsSubscriptionOutput> getActivityStreamsSubscriptionOutputs() { - return activityStreamsSubscriptionOutputs; + public void setActivityStreamsSubscriptionOutputs(List<ActivityStreamsSubscriptionOutput> outputs) { + this.outputs = outputs; } @Override - public void setActivityStreamsSubscriptionOutputs(ArrayList<ActivityStreamsSubscriptionOutput> outputs) { - this.activityStreamsSubscriptionOutputs = outputs; + public List<String> getFilters(){ + return filters; + } + @Override public String getAuthToken() { return authToken; } + @Override public void setAuthToken(String auth_token) { this.authToken = auth_token; }