Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev cf8b55159 -> d0bde1884
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 6135121..619e65d 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -29,7 +29,9 @@ import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.Nullable; @@ -37,13 +39,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.consistency.TimeService; +import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.guice.GraphTaskExecutor; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; @@ -63,6 +63,8 @@ import com.google.common.hash.PrimitiveSink; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; @@ -85,7 +87,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { private static final HashFunction MURMUR_128 = Hashing.murmur3_128(); - private final TaskExecutor taskExecutor; + private final ListeningExecutorService taskExecutor; private final TimeService timeService; private final GraphFig graphFig; private final NodeShardAllocation nodeShardAllocation; @@ -104,8 +106,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final NodeShardAllocation nodeShardAllocation, final ShardedEdgeSerialization shardedEdgeSerialization, final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace, - final EdgeShardSerialization edgeShardSerialization, - @GraphTaskExecutor final TaskExecutor taskExecutor ) { + final EdgeShardSerialization edgeShardSerialization) { this.timeService = timeService; this.graphFig = graphFig; @@ -119,7 +120,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.shardCompactionTaskTracker = new ShardCompactionTaskTracker(); this.shardAuditTaskTracker = new ShardAuditTaskTracker(); - this.taskExecutor = taskExecutor; + + this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory + .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(), + graphFig.getShardAuditWorkerQueueSize() ) ); } @@ -139,8 +143,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { Preconditions.checkNotNull( group, "group cannot be null" ); Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" ); - Preconditions.checkArgument( group.shouldCompact( startTime ), - "Compaction cannot be run yet. Ignoring compaction." ); + Preconditions + .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." ); final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder(); @@ -170,8 +174,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { for ( Shard sourceShard : sourceShards ) { - Iterator<MarkedEdge> edges = edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, - Collections.singleton( sourceShard ), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); + Iterator<MarkedEdge> edges = edgeMeta + .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ), + Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING ); while ( edges.hasNext() ) { final MarkedEdge edge = edges.next(); @@ -186,13 +191,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - newRowBatch.mergeShallow( - edgeMeta.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, - timestamp ) ); + newRowBatch.mergeShallow( edgeMeta + .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, + timestamp ) ); - deleteRowBatch.mergeShallow( - edgeMeta.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, - timestamp ) ); + deleteRowBatch.mergeShallow( edgeMeta + .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge, + timestamp ) ); edgeCount++; @@ -298,7 +303,18 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { * Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to * hose the system */ - ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) ); + final ListenableFuture<AuditResult> future; + + try { + future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) ); + } + catch ( RejectedExecutionException ree ) { + + //ignore, if this happens we don't care, we're saturated, we can check later + LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); + + return Futures.immediateFuture( AuditResult.NOT_CHECKED ); + } /** * Log our success or failures for debugging purposes @@ -320,7 +336,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - private final class ShardAuditTask implements Task<AuditResult> { + private final class ShardAuditTask implements Callable<AuditResult> { private final ApplicationScope scope; private final DirectedEdgeMeta edgeMeta; @@ -334,20 +350,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.group = group; } - @Override - public void exceptionThrown( final Throwable throwable ) { - LOG.error( "Unable to execute audit for shard of {}", throwable ); - } - - - @Override - public AuditResult rejected() { - //ignore, if this happens we don't care, we're saturated, we can check later - LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); - - return AuditResult.NOT_CHECKED; - } - @Override public AuditResult call() throws Exception { @@ -401,10 +403,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { */ try { CompactionResult result = compact( scope, edgeMeta, group ); - LOG.info( - "Compaction result for compaction of scope {} with edge meta data of {} and shard group " + - "{} is {}", - new Object[] { scope, edgeMeta, group, result } ); + LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group " + + "{} is {}", new Object[] { scope, edgeMeta, group, result } ); } finally { shardCompactionTaskTracker.complete( scope, edgeMeta, group ); @@ -418,8 +418,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - - /** * Inner class used to track running tasks per instance */ @@ -534,8 +532,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { } - - public static final class CompactionResult { public final long copiedEdges; @@ -566,12 +562,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { @Override public String toString() { return "CompactionResult{" + - "copiedEdges=" + copiedEdges + - ", targetShard=" + targetShard + - ", sourceShards=" + sourceShards + - ", removedShards=" + removedShards + - ", compactedShard=" + compactedShard + - '}'; + "copiedEdges=" + copiedEdges + + ", targetShard=" + targetShard + + ", sourceShards=" + sourceShards + + ", removedShards=" + removedShards + + ", compactedShard=" + compactedShard + + '}'; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java index b471119..658f4bf 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/CommittedGraphManagerIT.java @@ -66,14 +66,14 @@ public class CommittedGraphManagerIT extends GraphManagerIT { @Override - public Observable<Edge> deleteEdge( final Edge edge ) { - return graphManager.deleteEdge( edge ); + public Observable<Edge> markEdge( final Edge edge ) { + return graphManager.markEdge( edge ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp) { - return graphManager.deleteNode( node, timestamp ); + public Observable<Id> markNode( final Id node, final long timestamp ) { + return graphManager.markNode( node, timestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index 2ed4f13..340c712 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -584,7 +584,7 @@ public abstract class GraphManagerIT { //now delete it - returned = gm.deleteEdge( edge ).toBlocking().last(); + returned = gm.markEdge( edge ).toBlocking().last(); //now test retrieval, should be null @@ -645,7 +645,7 @@ public abstract class GraphManagerIT { //now delete it - gm.deleteEdge( edge ).toBlocking().last(); + gm.markEdge( edge ).toBlocking().last(); //now test retrieval, should be null edges = gm.loadEdgesToTarget( search ); @@ -1005,7 +1005,7 @@ public abstract class GraphManagerIT { System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); @@ -1025,7 +1025,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); System.out.println( "\n\n\n\n\n\n\n\n\n\n" ); @@ -1081,7 +1081,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); @@ -1096,7 +1096,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); @@ -1146,7 +1146,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesFromSourceByType( @@ -1171,7 +1171,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesFromSourceByType( @@ -1223,7 +1223,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge1 ).toBlocking().last(); + gm.markEdge( edge1 ).toBlocking().last(); edges = gm.loadEdgesToTargetByType( @@ -1249,7 +1249,7 @@ public abstract class GraphManagerIT { //now delete one of the edges - gm.deleteEdge( edge2 ).toBlocking().last(); + gm.markEdge( edge2 ).toBlocking().last(); edges = gm.loadEdgesToTargetByType( @@ -1320,7 +1320,7 @@ public abstract class GraphManagerIT { assertFalse( "No more edges", results.hasNext() ); //mark the source node - gm.deleteNode( sourceId, edge2.getTimestamp() ).toBlocking().last(); + gm.markNode( sourceId, edge2.getTimestamp() ).toBlocking().last(); //now re-read, nothing should be there since they're marked @@ -1402,7 +1402,7 @@ public abstract class GraphManagerIT { assertFalse( "No more edges", results.hasNext() ); //mark the source node - gm.deleteNode( targetId, edge2.getTimestamp() ).toBlocking().last(); + gm.markNode( targetId, edge2.getTimestamp() ).toBlocking().last(); //now re-read, nothing should be there since they're marked @@ -1641,7 +1641,7 @@ public abstract class GraphManagerIT { public void invalidEdgeTypesDelete( ) { final GraphManager em = emf.createEdgeManager( scope ); - em.deleteEdge( null ); + em.markEdge( null ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java index 6f3d388..84825aa 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java @@ -116,16 +116,16 @@ public class StorageGraphManagerIT extends GraphManagerIT { @Override - public Observable<Edge> deleteEdge( final Edge edge ) { + public Observable<Edge> markEdge( final Edge edge ) { waitForComplete(); - return graphManager.deleteEdge( edge ); + return graphManager.markEdge( edge ); } @Override - public Observable<Id> deleteNode( final Id node, final long timestamp ) { + public Observable<Id> markNode( final Id node, final long timestamp ) { waitForComplete(); - return graphManager.deleteNode( node, timestamp ); + return graphManager.markNode( node, timestamp ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java index 68b6f8f..7d4b7f6 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java @@ -32,14 +32,12 @@ import org.junit.Test; import org.apache.usergrid.persistence.core.consistency.TimeService; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.core.util.IdGenerator; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl; import com.netflix.astyanax.Keyspace; -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -79,8 +77,6 @@ public class ShardGroupCompactionTest { final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class ); - final TaskExecutor taskExecutor = mock( TaskExecutor.class ); - final long delta = 10000; final long createTime = 20000; @@ -97,7 +93,7 @@ public class ShardGroupCompactionTest { ShardGroupCompactionImpl compaction = new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization, - edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor ); + edgeColumnFamilies, keyspace, edgeShardSerialization ); DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
