Make the shard consistency tests a little smarter. Update shard compaction to be a safer background process: edge writes into the target shard are now atomic, and the deletes from the source shard are delayed (this ensures data is always available for seeking during the transition, and any duplicates are filtered out on read).
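In outline, compaction now follows a write-then-delayed-delete sequence. The sketch below distills that pattern from the diff that follows; the class and method names are illustrative stand-ins, not the committed code (which builds the two batches per shard inside ShardGroupCompactionImpl and wraps the delete in Observable.from over a single-element list):

    import java.util.concurrent.TimeUnit;

    import com.netflix.astyanax.MutationBatch;
    import com.netflix.astyanax.connectionpool.OperationResult;
    import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    class CompactionWritePattern {

        // illustrative sketch only, not the committed code
        static OperationResult<Void> copyThenDelete( final MutationBatch newRowBatch,
                                                     final MutationBatch deleteRowBatch )
            throws ConnectionException {

            // 1. the copied edges land in the new shard all-or-nothing, so a partial
            //    failure can never leave the target shard half-written
            newRowBatch.withAtomicBatch( true ).execute();

            // 2. delay the delete so concurrent readers can still seek the old rows
            //    during the transition, but block on it so the shard is never marked
            //    'compacted' while the old rows are still pending removal
            return Observable.just( deleteRowBatch )
                .delay( 1000, TimeUnit.MILLISECONDS )
                .map( batch -> {
                    try {
                        return batch.execute();
                    }
                    catch ( ConnectionException e ) {
                        throw new RuntimeException( "Unable to remove edges from old shards", e );
                    }
                } )
                .subscribeOn( Schedulers.io() )
                .toBlocking().last();
        }
    }

Blocking on the delayed delete, rather than firing it and forgetting it, is what makes it safe to run the shard-removal and audit mutations afterwards.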
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b112488d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b112488d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b112488d

Branch: refs/heads/release-2.1.1
Commit: b112488db1ef01ee8417e35e48e428c40d0aa206
Parents: 4bbebc5
Author: Michael Russo <[email protected]>
Authored: Wed Mar 16 17:25:22 2016 -0700
Committer: Michael Russo <[email protected]>
Committed: Wed Mar 16 17:25:22 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java | 8 +-
 .../shard/impl/ShardGroupCompactionImpl.java | 79 +++++++++++---
 .../graph/GraphManagerShardConsistencyIT.java | 108 ++++++++++++-------
 .../graph/src/test/resources/log4j.properties | 9 +-
 4 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 10786f7..6c91aca 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -169,9 +169,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }

-        // reset the start column as we'll be seeking a new row, any duplicates will be filtered out
-        startColumn = null;
-
         advance();
     }

@@ -312,8 +309,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }

-        // if a whole page is skipped, this is likely during a shard transition and we should assume there is more to read
-        if( skipSize == selectSize || skipSize == selectSize - 1){
+        // if a whole page is skipped OR the result size equals the select size minus what was skipped,
+        // it is likely during a shard transition and we should assume there is more to read
+        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
             moreToReturn = true;
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 80b63ec..f644380 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -23,20 +23,17 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.nio.charset.Charset;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

 import javax.annotation.Nullable;

 import com.google.common.base.Optional;
+import com.netflix.astyanax.connectionpool.OperationResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +69,8 @@
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import rx.Observable;
+import rx.schedulers.Schedulers;


 /**
@@ -199,7 +198,6 @@
                     break;
                 }

-
                 newRowBatch.mergeShallow( edgeMeta
                     .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge, timestamp ) );

@@ -216,8 +214,35 @@
                 if ( edgeCount % maxWorkSize == 0 ) {

                     try {
-                        newRowBatch.execute();
-                        deleteRowBatch.execute();
+
+                        // write the edges into the new shard atomically so we know they all succeed
+                        newRowBatch.withAtomicBatch(true).execute();
+
+                        List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+                        {{
+                            add(deleteRowBatch);
+                        }};
+
+                        // fire the mutation in the background after 1 second delay
+                        if(logger.isTraceEnabled()){
+                            logger.trace("scheduling shard compaction delete");
+                        }
+
+                        // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+                        Observable.from(deleteMutations)
+                            .delay(1000, TimeUnit.MILLISECONDS)
+                            .map(deleteRowBatchSingle -> {
+                                try {
+                                    return deleteRowBatchSingle.execute();
+                                } catch (ConnectionException e) {
+                                    logger.error("Unable to remove edges from old shards", e);
+                                    throw new RuntimeException("Unable to remove edges from old shards", e);
+                                }
+                            })
+                            .subscribeOn(Schedulers.io())
+                            .toBlocking().last();
+
                     } catch ( Throwable t ) {
                         logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );

@@ -236,9 +261,35 @@

             try {
-                newRowBatch.execute();
-                deleteRowBatch.execute();
-                updateShardMetaBatch.execute();
+
+                // write the edges into the new shard atomically so we know they all succeed
+                newRowBatch.withAtomicBatch(true).execute();
+
+                List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+                {{
+                    add(deleteRowBatch);
+                }};
+
+
+                if(logger.isTraceEnabled()) {
+                    logger.trace("scheduling shard compaction delete");
+                }
+
+                // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+                Observable.from(deleteMutations)
+                    .delay(1000, TimeUnit.MILLISECONDS)
+                    .map(deleteRowBatchSingle -> {
+                        try {
+                            return deleteRowBatchSingle.execute();
+                        } catch (ConnectionException e) {
+                            logger.error("Unable to remove edges from old shards", e);
+                            throw new RuntimeException("Unable to remove edges from old shards", e);
+                        }
+                    })
+                    .subscribeOn(Schedulers.io())
+                    .toBlocking().last();
+
+                //updateShardMetaBatch.execute();
             } catch ( Throwable t ) {
                 logger.error( "Unable to move edges to target shard {}", targetShard );

@@ -286,7 +337,7 @@ public
class ShardGroupCompactionImpl implements ShardGroupCompaction { shardRemovalRollup.execute(); } catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); + throw new RuntimeException( "Unable to connect to cassandra", e ); } @@ -300,7 +351,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { updateMark.execute(); } catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to casandra", e ); + throw new RuntimeException( "Unable to connect to cassandra", e ); } resultBuilder.withCompactedShard( compactedShard ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 82b0879..439553c 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.usergrid.StressTest; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -78,7 +79,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; - public class GraphManagerShardConsistencyIT { private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class ); @@ -98,7 +98,10 @@ public class GraphManagerShardConsistencyIT { protected Object originalShardDelta; - protected ListeningExecutorService executor; + protected ListeningExecutorService writeExecutor; + + protected ListeningExecutorService deleteExecutor; + @Before @@ -112,7 +115,7 @@ public class GraphManagerShardConsistencyIT { originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA ); - ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 ); + ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 5000 ); final long cacheTimeout = 2000; @@ -145,28 +148,39 @@ public class GraphManagerShardConsistencyIT { reporter.stop(); reporter.report(); - executor.shutdownNow(); + if(writeExecutor != null){ + writeExecutor.shutdownNow(); + + } + if(deleteExecutor != null){ + deleteExecutor.shutdownNow(); + + } + } - private void createExecutor( final int size ) { - executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) ); + private void createWriteExecutor( final int size ) { + writeExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) ); } + private void createDeleteExecutor( final int size ) { + deleteExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) ); + } @Test public void writeThousandsSingleSource() throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException { - final Id sourceId = IdGenerator.createId( "source" ); - final String edgeType = "test"; + final Id sourceId 
= IdGenerator.createId( "sourceWrite" ); + final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString(); final EdgeGenerator generator = new EdgeGenerator() { @Override public Edge newEdge() { - Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) ); + Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) ); return edge; @@ -182,12 +196,12 @@ public class GraphManagerShardConsistencyIT { }; - // final int numInjectors = 2; + //final int numInjectors = 2; final int numInjectors = 1; /** - * create 3 injectors. This way all the caches are independent of one another. This is the same as - * multiple nodes + * create injectors. This way all the caches are independent of one another. This is the same as + * multiple nodes if there are multiple injectors */ final List<Injector> injectors = createInjectors( numInjectors ); @@ -214,7 +228,7 @@ public class GraphManagerShardConsistencyIT { final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors; - createExecutor( numWorkersPerInjector ); + createWriteExecutor( numWorkersPerInjector ); final AtomicLong writeCounter = new AtomicLong(); @@ -236,7 +250,7 @@ public class GraphManagerShardConsistencyIT { for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = - executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); futures.add( future ); } @@ -260,20 +274,20 @@ public class GraphManagerShardConsistencyIT { final long writeCount = writeCounter.get(); final long expectedShardCount = writeCount / shardSize; - final Meter readMeter = registry.meter( "readThroughput" ); + final Meter readMeter = registry.meter( "readThroughput-writeTest" ); final List<Throwable> failures = new ArrayList<>(); - //Thread.sleep(5000); + Thread.sleep(3000); // let's make sure everything is written - for(int i = 0; i < 2; i ++) { + for(int i = 0; i < 1; i ++) { /** * Start reading continuously while we migrate data to ensure our view is always correct */ final ListenableFuture<Long> future = - executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) ); + writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) ); //add the future @@ -282,7 +296,7 @@ public class GraphManagerShardConsistencyIT { @Override public void onSuccess( @Nullable final Long result ) { logger.info( "Successfully ran the read, re-running" ); - executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) ); + writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) ); } @@ -360,7 +374,7 @@ public class GraphManagerShardConsistencyIT { Thread.sleep(30000); - executor.shutdownNow(); + writeExecutor.shutdownNow(); } @@ -390,20 +404,20 @@ public class GraphManagerShardConsistencyIT { } - @Test(timeout=120000) + @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async @Category(StressTest.class) public void writeThousandsDelete() throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException { - final Id sourceId = IdGenerator.createId( "source" ); - final String edgeType = "test"; + final Id sourceId = IdGenerator.createId( "sourceDelete" ); + final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString(); final EdgeGenerator generator = new EdgeGenerator() { @Override public Edge newEdge() { 
- Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) ); + Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDelete" ) ); return edge; @@ -413,18 +427,17 @@ public class GraphManagerShardConsistencyIT { @Override public Observable<MarkedEdge> doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( - new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false ) ); } }; - // final int numInjectors = 2; final int numInjectors = 1; /** - * create 3 injectors. This way all the caches are independent of one another. This is the same as - * multiple nodes + * create injectors. This way all the caches are independent of one another. This is the same as + * multiple nodes if there are multiple injectors */ final List<Injector> injectors = createInjectors( numInjectors ); @@ -449,7 +462,7 @@ public class GraphManagerShardConsistencyIT { final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors; - createExecutor( numWorkersPerInjector ); + createDeleteExecutor( numWorkersPerInjector ); final AtomicLong writeCounter = new AtomicLong(); @@ -472,7 +485,7 @@ public class GraphManagerShardConsistencyIT { for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = - executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); futures.add( future ); } @@ -488,14 +501,14 @@ public class GraphManagerShardConsistencyIT { //now get all our shards final NodeShardCache cache = getInstance( injectors, NodeShardCache.class ); - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType ); + final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType ); //now submit the readers. 
 final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );

         final long writeCount = writeCounter.get();
-        final Meter readMeter = registry.meter( "readThroughput" );
+        final Meter readMeter = registry.meter( "readThroughput-deleteTest" );

         //check our shard state

@@ -526,11 +539,28 @@
         long count = Long.MAX_VALUE;

+        Thread.sleep(3000); // let's make sure everything is written
+
+        long totalDeleted = 0;
+
         while(count != 0) {
-            //take 10000 then sleep
-            count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+
+            logger.info("total deleted: {}", totalDeleted);
+            if(count != Long.MAX_VALUE) { // count starts with Long.MAX
+                logger.info("deleted {} entities, continuing until count is 0", count);
+            }
+            //take 1000 then sleep
+            count = generator.doSearch( manager ).take( 1000 )
+                .filter(markedEdge -> {
+
+                    // if it's already been marked, filter it out and move on, as deleteEdge() is async
+                    logger.trace("edge already marked, may indicate a problem with gm.deleteEdge(): {}", markedEdge);
+                    return !markedEdge.isDeleted();
+                })
+                .flatMap( edge -> manager.markEdge( edge ))
                 .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+            totalDeleted += count;

             Thread.sleep( 500 );
         }

@@ -541,7 +571,7 @@
         /**
          * Start reading continuously while we migrate data to ensure our view is always correct
          */
-        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+        final ListenableFuture<Long> future = deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );

         final List<Throwable> failures = new ArrayList<>();

@@ -552,7 +582,7 @@
             @Override
             public void onSuccess( @Nullable final Long result ) {
                 logger.info( "Successfully ran the read, re-running" );
-                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                deleteExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
             }

@@ -606,8 +636,8 @@
             }

-            //we're done, 1 shard remains, we have a group, and it's our default shard
-            if ( shardCount == 1 && group != null && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {
+            // we're done, 1 shard remains, we have a group, and it's our default shard
+            if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {

                 logger.info( "All compactions complete," );

                 break;
@@ -619,7 +649,7 @@

         //now that we have finished expanding s

-        executor.shutdownNow();
+        deleteExecutor.shutdownNow();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 79401c3..5afc288 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,8 +37,9 @@ log4j.logger.cassandra.db=ERROR
 #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
 #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
-#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO -#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO -#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO -#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO +#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
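For reference, the new paging condition in MultiRowColumnIterator above reads as the following standalone predicate. This is a sketch with descriptive stand-in names for the iterator's fields, not the committed code (which simply sets moreToReturn = true inline):

    // selectSize - number of columns requested for the page
    // skipSize   - duplicate columns dropped while merging the page
    // size       - columns that survived de-duplication
    static boolean likelyMidShardTransition( final int size, final int selectSize, final int skipSize ) {
        // a fully (or all-but-one) skipped page, or a result that exactly accounts
        // for every non-skipped column, suggests a shard transition is in progress
        // and more columns remain to be read
        return skipSize == selectSize
            || skipSize == selectSize - 1
            || size == selectSize - skipSize;
    }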
