Changed the GraphManager interface to return MarkedEdge instead of Edge. This allows the caller to decide whether or not to filter out marked (deleted) edges, as well as to perform read-repair actions on read.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d4f22c5 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d4f22c5 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d4f22c5 Branch: refs/heads/master Commit: 1d4f22c59c057a6f9d52844e03897786b0773a5b Parents: 4e51d38 Author: Todd Nine <[email protected]> Authored: Fri Oct 23 15:25:39 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Oct 23 15:25:39 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManagerFactory.java | 7 +- .../corepersistence/CpRelationManager.java | 16 ++--- .../read/traverse/AbstractReadGraphFilter.java | 74 +++++++++++++++---- .../read/traverse/EdgeCursorSerializer.java | 8 ++- .../traverse/ReadGraphCollectionFilter.java | 5 +- .../ReadGraphConnectionByTypeFilter.java | 11 +-- .../traverse/ReadGraphConnectionFilter.java | 5 +- .../index/AsyncIndexServiceTest.java | 3 +- .../corepersistence/index/IndexServiceTest.java | 12 ++-- .../pipeline/cursor/CursorTest.java | 24 +++---- .../service/ConnectionServiceImplTest.java | 5 +- .../persistence/ApplicationServiceIT.java | 4 +- .../persistence/graph/GraphManager.java | 14 ++-- .../usergrid/persistence/graph/MarkedEdge.java | 1 + .../graph/impl/GraphManagerImpl.java | 30 ++++---- .../graph/impl/SimpleMarkedEdge.java | 39 ++++++---- .../persistence/graph/GraphManagerIT.java | 76 ++++++++++---------- .../persistence/graph/GraphManagerLoadTest.java | 10 +-- .../graph/GraphManagerShardConsistencyIT.java | 6 +- .../graph/GraphManagerStressTest.java | 16 ++--- .../management/AppInfoMigrationPlugin.java | 5 +- 21 files changed, 220 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 4bdade5..c75a025 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.EntityIndex; @@ -477,9 +478,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}", new Object[]{edgeType, managementId.getType(), managementId.getUuid()}); - Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - managementId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() )); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.<Edge>absent() ) ); final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index aad7610..b4cabc4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.entities.Group; import org.apache.usergrid.persistence.entities.User; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; @@ -227,14 +228,9 @@ public class CpRelationManager implements RelationManager { Observable<Edge> edges = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ) - .flatMap( new Func1<String, Observable<Edge>>() { - @Override - public Observable<Edge> call( final String edgeType ) { - return gm.loadEdgesToTarget( - new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); - } - } ); + .flatMap( edgeType1 -> gm.loadEdgesToTarget( + new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) ); //if our limit is set, take them. 
Note this logic is still borked, we can't possibly fit everything in memmory if ( limit > -1 ) { @@ -268,7 +264,7 @@ public class CpRelationManager implements RelationManager { } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils + Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType, entityId ) ); @@ -288,7 +284,7 @@ public class CpRelationManager implements RelationManager { } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils + Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName, entityId ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index d3e0345..621edd2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -23,13 +23,16 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import 
org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,18 +45,21 @@ import rx.Observable; /** * Command for reading graph edges */ -public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> { +public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> { private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class ); private final GraphManagerFactory graphManagerFactory; + private final AsyncEventService asyncEventService; /** * Create a new instance of our command */ - public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) { + public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory, + final AsyncEventService asyncEventService ) { this.graphManagerFactory = graphManagerFactory; + this.asyncEventService = asyncEventService; } @@ -61,9 +67,11 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) { + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + //get the graph manager final GraphManager graphManager = - graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + graphManagerFactory.createEdgeManager( 
applicationScope ); final String edgeName = getEdgeTypeName(); @@ -74,18 +82,60 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, return previousIds.flatMap( previousFilterValue -> { //set our our constant state - final Optional<Edge> startFromCursor = getSeekValue(); + final Optional<MarkedEdge> startFromCursor = getSeekValue(); final Id id = previousFilterValue.getValue(); + final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull()); + + /** + * We do not want to filter. This is intentional DO NOT REMOVE!!! + * + * We want to fire events on these edges if they exist, the delete was missed. + */ final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - startFromCursor ); + typeWrapper, false ); /** * TODO, pass a message with pointers to our cursor values to be generated later */ - return graphManager.loadEdgesFromSource( search ) + return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> { + + final boolean isDeleted = markedEdge.isDeleted(); + final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete(); + final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted(); + + + + if(isDeleted){ + logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge ); + asyncEventService.queueDeleteEdge( applicationScope, markedEdge ); + } + + if(isSourceNodeDeleted){ + final Id sourceNodeId = markedEdge.getSourceNode(); + + logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId ); + + asyncEventService.queueEntityDelete( applicationScope, sourceNodeId ); + } + + if(isTargetNodeDelete){ + + final Id targetNodeId = markedEdge.getTargetNode(); + + logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId ); + + asyncEventService.queueEntityDelete( applicationScope, targetNodeId ); + 
} + + + //filter if any of them are marked + return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete; + + + }) //set the edge state for cursors .doOnNext( edge -> { logger.trace( "Seeking over edge {}", edge ); @@ -100,7 +150,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, @Override - protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue, + protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue, final Optional<EdgePath> parent ) { //if it's our first pass, there's no cursor to generate @@ -113,7 +163,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, @Override - protected CursorSerializer<Edge> getCursorSerializer() { + protected CursorSerializer<MarkedEdge> getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } @@ -131,14 +181,14 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, */ private final class EdgeState { - private Edge cursorEdge = null; - private Edge currentEdge = null; + private MarkedEdge cursorEdge = null; + private MarkedEdge currentEdge = null; /** * Update the pointers */ - private void update( final Edge newEdge ) { + private void update( final MarkedEdge newEdge ) { cursorEdge = currentEdge; currentEdge = newEdge; } @@ -147,7 +197,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, /** * Get the edge to use in cursors for resume */ - private Edge getCursorEdge() { + private MarkedEdge getCursorEdge() { return cursorEdge; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java index 8d9bf6f..d54e547 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java @@ -22,20 +22,22 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; /** * Edge cursor serializer */ -public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> { +public class EdgeCursorSerializer extends AbstractCursorSerializer<MarkedEdge> { public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer(); @Override - protected Class<SimpleEdge> getType() { - return SimpleEdge.class; + protected Class<SimpleMarkedEdge> getType() { + return SimpleMarkedEdge.class; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java index dc39f5c..db5a0a8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java @@ -20,6 +20,7 
@@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) { - super( graphManagerFactory ); + public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) { + super( graphManagerFactory, asyncEventService ); this.collectionName = collectionName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java index 61ba4ad..054a52b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java @@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,7 +43,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType /** * Command for reading graph edges on a connection */ -public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{ +public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, MarkedEdge>{ private final GraphManagerFactory graphManagerFactory; private final String connectionName; @@ -77,12 +78,14 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, return filterResultObservable.flatMap( idFilterResult -> { //set our our constant state - final Optional<Edge> startFromCursor = getSeekValue(); + final Optional<MarkedEdge> startFromCursor = getSeekValue(); final Id id = idFilterResult.getValue(); + final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull()); + final SimpleSearchByIdType search = new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - entityType, startFromCursor ); + entityType, typeWrapper ); return graphManager.loadEdgesFromSourceByType( search ).map( edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() )); @@ -91,7 +94,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, @Override - protected CursorSerializer<Edge> getCursorSerializer() { + protected CursorSerializer<MarkedEdge> getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java index 11ec5f8..93e8fd4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) { - super( graphManagerFactory ); + public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) { + super( graphManagerFactory, asyncEventService ); this.connectionName = connectionName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 2863cbf..74f9ce0 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import org.apache.usergrid.persistence.EntityManagerFactory; +import 
org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.index.*; import org.junit.Before; import org.junit.Rule; @@ -135,7 +136,7 @@ public abstract class AsyncIndexServiceTest { */ - final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> { + final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> { final Id connectingId = createId("connecting"); final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId()); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java index 6001dd4..90d6c5a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.junit.Before; import org.junit.Test; @@ -244,7 +245,7 @@ public class IndexServiceTest { // final int edgeCount = indexFig.getIndexBatchSize()*2; final int edgeCount = 100; - final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { + final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { final Id connectingId = createId( "connecting" ); final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() ); @@ -377,7 +378,8 @@ public class IndexServiceTest { //Write multiple 
connection edges final int edgeCount = 5; - final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount ); + final List<MarkedEdge> + connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount ); indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator(); @@ -485,10 +487,10 @@ public class IndexServiceTest { } - private List<Edge> createConnectionSearchEdges( - final Entity testEntity, final GraphManager graphManager, final int edgeCount ) { + private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager, + final int edgeCount ) { - final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { + final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { //create our connection edge. final Id connectingId = createId( "connecting" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java index 7128dcf..c9dcbf1 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java @@ -28,7 +28,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; +import 
org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import com.google.common.base.Optional; @@ -40,19 +42,12 @@ public class CursorTest { @Test public void testCursors(){ + //test encoding edge + final MarkedEdge edge1 = new SimpleMarkedEdge( createId("source1"), "edgeType1", createId("target1"), 100, false, false, false ); - - - - - //test encoding edge - - final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1", createId("target1"), 100 ); - - - final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2", createId("target2"), 110 ); + final MarkedEdge edge2 = new SimpleMarkedEdge( createId("source2"), "edgeType2", createId("target2"), 110, false, false, false ); @@ -64,11 +59,12 @@ public class CursorTest { final EdgePath<Integer> filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() ); - final EdgePath<Edge> filter2Path = new EdgePath<Edge>(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path )); + final EdgePath<MarkedEdge> filter2Path = + new EdgePath<>( 2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ) ); final EdgePath<Integer> filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) ); - final EdgePath<Edge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) ); + final EdgePath<MarkedEdge> filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) ); @@ -91,7 +87,7 @@ public class CursorTest { assertEquals(query2, parsedQuery2); - final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE ); + final MarkedEdge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE ); assertEquals( edge2, parsedEdge2 ); @@ -100,7 +96,7 @@ public class CursorTest { assertEquals( 
query1, parsedQuery1 ); - final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE ); + final MarkedEdge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE ); assertEquals(edge1, parsedEdge1); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java index 326e128..6929d87 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java @@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; @@ -118,7 +119,7 @@ public class ConnectionServiceImplTest { new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ); - final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); + final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); assertEquals( 1, edges.size() ); @@ -209,7 +210,7 @@ public class ConnectionServiceImplTest { SearchByEdgeType.Order.DESCENDING, Optional.absent() ); //check 
only 1 exists - final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); + final List<MarkedEdge> edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); assertEquals( 1, edges.size() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java index d870114..658d3eb 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java @@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; @@ -86,7 +87,8 @@ public class ApplicationServiceIT extends AbstractCoreIT { , Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ); - Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator(); + Iterator<MarkedEdge> + results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator(); if(results.hasNext()){ Assert.fail("should be empty"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java index 6100725..000c633 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java @@ -59,7 +59,7 @@ public interface GraphManager extends CPManager { * Create or update an edge. Note that the implementation should also create incoming (reversed) edges for this * edge. */ - Observable<Edge> writeEdge( Edge edge ); + Observable<MarkedEdge> writeEdge( Edge edge ); /** @@ -68,7 +68,7 @@ public interface GraphManager extends CPManager { * * Implementation should also mark the incoming (reversed) edge. Only marks the specific version */ - Observable<Edge> markEdge( Edge edge ); + Observable<MarkedEdge> markEdge( Edge edge ); /** * @param edge Remove the edge in the graph @@ -98,7 +98,7 @@ public interface GraphManager extends CPManager { /** * Get all versions of this edge where versions <= max version */ - Observable<Edge> loadEdgeVersions( SearchByEdge edge ); + Observable<MarkedEdge> loadEdgeVersions( SearchByEdge edge ); /** * Returns an observable that emits all edges where the specified node is the source node. The edges will match the @@ -108,7 +108,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable<Edge> loadEdgesFromSource( SearchByEdgeType search ); + Observable<MarkedEdge> loadEdgesFromSource( SearchByEdgeType search ); /** * Returns an observable that emits all edges where the specified node is the target node. 
The edges will match the @@ -118,7 +118,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable<Edge> loadEdgesToTarget( SearchByEdgeType search ); + Observable<MarkedEdge> loadEdgesToTarget( SearchByEdgeType search ); /** @@ -129,7 +129,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search ); + Observable<MarkedEdge> loadEdgesFromSourceByType( SearchByIdType search ); /** @@ -140,7 +140,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable<Edge> loadEdgesToTargetByType( SearchByIdType search ); + Observable<MarkedEdge> loadEdgesToTargetByType( SearchByIdType search ); /** * Get all edge types to this node. The node provided by search is the target node. http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java index da6fedb..88809e2 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; * An edge. 
With the additional info of if it is marked for deletion * */ +@JsonDeserialize(as = SimpleMarkedEdge.class) public interface MarkedEdge extends Edge{ /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 1bcb398..93ae753 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -144,12 +144,12 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> writeEdge( final Edge edge ) { + public Observable<MarkedEdge> writeEdge( final Edge edge ) { GraphValidation.validateEdge( edge ); final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false ); - final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> { + final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> { final UUID timestamp = UUIDGenerator.newTimeUUID(); @@ -175,12 +175,12 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> markEdge( final Edge edge ) { + public Observable<MarkedEdge> markEdge( final Edge edge ) { GraphValidation.validateEdge( edge ); final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); - final Observable<Edge> observable = Observable.just( markedEdge ).map( edge1 -> { + final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> { final UUID timestamp = UUIDGenerator.newTimeUUID(); @@ -282,9 +282,9 @@ public class 
GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) { + public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) { - final Observable<Edge> edges = + final Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { @Override protected Iterator<MarkedEdge> getIterator() { @@ -298,8 +298,8 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) { - final Observable<Edge> edges = + public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) { + final Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) { @Override protected Iterator<MarkedEdge> getIterator() { @@ -313,8 +313,8 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) { - final Observable<Edge> edges = + public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) { + final Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) { @Override protected Iterator<MarkedEdge> getIterator() { @@ -329,8 +329,8 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { - final Observable<Edge> edges = + public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) { + final Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) { @Override protected Iterator<MarkedEdge> getIterator() { @@ -344,8 +344,8 @@ public class GraphManagerImpl implements GraphManager { @Override - public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) 
{ - final Observable<Edge> edges = + public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) { + final Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) { @Override protected Iterator<MarkedEdge> getIterator() { @@ -480,7 +480,7 @@ public class GraphManagerImpl implements GraphManager { } //if any one of these is true, we filter it - return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted()); + return !simpleMarkedEdge.isDeleted() && !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted(); }); } ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java index c6dc2e4..9c35e2e 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java @@ -29,50 +29,61 @@ import com.fasterxml.jackson.annotation.JsonIgnore; /** * Simple bean to represent our edge + * * @author tnine */ -public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { +public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { + + private boolean isDeleted; + private boolean isSourceNodeDeleted; + private boolean isTargetNodeDeleted; - private final boolean deleted; - private final boolean isSourceNodeDeleted; - private final boolean isTargetNodeDeleted; + /** + * Unused but required for Jackson + */ + public 
SimpleMarkedEdge() { + } - public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) { - this( sourceNode, type, targetNode, timestamp, deleted, false, false ); + public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, + final boolean isDeleted ) { + + this( sourceNode, type, targetNode, timestamp, isDeleted, false, false ); } public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, - final boolean deleted, final boolean isSourceNodeDeleted, + final boolean isDeleted, final boolean isSourceNodeDeleted, final boolean isTargetNodeDeleted ) { super( sourceNode, type, targetNode, timestamp ); - this.deleted = deleted; + this.isDeleted = isDeleted; this.isSourceNodeDeleted = isSourceNodeDeleted; this.isTargetNodeDeleted = isTargetNodeDeleted; } - public SimpleMarkedEdge(final Edge edge, final boolean deleted){ - this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), deleted); + public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) { + this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted ); } @Override @JsonIgnore public boolean isDeleted() { - return deleted; + return isDeleted; } @Override + @JsonIgnore public boolean isSourceNodeDelete() { return isSourceNodeDeleted; } @Override + @JsonIgnore public boolean isTargetNodeDeleted() { return isTargetNodeDeleted; } @@ -92,7 +103,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { final SimpleMarkedEdge that = ( SimpleMarkedEdge ) o; - if ( deleted != that.deleted ) { + if ( isDeleted != that.isDeleted ) { return false; } @@ -103,7 +114,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + ( deleted ? 
1 : 0 ); + result = 31 * result + ( isDeleted ? 1 : 0 ); result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 ); result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 ); return result; @@ -113,7 +124,7 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { @Override public String toString() { return "SimpleMarkedEdge{" + - "deleted=" + deleted + + "deleted=" + isDeleted + ", isSourceNodeDeleted=" + isSourceNodeDeleted + ", isTargetNodeDeleted=" + isTargetNodeDeleted + "} " + super.toString(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index eda3a02..9a95c26 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -95,7 +95,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesFromSource( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().last(); @@ -127,7 +127,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesToTarget( search ); + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); 
@@ -161,7 +161,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesFromSource( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -196,7 +196,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesToTarget( search ); + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -248,10 +248,10 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesFromSource( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator<Edge> returned = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> returned = edges.toBlocking().getIterator(); assertEquals( "Correct edge returned", edge3, returned.next() ); assertEquals( "Correct edge returned", edge2, returned.next() ); @@ -321,10 +321,10 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesToTarget( search ); + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator<Edge> returned = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> returned = edges.toBlocking().getIterator(); assertEquals( "Correct edge returned", edge3, returned.next() ); assertEquals( "Correct edge 
returned", edge2, returned.next() ); @@ -387,10 +387,10 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesFromSource( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator<Edge> returned = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> returned = edges.toBlocking().getIterator(); //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first @@ -443,10 +443,10 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesToTarget( search ); + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator<Edge> returned = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> returned = edges.toBlocking().getIterator(); //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first @@ -487,7 +487,7 @@ public class GraphManagerIT { SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), edge.getTargetNode().getType(), null ); - Observable<Edge> edges = gm.loadEdgesFromSourceByType( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -524,7 +524,7 @@ public class GraphManagerIT { SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), edge.getSourceNode().getType(), null ); - Observable<Edge> edges = gm.loadEdgesToTargetByType( search ); + Observable<MarkedEdge> edges = 
gm.loadEdgesToTargetByType( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -560,7 +560,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesFromSource( search ); + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -629,7 +629,7 @@ public class GraphManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable<Edge> edges = gm.loadEdgesToTarget( search ); + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -988,11 +988,11 @@ public class GraphManagerIT { //get our 2 edges - Observable<Edge> edges = + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2, results.next() ); @@ -1060,11 +1060,11 @@ public class GraphManagerIT { //get our 2 edges - Observable<Edge> edges = + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2, results.next() ); @@ -1140,11 +1140,11 @@ public class GraphManagerIT { //get our 2 edges - Observable<Edge> edges = gm.loadEdgesFromSource( + Observable<MarkedEdge> edges = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( 
edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() ); @@ -1211,11 +1211,11 @@ public class GraphManagerIT { gm.markEdge( edge1 ).toBlocking().last(); //get our 2 edges - Observable<Edge> edges = gm.loadEdgesToTarget( + Observable<MarkedEdge> edges = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); @@ -1282,11 +1282,11 @@ public class GraphManagerIT { //get our 2 edges - Observable<Edge> edges = gm.loadEdgesFromSourceByType( + Observable<MarkedEdge> edges = gm.loadEdgesFromSourceByType( createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge1, results.next() ); @@ -1359,11 +1359,11 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); //get our 2 edges - Observable<Edge> edges = gm.loadEdgesToTargetByType( + Observable<MarkedEdge> edges = gm.loadEdgesToTargetByType( createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ); - Iterator<Edge> results = edges.toBlocking().getIterator(); + Iterator<MarkedEdge> results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge1, results.next() ); @@ -1435,7 +1435,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesFromSource( 
createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1547,7 +1547,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1627,7 +1627,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1764,7 +1764,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1842,7 +1842,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1983,7 +1983,7 @@ public class GraphManagerIT { final long maxVersion = System.currentTimeMillis(); - Iterator<Edge> results = + Iterator<MarkedEdge> results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -2356,10 +2356,10 @@ public class GraphManagerIT { new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ); - final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending ); + final Observable<MarkedEdge> edgesDescending = gm.loadEdgeVersions( searchDescending ); //search descending - final List<Edge> descending = 
edgesDescending.toList().toBlocking().single(); + final List<MarkedEdge> descending = edgesDescending.toList().toBlocking().single(); assertEquals( "Correct size returned", 3, descending.size() ); @@ -2376,9 +2376,9 @@ public class GraphManagerIT { new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0, SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent() ); - Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending ); + Observable<MarkedEdge> edgesAscending = gm.loadEdgeVersions( searchAscending ); - List<Edge> ascending = edgesAscending.toList().toBlocking().single(); + List<MarkedEdge> ascending = edgesAscending.toList().toBlocking().single(); assertEquals( "Correct size returned", 3, ascending.size() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java index b922e7c..22683f6 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java @@ -113,7 +113,7 @@ public class GraphManagerLoadTest { @Override - public Observable<Edge> doSearch( final GraphManager manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional .<Edge>absent()) ); } @@ -141,7 +141,7 @@ public class GraphManagerLoadTest { @Override - public Observable<Edge> doSearch( final GraphManager 
manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } }; @@ -220,7 +220,7 @@ public class GraphManagerLoadTest { final CountDownLatch latch = new CountDownLatch( 1 ); - generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<Edge>>() { + generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber<List<MarkedEdge>>() { @Override public void onCompleted() { timer.stop(); @@ -235,7 +235,7 @@ public class GraphManagerLoadTest { @Override - public void onNext( final List<Edge> edges ) { + public void onNext( final List<MarkedEdge> edges ) { log.info("Read {} edges", edges.size()); } } ); @@ -263,6 +263,6 @@ public class GraphManagerLoadTest { * @param manager * @return */ - public Observable<Edge> doSearch( final GraphManager manager ); + public Observable<MarkedEdge> doSearch( final GraphManager manager ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index c1917bb..6aad289 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -176,7 +176,7 @@ public class GraphManagerShardConsistencyIT { @Override - public Observable<Edge> doSearch( final GraphManager manager 
) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); @@ -409,7 +409,7 @@ public class GraphManagerShardConsistencyIT { @Override - public Observable<Edge> doSearch( final GraphManager manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false ) ); @@ -729,7 +729,7 @@ public class GraphManagerShardConsistencyIT { /** * Perform the search returning an observable edge */ - public Observable<Edge> doSearch( final GraphManager manager ); + public Observable<MarkedEdge> doSearch( final GraphManager manager ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java index 2889684..6a2efc9 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java @@ -107,25 +107,25 @@ public class GraphManagerStressTest { @Override - public Observable<Edge> doSearch( final GraphManager manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { final long timestamp = System.currentTimeMillis(); - return Observable.create( new Observable.OnSubscribe<Edge>() { + return Observable.create( new 
Observable.OnSubscribe<MarkedEdge>() { @Override - public void call( final Subscriber<? super Edge> subscriber ) { + public void call( final Subscriber<? super MarkedEdge> subscriber ) { try { for ( Id sourceId : sourceIds ) { - final Iterable<Edge> edges = manager.loadEdgesFromSource( + final Iterable<MarkedEdge> edges = manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING, Optional .<Edge>absent() ) ) .toBlocking().toIterable(); - for ( Edge edge : edges ) { + for ( MarkedEdge edge : edges ) { log.debug( "Firing on next for edge {}", edge ); subscriber.onNext( edge ); @@ -195,7 +195,7 @@ public class GraphManagerStressTest { @Override - public Observable<Edge> doSearch( final GraphManager manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } }; @@ -222,7 +222,7 @@ public class GraphManagerStressTest { @Override - public Observable<Edge> doSearch( final GraphManager manager ) { + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } @@ -307,6 +307,6 @@ public class GraphManagerStressTest { */ public Edge newEdge(); - public Observable<Edge> doSearch( final GraphManager manager ); + public Observable<MarkedEdge> doSearch( final GraphManager manager ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d4f22c5/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java 
b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java index d9d3d0d..6e84601 100644 --- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java +++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java @@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; @@ -239,7 +240,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin { final EntityCollectionManager managementCollectionManager = entityCollectionManagerFactory.createCollectionManager( managementAppScope ); - Observable<Edge> edgesObservable = getApplicationInfoEdges( appId ); + Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId ); //get the graph for all app infos Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> { final Id appInfoId = edge.getTargetNode(); @@ -299,7 +300,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin { } - public Observable<Edge> getApplicationInfoEdges( final UUID applicationId ) { + public Observable<MarkedEdge> getApplicationInfoEdges( final UUID applicationId ) { final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID ); final GraphManager gm = graphManagerFactory.createEdgeManager( managementAppScope );
