Added back pressure block as a test

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7

Branch: refs/heads/master
Commit: f73ac4c724088150bc1e0315942e93306321a72a
Parents: d35fea5
Author: Todd Nine <[email protected]>
Authored: Fri Oct 23 10:45:10 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Fri Oct 23 10:50:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/EventBuilderImpl.java        | 3 ++-
 .../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++
 .../pipeline/read/traverse/AbstractReadGraphFilter.java      | 2 +-
 .../usergrid/persistence/graph/impl/GraphManagerImpl.java    | 8 ++++----
 4 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..18f080b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder {
         //observable of entries as the batches are deleted
         final Observable<List<MvccLogEntry>> entries =
             ecm.getVersions( entityId ).buffer( 
serializationFig.getBufferSize() )
-               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () 
-> gm.compactNode( entityId ) );
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () 
-> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) );
+
 
 
         return new EntityDeleteResults( edgeObservable, entries );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
index 3c41a2b..915af03 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -53,4 +53,11 @@ public class FilterResult<T> {
     }
 
 
+    @Override
+    public String toString() {
+        return "FilterResult{" +
+            "path=" + path +
+            ", value=" + value +
+            '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..88c912a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be 
generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search 
).onBackpressureBlock()
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index e119c59..c1e9cea 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search 
) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSource( scope, 
search );
@@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) 
{
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTarget( scope, 
search );
@@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType 
search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return 
storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType 
search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( 
"getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( 
"loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return 
storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );

Reply via email to