Fix issue where new shards were not picked up after new shards are allocated.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/97719684 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/97719684 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/97719684 Branch: refs/heads/release-2.1.1 Commit: 9771968408658e8ce4c14db305bc10fc173d65d8 Parents: 85cd12d Author: Michael Russo <[email protected]> Authored: Tue Mar 22 18:16:44 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Tue Mar 22 18:16:44 2016 -0700 ---------------------------------------------------------------------- .../astyanax/MultiRowShardColumnIterator.java | 2 +- .../impl/shard/NodeShardCache.java | 7 +++- .../shard/impl/EdgeShardSerializationImpl.java | 1 + .../shard/impl/NodeShardAllocationImpl.java | 23 +++++----- .../impl/shard/impl/NodeShardCacheImpl.java | 11 +++++ .../shard/impl/ShardGroupCompactionImpl.java | 23 +++++----- .../graph/GraphManagerShardConsistencyIT.java | 28 +++++++++---- .../impl/shard/NodeShardAllocationTest.java | 44 ++++++++++++++++---- .../impl/shard/ShardGroupCompactionTest.java | 4 +- .../graph/src/test/resources/log4j.properties | 4 +- 10 files changed, 103 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java index b13d0f5..86e3b4d 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java @@ -43,7 +43,7 @@ import com.netflix.astyanax.util.RangeBuilder; */ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { - private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class ); + private static final Logger logger = LoggerFactory.getLogger( MultiRowShardColumnIterator.class ); private final int pageSize; http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java index 173b89d..23c2c25 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java @@ -38,7 +38,7 @@ public interface NodeShardCache { * @param timestamp The time to select the slice for. * @param directedEdgeMeta The directed edge meta data */ - public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, + ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp, final DirectedEdgeMeta directedEdgeMeta ); /** @@ -49,6 +49,9 @@ public interface NodeShardCache { * @param directedEdgeMeta The directed edge meta data * @return */ - public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta ); + Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta ); + + + void invalidate(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java index 5eeeae0..76a0922 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java @@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache; import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer; http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index 6b190a1..a6cf378 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.Collections; import java.util.Iterator; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,14 +34,6 @@ import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; import com.google.common.base.Optional; @@ -65,19 +58,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { private final TimeService timeService; private final GraphFig graphFig; private final ShardGroupCompaction shardGroupCompaction; + private final NodeShardCache nodeShardCache; @Inject public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization, final EdgeColumnFamilies edgeColumnFamilies, final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService, - final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) { + final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction, + final NodeShardCache nodeShardCache) { this.edgeShardSerialization = edgeShardSerialization; this.edgeColumnFamilies = edgeColumnFamilies; this.shardedEdgeSerialization = shardedEdgeSerialization; this.timeService = timeService; this.graphFig = graphFig; this.shardGroupCompaction = shardGroupCompaction; + this.nodeShardCache = nodeShardCache; } @@ -101,7 +97,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { //logger.info("existing shards has something: {}", existingShards.hasNext()); /** - * We didn't get anything out of cassandra, so we need to create the minumum shard + * We didn't get anything out of cassandra, so we need to create the minimum shard */ if ( existingShards == null || !existingShards.hasNext() ) { @@ -250,6 +246,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { try { batch.execute(); + + if(logger.isTraceEnabled()) { + logger.trace("Clearing shard cache"); + } + + // invalidate the shard cache so we can be sure that all read shards are up to date + nodeShardCache.invalidate(); } catch ( ConnectionException e ) { throw new RuntimeException( "Unable to connect to casandra", e ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java index 1a88ebb..5eaaaa0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import com.google.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +69,7 @@ import com.google.inject.Inject; * Simple implementation of the shard. Uses a local Guava shard with a timeout. If a value is not present in the * shard, it will need to be searched via cassandra. */ +@Singleton public class NodeShardCacheImpl implements NodeShardCache { private static final Logger logger = LoggerFactory.getLogger( NodeShardCacheImpl.class ); @@ -171,6 +173,9 @@ public class NodeShardCacheImpl implements NodeShardCache { throw new GraphRuntimeException( "Unable to load shard key for graph", e ); } + // do this if wanting to bypass the cache for getting the read shards + //entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta )); + Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp ); if ( iterator == null ) { @@ -180,6 +185,12 @@ public class NodeShardCacheImpl implements NodeShardCache { return iterator; } + @Override + public void invalidate(){ + + graphs.invalidateAll(); + + } /** * This is a race condition. We could re-init the shard while another thread is reading it. This is fine, the read http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index b88c52c..7854c3b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -27,15 +27,11 @@ import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import com.google.common.base.Optional; -import com.netflix.astyanax.connectionpool.OperationResult; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; -import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +58,6 @@ import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import rx.Observable; -import rx.schedulers.Schedulers; /** @@ -94,6 +88,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { private final Random random; private final ShardCompactionTaskTracker shardCompactionTaskTracker; private final ShardAuditTaskTracker shardAuditTaskTracker; + private final NodeShardCache nodeShardCache; @Inject @@ -102,7 +97,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final ShardedEdgeSerialization shardedEdgeSerialization, final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace, final EdgeShardSerialization edgeShardSerialization, - final AsyncTaskExecutor asyncTaskExecutor) { + final AsyncTaskExecutor asyncTaskExecutor, + final NodeShardCache nodeShardCache ) { this.timeService = timeService; this.countAudits = new AtomicLong(); @@ -119,6 +115,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { this.taskExecutor = asyncTaskExecutor.getExecutorService(); + this.nodeShardCache = nodeShardCache; } @@ -319,7 +316,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { if ( totalEdgeCount == 0 ) { - //now that we've marked our target as compacted, we can successfully remove any shards that are not + // now that we've marked our target as compacted, we can successfully remove any shards that are not // compacted themselves in the sources final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch(); @@ -342,6 +339,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { try { shardRemovalRollup.execute(); + + // invalidate the shard cache so we can be sure that all read shards are up to date + nodeShardCache.invalidate(); } catch ( ConnectionException e ) { throw new RuntimeException( "Unable to connect to cassandra", e ); @@ -357,6 +357,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta ); try { updateMark.execute(); + + // invalidate the shard cache so we can be sure that all read shards are up to date + nodeShardCache.invalidate(); } catch ( ConnectionException e ) { throw new RuntimeException( "Unable to connect to cassandra", e ); @@ -598,15 +601,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { addToHash( hasher, scope.getApplication() ); - /** Commenting the full meta from the hash so we allocate/compact shards in a more controlled fashion - for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) { addToHash( hasher, nodeMeta.getId() ); hasher.putInt( nodeMeta.getNodeType().getStorageValue() ); } - **/ - /** * Add our edge type http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 1ce23b9..652c8d6 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -105,6 +105,8 @@ public class GraphManagerShardConsistencyIT { protected int TARGET_NUM_SHARDS = 5; + protected int POST_WRITE_SLEEP = 2000; + @Before @@ -175,7 +177,7 @@ public class GraphManagerShardConsistencyIT { public void writeThousandsSingleSource() throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException { - final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() ); + final Id sourceId = IdGenerator.createId( "sourceWrite" ); final String edgeType = "testWrite"; final EdgeGenerator generator = new EdgeGenerator() { @@ -183,7 +185,7 @@ public class GraphManagerShardConsistencyIT { @Override public Edge newEdge() { - Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) ); + Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) ); return edge; @@ -199,7 +201,7 @@ public class GraphManagerShardConsistencyIT { }; - final int numInjectors = 1; + final int numInjectors = 2; /** * create injectors. This way all the caches are independent of one another. This is the same as @@ -277,9 +279,11 @@ public class GraphManagerShardConsistencyIT { final List<Throwable> failures = new ArrayList<>(); - Thread.sleep(3000); // let's make sure everything is written - for(int i = 0; i < 2; i ++) { + logger.info("Sleeping {}ms before reading to ensure all compactions have completed", POST_WRITE_SLEEP); + Thread.sleep(POST_WRITE_SLEEP); // let's make sure everything is written + + for(int i = 0; i < 1; i ++) { /** @@ -303,6 +307,16 @@ public class GraphManagerShardConsistencyIT { public void onFailure( final Throwable t ) { failures.add( t ); logger.error( "Failed test!", t ); + + final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + + while ( groups.hasNext() ) { + + logger.info( "Shard entry group: {}", groups.next() ); + + } + + } } ); } @@ -409,7 +423,7 @@ public class GraphManagerShardConsistencyIT { throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException { final Id sourceId = IdGenerator.createId( "sourceDelete" ); - final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString(); + final String deleteEdgeType = "testDelete"; final EdgeGenerator generator = new EdgeGenerator() { @@ -432,7 +446,7 @@ public class GraphManagerShardConsistencyIT { }; - final int numInjectors = 2; + final int numInjectors = 3; /** * create injectors. This way all the caches are independent of one another. This is the same as http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index 6671dec..00406c0 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -101,10 +101,12 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final long timeservicetime = System.currentTimeMillis(); @@ -131,10 +133,13 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -172,9 +177,12 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -216,9 +224,12 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -326,9 +337,12 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -412,9 +426,12 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -475,10 +492,13 @@ public class NodeShardAllocationTest { final TimeService timeService = mock( TimeService.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -613,9 +633,12 @@ public class NodeShardAllocationTest { final MutationBatch batch = mock( MutationBatch.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); final Id nodeId = IdGenerator.createId( "test" ); final String type = "type"; @@ -704,9 +727,12 @@ public class NodeShardAllocationTest { when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class); + + NodeShardAllocation approximation = new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization, - timeService, graphFig, shardGroupCompaction ); + timeService, graphFig, shardGroupCompaction, nodeShardCache ); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java index 65f19ff..666e30a 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java @@ -99,6 +99,8 @@ public class ShardGroupCompactionTest { final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class ); + final NodeShardCache nodeShardCache = mock( NodeShardCache.class ); + final long delta = 10000; @@ -116,7 +118,7 @@ public class ShardGroupCompactionTest { ShardGroupCompactionImpl compaction = new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization, - edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor ); + edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor, nodeShardCache ); DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties index e7f7524..5c6b045 100644 --- a/stack/corepersistence/graph/src/test/resources/log4j.properties +++ b/stack/corepersistence/graph/src/test/resources/log4j.properties @@ -37,10 +37,10 @@ log4j.logger.cassandra.db=ERROR #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE -#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE +#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE -#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE +#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardEntryGroupIterator=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE
