Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-641 cf8b55159 -> bc3cafb00


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 6135121..619e65d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,7 +29,9 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.annotation.Nullable;
 
@@ -37,13 +39,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -63,6 +63,8 @@ import com.google.common.hash.PrimitiveSink;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
@@ -85,7 +87,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
 
 
-    private final TaskExecutor taskExecutor;
+    private final ListeningExecutorService taskExecutor;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final NodeShardAllocation nodeShardAllocation;
@@ -104,8 +106,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization,
-                                     @GraphTaskExecutor final TaskExecutor taskExecutor ) {
+                                     final EdgeShardSerialization edgeShardSerialization) {
 
         this.timeService = timeService;
         this.graphFig = graphFig;
@@ -119,7 +120,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
-        this.taskExecutor = taskExecutor;
+
+        this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
+            .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
+                graphFig.getShardAuditWorkerQueueSize() ) );
     }
 
 
@@ -139,8 +143,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
         Preconditions.checkNotNull( group, "group cannot be null" );
         Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
-        Preconditions.checkArgument( group.shouldCompact( startTime ),
-                "Compaction cannot be run yet.  Ignoring compaction." );
+        Preconditions
+            .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet.  Ignoring compaction." );
 
 
         final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
@@ -170,8 +174,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         for ( Shard sourceShard : sourceShards ) {
-            Iterator<MarkedEdge> edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope,
-                    Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
+            Iterator<MarkedEdge> edges = edgeMeta
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
+                    Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
 
             while ( edges.hasNext() ) {
                 final MarkedEdge edge = edges.next();
@@ -186,13 +191,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 }
 
 
-                newRowBatch.mergeShallow(
-                        edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
-                                timestamp ) );
+                newRowBatch.mergeShallow( edgeMeta
+                        .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
+                            timestamp ) );
 
-                deleteRowBatch.mergeShallow(
-                        edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
-                                timestamp ) );
+                deleteRowBatch.mergeShallow( edgeMeta
+                        .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
+                            timestamp ) );
 
                 edgeCount++;
 
@@ -298,7 +303,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
          * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
          * hose the system
          */
