Throttle the compactions and auditing such that a given 'type' can only be compacted one at a time (source or target node shard(s))
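
The throttle itself is the task-tracker hash plus a putIfAbsent check (see the canStartTask() hunk below). A minimal sketch of that dedup idea, using a hypothetical class name rather than the real Usergrid tracker:

    // Sketch: hash the task identity, then use ConcurrentHashMap.putIfAbsent()
    // so only one compaction per hash value runs at a time.
    import java.util.concurrent.ConcurrentHashMap;

    public class TypeCompactionThrottle {

        private final ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();

        /** True only for the first caller presenting this hash; mirrors canStartTask(). */
        public boolean canStartTask( final long taskHash ) {
            // putIfAbsent returns null when no previous value existed, i.e. we won the race
            return runningTasks.putIfAbsent( taskHash, Boolean.TRUE ) == null;
        }

        /** Invoked when the compaction finishes so the same type can be compacted again. */
        public void taskComplete( final long taskHash ) {
            runningTasks.remove( taskHash );
        }
    }

Everyone who loses the putIfAbsent race gets AuditResult.COMPACTING back instead of starting a second task.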
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58ae197e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58ae197e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58ae197e

Branch: refs/heads/release-2.1.1
Commit: 58ae197ea581f271de644760531a9cd45287c7c9
Parents: 4e407ff
Author: Michael Russo <[email protected]>
Authored: Fri Mar 18 14:31:30 2016 -0700
Committer: Michael Russo <[email protected]>
Committed: Fri Mar 18 14:31:30 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java  |  2 +-
 .../shard/impl/ShardGroupCompactionImpl.java   | 54 ++++++++++++++++----
 .../graph/GraphManagerShardConsistencyIT.java  | 25 ++++-----
 3 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index d8b9097..6049c1f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -375,7 +375,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {

         // if a whole page is skipped OR the result size equals the difference of what's skipped,
         // it is likely during a shard transition and we should assume there is more to read
-        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
+        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){

             moreToReturn = true;
         }

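The change above adds a fourth off-by-one case to the "more to read" heuristic. Restated as a standalone predicate (a hypothetical helper, not the MultiRowColumnIterator API), the full condition reads:

    // selectSize = page size requested, skipSize = columns discarded (e.g. the
    // start column), size = columns actually returned by the query.
    final class PagingHeuristic {
        static boolean likelyMoreToRead( final int selectSize, final int skipSize, final int size ) {
            return skipSize == selectSize                // the whole page was skipped
                || skipSize == selectSize - 1            // all but one column was skipped
                || size == selectSize - skipSize         // results exactly fill what was not skipped
                || size == (selectSize - 1) - skipSize;  // new case: one short of a full page mid shard transition
        }
    }
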
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 1890d53..8728c6c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;

 import com.google.common.base.Optional;
 import com.netflix.astyanax.connectionpool.OperationResult;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
@@ -200,37 +201,61 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {

                 edgeCount++;

-                shardEnd = edge;
+
                 // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
                 // from the old rows
                 if ( edgeCount % maxWorkSize == 0 ) {
+
+                    try {

                         // write the edges into the new shard atomically so we know they all succeed
                         newRowBatch.withAtomicBatch(true).execute();

+                        // set the shardEnd after the write is known to be successful
+                        shardEnd = edge;
+
+                        // Update the shard end after each batch so any reads during transition stay as close to current
+                        sourceShard.setShardEnd(
+                            Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+                        );
+
+                        logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
+                        updateShardMetaBatch.mergeShallow(
+                            edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+
+
                         // on purpose block this thread before deleting the old edges to be sure there are no gaps
                         // duplicates are filtered on graph seeking so this is OK
                         Thread.sleep(1000);
+
                         logger.info("Deleting batch of {} from old shard", maxWorkSize);
                         deleteRowBatch.execute();
+
                     } catch ( Throwable t ) {
                         logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
                     }
+                } else {
+
+                    shardEnd = edge;
+                }
             }

-            if (shardEnd != null){
+            if (shardEnd != null && edgeCount > 0){
                 sourceShard.setShardEnd(
                     Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                 );

-                logger.info("Updating shard {} with shardEnd: {}", sourceShard, shardEnd );
+                logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
                 updateShardMetaBatch.mergeShallow(
                     edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
             }

@@ -247,9 +272,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 // on purpose block this thread before deleting the old edges to be sure there are no gaps
                 // duplicates are filtered on graph seeking so this is OK
                 Thread.sleep(1000);
+
+                logger.info("Deleting remaining edges from old shard");
                 deleteRowBatch.execute();

+                // now update with our shard end
                 updateShardMetaBatch.execute();
+
             } catch ( Throwable t ) {
                 logger.error( "Unable to move edges to target shard {}", targetShard );

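Before the remaining hunks of this file, a note on the ordering the two hunks above establish: the atomic write to the target shard happens first, shardEnd is advanced only once that write is known good, and the delete from the source shard is deliberately delayed, so readers during the transition may see duplicates (filtered at read time) but never gaps. A simplified sketch of that ordering, using stand-in types rather than the real Astyanax MutationBatch or Usergrid serialization classes:

    // Batch and ShardMover are hypothetical stand-ins for this sketch only.
    interface Batch { void execute() throws Exception; }

    final class ShardMover {

        // stand-in for sourceShard.setShardEnd(...) + edgeShardSerialization.writeShardMeta(...)
        private void advanceShardEnd( final long lastEdgeTimestampCopied ) {
            // persist the new bound so concurrent readers only ever see a fully written range
        }

        void moveBatch( final Batch newRowBatch, final Batch deleteRowBatch,
                        final long lastEdgeTimestampCopied ) throws Exception {
            newRowBatch.execute();                       // 1. atomic write to the target shard
            advanceShardEnd( lastEdgeTimestampCopied );  // 2. only after the write is known good
            Thread.sleep( 1000 );                        // 3. deliberate pause: duplicates are safe, gaps are not
            deleteRowBatch.execute();                    // 4. remove the moved edges from the source shard
        }
    }
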
@@ -438,6 +467,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
          * It's already compacting, don't do anything
          */
         if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+            logger.info("the group is already compacting");
             return AuditResult.COMPACTING;
         }

@@ -477,8 +507,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                      ShardEntryGroup group ) {
             final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
-
             final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
+            //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(), edgeMeta, group);
+            //logger.info("checking hash value of: {}, already started: {}", hash, returned );

             /**
              * Someone already put the value

@@ -509,12 +540,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         /**
          * Hash our data into a consistent long
          */
+        @Override
         protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                  final ShardEntryGroup shardEntryGroup ) {

             final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );

-            //add our compaction target to the hash
+            // add the compaction target to the hash
             final Shard compactionTarget = shardEntryGroup.getCompactionTarget();

             hasher.putLong( compactionTarget.getShardIndex() );

@@ -541,14 +573,16 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {

             addToHash( hasher, scope.getApplication() );

-            /**
-             * add our edge meta data
-             */
+
+            /** Commenting the full meta from the hash so we allocate/compact shards in a more controlled fashion
+
             for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
                 addToHash( hasher, nodeMeta.getId() );
                 hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
             }
+            **/
+

             /**
              * Add our edge type
@@ -557,8 +591,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 hasher.putString( type, CHARSET );
             }

-            //add our compaction target to the hash
-
             return hasher;
         }

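With the per-node meta commented out of the base hash, audits for the same application and edge type (plus, in the tracker subclass, the same compaction target) now collide in the running-task map, which is what serializes compaction per type. A sketch of the narrowed hash composition, assuming Guava's murmur3_128 and plain strings in place of Usergrid's Id/DirectedEdgeMeta types (both assumptions, for illustration only):

    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;
    import java.nio.charset.StandardCharsets;

    public class CompactionHashSketch {

        public static long taskHash( final String applicationId, final String edgeType,
                                     final long compactionTargetShardIndex ) {
            final Hasher hasher = Hashing.murmur3_128().newHasher();
            hasher.putString( applicationId, StandardCharsets.UTF_8 );  // application scope
            hasher.putString( edgeType, StandardCharsets.UTF_8 );       // edge type (node meta no longer hashed)
            hasher.putLong( compactionTargetShardIndex );               // tracker adds the compaction target
            return hasher.hash().asLong();
        }

        public static void main( final String[] args ) {
            // audits touching different source/target node shards of the same type now
            // produce the same hash, so putIfAbsent admits only one compaction task
            System.out.println( taskHash( "app-1", "likes", 3 ) == taskHash( "app-1", "likes", 3 ) ); // true
        }
    }
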
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 8fd7cea..9e6996d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -102,6 +102,8 @@ public class GraphManagerShardConsistencyIT {

     protected ListeningExecutorService deleteExecutor;

+    protected int TARGET_NUM_SHARDS = 6;
+

     @Before
@@ -172,15 +174,15 @@ public class GraphManagerShardConsistencyIT {
     public void writeThousandsSingleSource()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {

-        final Id sourceId = IdGenerator.createId( "sourceWrite" );
-        final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString();
+        final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() );
+        final String edgeType = "testWrite";

         final EdgeGenerator generator = new EdgeGenerator() {

             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) );

                 return edge;
@@ -196,7 +198,7 @@ public class GraphManagerShardConsistencyIT {
         };

-        final int numInjectors = 2;
+        final int numInjectors = 1;

         /**
          * create injectors. This way all the caches are independent of one another. This is the same as
@@ -218,10 +220,7 @@ public class GraphManagerShardConsistencyIT {

-        /**
-         * Do 4x shard size so we should have approximately 4 shards
-         */
-        final long numberOfEdges = shardSize * 4;
+        final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;

         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
@@ -233,7 +232,7 @@ public class GraphManagerShardConsistencyIT {

         //min stop time the min delta + 1 cache cycle timeout
-        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 60000;
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 120000;

         logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
@@ -279,7 +278,7 @@ public class GraphManagerShardConsistencyIT {
         final List<Throwable> failures = new ArrayList<>();
         Thread.sleep(3000); // let's make sure everything is written

-        for(int i = 0; i < 2; i ++) {
+        for(int i = 0; i < 1; i ++) {

             /**
@@ -452,11 +451,7 @@ public class GraphManagerShardConsistencyIT {

         final int numWorkersPerInjector = numProcessors / numInjectors;

-
-        /**
-         * Do 4x shard size so we should have approximately 4 shards
-         */
-        final long numberOfEdges = shardSize * 4;
+        final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;

         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
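
A quick back-of-envelope on the new test sizing: total edges and the per-worker write limit fall out of TARGET_NUM_SHARDS as below. The shardSize and worker count here are illustrative assumptions; the real values come from GraphFig and the host processor count.

    public class TestSizingSketch {
        public static void main( final String[] args ) {
            final long shardSize = 500;           // assumed GraphFig shard size
            final int targetNumShards = 6;        // TARGET_NUM_SHARDS in the test
            final int numInjectors = 1;           // reduced from 2 in this commit
            final int numWorkersPerInjector = 8;  // e.g. numProcessors / numInjectors

            // enough edges to force roughly TARGET_NUM_SHARDS shard allocations
            final long numberOfEdges = shardSize * targetNumShards;                             // 3000
            final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors; // 375

            System.out.println( numberOfEdges + " edges total, " + workerWriteLimit + " per worker" );
        }
    }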
