Resolve merge conflicts for George's pull request: https://github.com/apache/usergrid/pull/520. Jira: https://issues.apache.org/jira/browse/USERGRID-1120, https://issues.apache.org/jira/browse/USERGRID-1116, https://issues.apache.org/jira/browse/USERGRID-1118
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dd13f0b0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dd13f0b0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dd13f0b0 Branch: refs/heads/datastax-cass-driver Commit: dd13f0b00cdc1a84153a582028b9aac13f98f74b Parents: e8f68a0 Author: Ayesha Dastagiri <[email protected]> Authored: Mon Aug 1 14:34:57 2016 -0700 Committer: Ayesha Dastagiri <[email protected]> Committed: Mon Aug 1 14:34:57 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 33 ++++++++++++++++++-- .../corepersistence/CpEntityManagerFactory.java | 9 ++++-- .../usergrid/persistence/PersistenceModule.java | 12 ++++--- .../applications/events/EventsResource.java | 6 ++-- .../applications/events/EventsResourceIT.java | 32 +++++++++---------- 5 files changed, 59 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/dd13f0b0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index fcda5b5..7b273e4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -36,6 +36,8 @@ import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.mq.QueueManager; +import 
org.apache.usergrid.mq.QueueManagerFactory; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.Query.Level; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; @@ -65,6 +67,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.StringField; import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.mq.Message; import org.apache.usergrid.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,6 +158,9 @@ public class CpEntityManager implements EntityManager { private EntityCollectionManager ecm; + public QueueManagerFactory queueManagerFactory; + + // /** Short-term cache to keep us from reloading same Entity during single request. */ // private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache; @@ -178,7 +184,8 @@ public class CpEntityManager implements EntityManager { final CollectionService collectionService, final ConnectionService connectionService, final CollectionSettingsFactory collectionSettingsFactory, - final UUID applicationId ) { + final UUID applicationId, + final QueueManagerFactory queueManagerFactory) { this.entityManagerFig = entityManagerFig; this.actorSystemFig = actorSystemFig; @@ -243,6 +250,8 @@ public class CpEntityManager implements EntityManager { // set to false for now this.skipAggregateCounters = false; + + this.queueManagerFactory = queueManagerFactory; } @@ -1493,6 +1502,21 @@ public class CpEntityManager implements EntityManager { return entity; } + public Message storeEventAsMessage(Mutator<ByteBuffer> m, Event event, long timestamp) { + + counterUtils.addEventCounterMutations(m, applicationId, event, timestamp); + + QueueManager q = queueManagerFactory.getQueueManager(applicationId); + + Message message = new Message(); + message.setType("event"); + 
message.setCategory(event.getCategory()); + message.setStringProperty("message", event.getMessage()); + message.setTimestamp(timestamp); + q.postToQueue("events", message); + + return message; + } @Override public Entity createItemInCollection( EntityRef entityRef, String collectionName, @@ -2772,11 +2796,14 @@ public class CpEntityManager implements EntityManager { } } - //doesn't allow the mutator to be ignored. - counterUtils.addEventCounterMutations( null, applicationId, event, timestamp ); + Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be ); + Message message = storeEventAsMessage( batch, event, timestamp ); incrementEntityCollection( "events", timestamp ); + entity.setUuid( message.getUuid() ); + batch.execute(); + return entity; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/dd13f0b0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 8055740..2a88302 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -34,6 +34,7 @@ import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.locking.LockManager; +import org.apache.usergrid.mq.QueueManagerFactory; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; @@ -43,7 +44,6 @@ import org.apache.usergrid.persistence.cassandra.Setup; import 
org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueActor; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -116,6 +116,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private UniqueValuesService uniqueValuesService; private final LockManager lockManager; + private final QueueManagerFactory queueManagerFactory; + public static final String MANAGEMENT_APP_INIT_MAXRETRIES= "management.app.init.max-retries"; public static final String MANAGEMENT_APP_INIT_INTERVAL = "management.app.init.interval"; @@ -159,7 +161,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } } this.lockManager = injector.getInstance( LockManager.class ); - + this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class ); // this line always needs to be last due to the temporary cicular dependency until spring is removed @@ -375,7 +377,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application collectionService, connectionService, collectionSettingsFactory, - applicationId ); + applicationId, + queueManagerFactory); return em; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/dd13f0b0/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java index 70fff90..a945462 
100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java @@ -20,14 +20,12 @@ package org.apache.usergrid.persistence; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.ListableBeanFactory; -import org.springframework.context.ApplicationContext; - - import com.google.inject.AbstractModule; import com.google.inject.Provider; import com.google.inject.spring.SpringIntegration; +import org.apache.usergrid.mq.QueueManagerFactory; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationContext; /** @@ -60,6 +58,10 @@ public class PersistenceModule extends AbstractModule { final Provider<EntityManagerFactory> emfProvider = SpringIntegration.fromSpring( EntityManagerFactory.class, "entityManagerFactory" ); bind( EntityManagerFactory.class ).toProvider( emfProvider ); + + final Provider<QueueManagerFactory> qmfProvider = SpringIntegration.fromSpring( QueueManagerFactory.class, "queueManagerFactory" ); + bind( QueueManagerFactory.class ).toProvider( qmfProvider ); + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/dd13f0b0/stack/rest/src/main/java/org/apache/usergrid/rest/applications/events/EventsResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/events/EventsResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/events/EventsResource.java index d5709d7..0b5eeb7 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/events/EventsResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/events/EventsResource.java @@ -29,10 +29,7 @@ import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import 
javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.UriInfo; @@ -55,6 +52,7 @@ public class EventsResource extends ServiceResource { @GET @JSONP + @Consumes(MediaType.APPLICATION_JSON) @Produces({MediaType.APPLICATION_JSON, "application/javascript"}) public QueueResults executeQueueGet( @Context UriInfo ui, @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception { http://git-wip-us.apache.org/repos/asf/usergrid/blob/dd13f0b0/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java index f4f8630..965105b 100644 --- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/events/EventsResourceIT.java @@ -17,26 +17,21 @@ package org.apache.usergrid.rest.applications.events; -import java.util.LinkedHashMap; -import java.util.Map; - -import javax.ws.rs.core.MediaType; - -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; - import org.apache.usergrid.rest.test.resource.AbstractRestIT; import org.apache.usergrid.rest.test.resource.model.ApiResponse; import org.apache.usergrid.rest.test.resource.model.Collection; -import org.apache.usergrid.rest.test.resource.model.Token; +import org.apache.usergrid.rest.test.resource.model.QueryParameters; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; import 
static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import org.junit.Ignore; public class EventsResourceIT extends AbstractRestIT { @@ -45,7 +40,6 @@ public class EventsResourceIT extends AbstractRestIT { @Test - @Ignore("Pending https://issues.apache.org/jira/browse/USERGRID-1118. Events not working yet") public void testEventPostandGet() throws IOException { Map<String, Object> payload = new LinkedHashMap<String, Object>(); @@ -108,23 +102,25 @@ public class EventsResourceIT extends AbstractRestIT { collection = this.app().collection( "events" ) .get(); - assertEquals("Expected Advertising", advertising, ((Map<String, Object>) ((Map<String, Object>) collection.getResponse().getProperties().get("messages")).get(0)).get("uuid").toString()); + assertEquals("Expected Advertising", advertising, ((Map)((ArrayList) collection.getResponse().getProperties().get("messages" )).get(0 ) ).get("uuid" ).toString()); lastId = collection.getResponse().getProperties().get("last").toString(); } // check sales event in queue - collection = this.app().collection( "events" ) - .get(); + QueryParameters queryParameters = new QueryParameters(); + queryParameters.addParam( "last",lastId ); + + collection = this.app().collection( "events" ).get(queryParameters); - assertEquals( "Expected Sales", sales,((Map<String, Object>) ((Map<String, Object>) collection.getResponse().getProperties().get("messages")).get(0)).get("uuid").toString()); + assertEquals( "Expected Sales", sales,((Map<String, Object>) ((ArrayList) collection.getResponse().getProperties().get("messages")).get(0)).get("uuid").toString()); lastId = collection.getResponse().getProperties().get("last").toString(); // check marketing event in queue - collection = this.app().collection( "events" ) - .get(); + queryParameters.addParam( "last",lastId ); + collection = this.app().collection( "events" ).get(queryParameters); - assertEquals( "Expected Marketing", marketing, ((Map<String, Object>) 
((Map<String, Object>) collection.getResponse().getProperties().get("messages")).get(0)).get("uuid").toString()); + assertEquals( "Expected Marketing", marketing, ((Map<String, Object>) ((ArrayList) collection.getResponse().getProperties().get("messages")).get(0)).get("uuid").toString()); } }