-        ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+        final ListenableFuture<AuditResult> future;
+
+        try {
+            future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
+
+            return Futures.immediateFuture( AuditResult.NOT_CHECKED );
+        }
 
         /**
          * Log our success or failures for debugging purposes
@@ -320,7 +336,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    private final class ShardAuditTask implements Task<AuditResult> {
+    private final class ShardAuditTask implements Callable<AuditResult> {
 
         private final ApplicationScope scope;
         private final DirectedEdgeMeta edgeMeta;
@@ -334,20 +350,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             this.group = group;
         }
 
-         @Override
-        public void exceptionThrown( final Throwable throwable ) {
-            LOG.error( "Unable to execute audit for shard of {}", throwable );
-        }
-
-
-        @Override
-        public AuditResult rejected() {
-            //ignore, if this happens we don't care, we're saturated, we can check later
-            LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
-
-            return AuditResult.NOT_CHECKED;
-        }
-
 
         @Override
         public AuditResult call() throws Exception {
@@ -401,10 +403,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  */
                 try {
                     CompactionResult result = compact( scope, edgeMeta, group );
-                    LOG.info(
-                            "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
-                                    "{} is {}",
-                            new Object[] { scope, edgeMeta, group, result } );
+                    LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group "
+                            + "{} is {}", new Object[] { scope, edgeMeta, group, result } );
                 }
                 finally {
                     shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -418,8 +418,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-
-
     /**
      * Inner class used to track running tasks per instance
      */
@@ -534,8 +532,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-
-
     public static final class CompactionResult {
 
         public final long copiedEdges;
@@ -566,12 +562,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         @Override
         public String toString() {
             return "CompactionResult{" +
-                    "copiedEdges=" + copiedEdges +
-                    ", targetShard=" + targetShard +
-                    ", sourceShards=" + sourceShards +
-                    ", removedShards=" + removedShards +
-                    ", compactedShard=" + compactedShard +
-                    '}';
+                "copiedEdges=" + copiedEdges +
+                ", targetShard=" + targetShard +
+                ", sourceShards=" + sourceShards +
+                ", removedShards=" + removedShards +
+                ", compactedShard=" + compactedShard +
+                '}';
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
index b471119..658f4bf 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java
@@ -66,14 +66,14 @@ public class CommittedGraphManagerIT extends GraphManagerIT {
 
 
         @Override
-        public Observable<Edge> deleteEdge( final Edge edge ) {
-            return graphManager.deleteEdge( edge );
+        public Observable<Edge> markEdge( final Edge edge ) {
+            return graphManager.markEdge( edge );
         }
 
 
         @Override
-        public Observable<Id> deleteNode( final Id node, final long timestamp) {
-            return graphManager.deleteNode( node, timestamp );
+        public Observable<Id> markNode( final Id node, final long timestamp ) {
+            return graphManager.markNode( node, timestamp );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 2ed4f13..340c712 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -584,7 +584,7 @@ public abstract class GraphManagerIT {
 
 
         //now delete it
-        returned = gm.deleteEdge( edge ).toBlocking().last();
+        returned = gm.markEdge( edge ).toBlocking().last();
 
 
         //now test retrieval, should be null
@@ -645,7 +645,7 @@ public abstract class GraphManagerIT {
 
 
         //now delete it
-        gm.deleteEdge( edge ).toBlocking().last();
+        gm.markEdge( edge ).toBlocking().last();
 
         //now test retrieval, should be null
         edges = gm.loadEdgesToTarget( search );
@@ -1005,7 +1005,7 @@ public abstract class GraphManagerIT {
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
@@ -1025,7 +1025,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
         System.out.println( "\n\n\n\n\n\n\n\n\n\n" );
 
@@ -1081,7 +1081,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
@@ -1096,7 +1096,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
         edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) );
 
@@ -1146,7 +1146,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesFromSourceByType(
@@ -1171,7 +1171,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
 
         edges = gm.loadEdgesFromSourceByType(
@@ -1223,7 +1223,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge1 ).toBlocking().last();
+        gm.markEdge( edge1 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTargetByType(
@@ -1249,7 +1249,7 @@ public abstract class GraphManagerIT {
 
         //now delete one of the edges
 
-        gm.deleteEdge( edge2 ).toBlocking().last();
+        gm.markEdge( edge2 ).toBlocking().last();
 
 
         edges = gm.loadEdgesToTargetByType(
@@ -1320,7 +1320,7 @@ public abstract class GraphManagerIT {
         assertFalse( "No more edges", results.hasNext() );
 
         //mark the source node
-        gm.deleteNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
+        gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last();
 
 
         //now re-read, nothing should be there since they're marked
@@ -1402,7 +1402,7 @@ public abstract class GraphManagerIT {
         assertFalse( "No more edges", results.hasNext() );
 
         //mark the source node
-        gm.deleteNode( targetId, edge2.getTimestamp() ).toBlocking().last();
+        gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last();
 
 
         //now re-read, nothing should be there since they're marked
@@ -1641,7 +1641,7 @@ public abstract class GraphManagerIT {
     public void invalidEdgeTypesDelete( ) {
         final GraphManager em = emf.createEdgeManager( scope );
 
-        em.deleteEdge( null );
+        em.markEdge( null );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
index 6f3d388..84825aa 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
@@ -116,16 +116,16 @@ public class StorageGraphManagerIT extends GraphManagerIT {
 
 
         @Override
-        public Observable<Edge> deleteEdge( final Edge edge ) {
+        public Observable<Edge> markEdge( final Edge edge ) {
             waitForComplete();
-            return graphManager.deleteEdge( edge );
+            return graphManager.markEdge( edge );
         }
 
 
         @Override
-        public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+        public Observable<Id> markNode( final Id node, final long timestamp ) {
             waitForComplete();
-            return graphManager.deleteNode( node, timestamp );
+            return graphManager.markNode( node, timestamp );
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 68b6f8f..7d4b7f6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -32,14 +32,12 @@ import org.junit.Test;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
 import com.netflix.astyanax.Keyspace;
 
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -79,8 +77,6 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final TaskExecutor taskExecutor = mock( TaskExecutor.class );
-
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -97,7 +93,7 @@ public class ShardGroupCompactionTest {
 
         ShardGroupCompactionImpl compaction =
                 new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
+                        edgeColumnFamilies, keyspace, edgeShardSerialization );
 
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );

Reply via email to