Added backpressure block (onBackpressureBlock) as a test
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7 Branch: refs/heads/master Commit: f73ac4c724088150bc1e0315942e93306321a72a Parents: d35fea5 Author: Todd Nine <[email protected]> Authored: Fri Oct 23 10:45:10 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Oct 23 10:50:27 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/asyncevents/EventBuilderImpl.java | 3 ++- .../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++ .../pipeline/read/traverse/AbstractReadGraphFilter.java | 2 +- .../usergrid/persistence/graph/impl/GraphManagerImpl.java | 8 ++++---- 4 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index cc0356b..18f080b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder { //observable of entries as the batches are deleted final Observable<List<MvccLogEntry>> entries = ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() ) - .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) ); + 
.doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) ); + return new EntityDeleteResults( edgeObservable, entries ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java index 3c41a2b..915af03 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java @@ -53,4 +53,11 @@ public class FilterResult<T> { } + @Override + public String toString() { + return "FilterResult{" + + "path=" + path + + ", value=" + value + + '}'; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index d3e0345..88c912a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, /** * TODO, pass a message with pointers to our cursor values to be generated later */ - return 
graphManager.loadEdgesFromSource( search ) + return graphManager.loadEdgesFromSource( search ).onBackpressureBlock() //set the edge state for cursors .doOnNext( edge -> { logger.trace( "Seeking over edge {}", edge ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index e119c59..c1e9cea 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) { final Observable<Edge> edges = - Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) { @Override protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSource( scope, search ); @@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) { final Observable<Edge> edges = - Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) { @Override protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTarget( scope, search ); @@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager { 
@Override public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) { final Observable<Edge> edges = - Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) { @Override protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); @@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager { @Override public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) { final Observable<Edge> edges = - Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { + Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) { @Override protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
