Repository: usergrid Updated Branches: refs/heads/USERGRID-909 5f7387cd1 -> 101e9f96d
Updated tests and logic Lowered audit chance to 1% Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/101e9f96 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/101e9f96 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/101e9f96 Branch: refs/heads/USERGRID-909 Commit: 101e9f96df09000222bb47a98b06bff27dd41bac Parents: 5f7387c Author: Todd Nine <[email protected]> Authored: Wed Aug 26 18:29:15 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Wed Aug 26 18:29:15 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/graph/GraphFig.java | 13 +- .../impl/shard/EdgeShardSerialization.java | 18 +- .../impl/shard/NodeShardAllocation.java | 4 +- .../impl/shard/ShardConsistency.java | 5 - .../shard/impl/EdgeShardSerializationImpl.java | 26 +- .../shard/impl/NodeShardAllocationImpl.java | 126 +++-- .../impl/shard/impl/NodeShardCacheImpl.java | 6 +- .../impl/shard/impl/ShardConsistencyImpl.java | 6 +- .../graph/GraphManagerShardConsistencyIT.java | 40 +- .../impl/shard/NodeShardAllocationTest.java | 5 +- .../impl/shard/NodeShardCacheTest.java | 4 +- .../shard/count/NodeShardApproximationTest.java | 471 ------------------- 12 files changed, 114 insertions(+), 610 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index d0df2eb..0517bf9 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -80,7 +80,11 @@ public interface GraphFig extends GuicyFig { int getRepairConcurrentSize(); - @Default( ".10" ) + /** + * A 1% repair chance. On average we'll check to repair on 1 out of every 100 reads + * @return + */ + @Default( ".01" ) @Key( SHARD_REPAIR_CHANCE ) double getShardRepairChance(); @@ -120,12 +124,5 @@ public interface GraphFig extends GuicyFig { @Default( "CL_LOCAL_QUORUM" ) @Key( SHARD_READ_CONSISTENCY ) String getShardReadConsistency(); - - /** - * Get the consistency level for performing a shard audit - */ - @Default( "CL_EACH_QUORUM" ) - @Key( SHARD_AUDIT_CONSISTENCY ) - String getShardAuditConsistency(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java index d8c561f..63b7594 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java @@ -40,7 +40,7 @@ public interface EdgeShardSerialization extends Migration{ * @param shard The shard to write * @param directedEdgeMeta The edge meta data to use */ - public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta ); + MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta ); /** * Get an iterator of all meta data and types. Returns a range from High to low. Only reads the local region @@ -49,20 +49,10 @@ public interface EdgeShardSerialization extends Migration{ * @param directedEdgeMeta The edge meta data to use * @return */ - public Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start, - DirectedEdgeMeta directedEdgeMeta ); - - - /** - * Get an iterator of all meta data and types. Returns a range from High to low. Reads quorum of all regions - * @param scope The organization scope - * @param start The shard time to start seeking from. Values <= this value will be returned. - * @param directedEdgeMeta The edge meta data to use - * @return - */ - Iterator<Shard> getShardMetaDataAudit( ApplicationScope scope, Optional<Shard> start, + Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start, DirectedEdgeMeta directedEdgeMeta ); + /** * Remove the shard from the edge meta data from the types. @@ -71,6 +61,6 @@ public interface EdgeShardSerialization extends Migration{ * @param directedEdgeMeta The edge meta data to use * @return */ - public MutationBatch removeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta ); + MutationBatch removeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java index 5039b35..e7be0b0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java @@ -42,8 +42,8 @@ public interface NodeShardAllocation { * @param directedEdgeMeta The directed edge metadata to use * @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated */ - Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, - final DirectedEdgeMeta directedEdgeMeta ); + Iterator<ShardEntryGroup> getShardsLocal( final ApplicationScope scope, Optional<Shard> maxShardId, + final DirectedEdgeMeta directedEdgeMeta ); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java index 5c52af6..6f992eb 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java @@ -38,9 +38,4 @@ public interface ShardConsistency { */ ConsistencyLevel getShardReadConsistency(); - /** - * Get the consistency level for performing a shard audit - * @return - */ - ConsistencyLevel getShardAuditConsistency(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java index 8ccf809..d6cd998 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java @@ -116,28 +116,6 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { public Iterator<Shard> getShardMetaDataLocal( final ApplicationScope scope, final Optional<Shard> start, final DirectedEdgeMeta metaData ) { - return getShardMetaDataInternal( scope, start, metaData, shardConsistency.getShardReadConsistency() ); - } - - - @Override - public Iterator<Shard> getShardMetaDataAudit( final ApplicationScope scope, final Optional<Shard> start, - final DirectedEdgeMeta directedEdgeMeta ) { - return getShardMetaDataInternal( scope, start, directedEdgeMeta, shardConsistency.getShardAuditConsistency() ); - } - - - /** - * Get the shard meta data, allowing the caller to specify the consistency level - * @param scope - * @param start - * @param metaData - * @param consistencyLevel - * @return - */ - private Iterator<Shard> getShardMetaDataInternal( final ApplicationScope scope, final Optional<Shard> start, - final DirectedEdgeMeta metaData, - final ConsistencyLevel consistencyLevel ) { ValidationUtils.validateApplicationScope( scope ); GraphValidation.validateDirectedEdgeMeta( metaData ); @@ -162,7 +140,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { final RowQuery<ScopedRowKey<DirectedEdgeMeta>, Long> query = - keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( consistencyLevel ).getKey( rowKey ) + keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( shardConsistency.getShardReadConsistency() ).getKey( rowKey ) .autoPaginate( true ).withColumnRange( rangeBuilder.build() ); @@ -170,6 +148,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { } + + @Override public MutationBatch removeShardMeta( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta metaData ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index 592d308..eced1a8 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -59,7 +59,7 @@ import com.netflix.astyanax.util.TimeUUIDUtils; public class NodeShardAllocationImpl implements NodeShardAllocation { - private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class ); + private static final Logger logger = LoggerFactory.getLogger( NodeShardAllocationImpl.class ); private static final Shard MIN_SHARD = new Shard( 0, 0, true ); @@ -91,8 +91,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { @Override - public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId, - final DirectedEdgeMeta directedEdgeMeta ) { + public Iterator<ShardEntryGroup> getShardsLocal( final ApplicationScope scope, final Optional<Shard> maxShardId, + final DirectedEdgeMeta directedEdgeMeta ) { ValidationUtils.validateApplicationScope( scope ); Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" ); @@ -128,20 +128,30 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { @Override - public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, + public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup lastLoadedShardEntryGroup, final DirectedEdgeMeta directedEdgeMeta ) { ValidationUtils.validateApplicationScope( scope ); - GraphValidation.validateShardEntryGroup( shardEntryGroup ); + GraphValidation.validateShardEntryGroup( lastLoadedShardEntryGroup ); GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta ); - Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" ); + Preconditions.checkNotNull( lastLoadedShardEntryGroup, "lastLoadedShardEntryGroup cannot be null" ); //we have to read our state from cassandra first to ensure we have an up to date view from other regions + //read our shard entry groups to ensure we have a current state + final Iterator<ShardEntryGroup> shardEntryGroupIterator = + getCurrentStateIterator( scope, lastLoadedShardEntryGroup, directedEdgeMeta ); + if ( !shardEntryGroupIterator.hasNext() ) { + logger.warn( "Could not read our shard entries. Our state is unknown, short circuiting" ); + return false; + } + + final ShardEntryGroup shardEntryGroup = shardEntryGroupIterator.next(); + /** * Nothing to do, it's been created very recently, we don't create a new one */ @@ -155,37 +165,37 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { } - /** - * Check the min shard in our system - */ - final Shard shard = shardEntryGroup.getMinShard(); + final Shard minShard = shardEntryGroup.getMinShard(); /** * Check out if we have a count for our shard allocation */ - final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta ); + final long count = nodeShardApproximation.getCount( scope, minShard, directedEdgeMeta ); final long shardSize = graphFig.getShardSize(); if ( count < shardSize ) { + logger.debug( "Our count is less than our shard size, not allocating a new shard" ); return false; } - if ( LOG.isDebugEnabled() ) { - LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize ); - } + + logger.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize ); /** - * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a shard rapidly, we split it near the head of the values. - * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node. + * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a + * shard rapidly, we split it near the head of the values. * - * This means that the lower shard can be re-split later if it is still too large. We do the division to truncate - * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the - * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point where we cannot - * ultimately compact to the correct shard size. + * Further checks to this group will result in more splits, similar to creating a tree type structure and + * splitting each node. + * + * This means that the lower shard can be re-split later if it is still too large. We do the division to + * truncate to a split point < what our current max is that would be approximately be our pivot ultimately + * if we split from the lower bound and moved forward. Doing this will stop the current shard from expanding + * and avoid a point where we cannot ultimately compact to the correct shard size due to excessive tombstones. */ @@ -199,7 +209,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { if ( !edges.hasNext() ) { - LOG.warn( "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row", + logger.warn( + "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row", directedEdgeMeta ); return false; } @@ -230,7 +241,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { * Sanity check in case our counters become severely out of sync with our edge state in cassandra. */ if ( marked == null ) { - LOG.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup ); + logger.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup ); return false; } @@ -238,7 +249,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false ); - LOG.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta ); + logger.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta ); final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta ); @@ -252,50 +263,69 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { return true; } + /** - * Return true if the node has been created within our timeout. If this is the case, we dont' need to check - * cassandra, we know it won't exist - */ - private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) { + * Return true if the node has been created within our timeout. If this is the case, we dont' need to check + * cassandra, we know it won't exist + */ + private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) { - //TODO: TN this is broken.... - //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct - final long timeNow = timeService.getCurrentTime(); + //TODO: TN this is broken.... + //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units + // correct + final long timeNow = timeService.getCurrentTime(); - boolean isNew = true; + boolean isNew = true; - for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) { + for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) { - //short circuit - if(!isNew || node.getId().getUuid().version() > 2){ - return false; - } + //short circuit + if ( !isNew || node.getId().getUuid().version() > 2 ) { + return false; + } - final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid()); + final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() ); - //take our uuid time and add 10 seconds, if the uuid is within 10 seconds of system time, we can consider it "new" - final long newExpirationTimeout = uuidTime + 10000 ; + //take our uuid time and add 10 seconds, if the uuid is within 10 seconds of system time, we can consider + // it "new" + final long newExpirationTimeout = uuidTime + 1000; - //our expiration is after our current time, treat it as new - isNew = isNew && newExpirationTimeout > timeNow; - } + //our expiration is after our current time, treat it as new + isNew = isNew && newExpirationTimeout > timeNow; + } - return isNew; - } + return isNew; + } - private ShardEntryGroupIterator getCurrentStateIterator(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, - final DirectedEdgeMeta directedEdgeMeta ){ + + /** + * Re-reads our shard groups using our to ensure we get a consistent view of all shards + */ + private Iterator<ShardEntryGroup> getCurrentStateIterator( final ApplicationScope scope, + final ShardEntryGroup shardEntryGroup, + final DirectedEdgeMeta directedEdgeMeta ) { final Shard start = shardEntryGroup.getMaxShard(); - final Iterator<Shard> shards = this.edgeShardSerialization.getShardMetaDataAudit( scope, Optional.fromNullable( start ), directedEdgeMeta ); + //sanity check + if ( start == null ) { + logger.warn( "Could not audit shard group {}. Returning.", shardEntryGroup ); + return Collections.<ShardEntryGroup>emptyList().iterator(); + } + + final Iterator<Shard> shards = this.edgeShardSerialization + .getShardMetaDataLocal( scope, Optional.fromNullable( start ), directedEdgeMeta ); + + if(!shards.hasNext()){ + return Collections.<ShardEntryGroup>emptyList().iterator(); + } return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta ); } - private final static class NoOpCompaction implements ShardGroupCompaction{ + private final static class NoOpCompaction implements ShardGroupCompaction { @Override public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope, http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java index 80375a8..ffd39bf 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java @@ -24,12 +24,8 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; -import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; @@ -160,7 +156,7 @@ public class NodeShardCacheImpl implements NodeShardCache { private CacheEntry getShards( final CacheKey key ) { final Iterator<ShardEntryGroup> edges = - nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ); + nodeShardAllocation.getShardsLocal( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ); final CacheEntry cacheEntry = new CacheEntry( edges ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java index c76fae6..a52ec0c 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java @@ -47,12 +47,8 @@ public class ShardConsistencyImpl implements ShardConsistency{ @Override public ConsistencyLevel getShardReadConsistency() { - return null; + return ConsistencyLevel.valueOf( graphFig.getShardReadConsistency() ); } - @Override - public ConsistencyLevel getShardAuditConsistency() { - return null; - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index fdb0952..b1ac52e 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -98,10 +98,6 @@ public class GraphManagerShardConsistencyIT { protected Object originalShardSize; - protected Object originalShardTimeout; - - protected Object originalShardDelta; - @Before public void setupOrg() { @@ -158,8 +154,8 @@ public class GraphManagerShardConsistencyIT { }; -// final int numInjectors = 2; - final int numInjectors = 1; + final int numInjectors = 2; +// final int numInjectors = 1; /** * create 3 injectors. This way all the caches are independent of one another. This is the same as @@ -173,11 +169,8 @@ public class GraphManagerShardConsistencyIT { final long shardSize = graphFig.getShardSize(); - //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing - // power for writes - final int numProcessors = Runtime.getRuntime().availableProcessors() / 2; - final int numWorkersPerInjector = numProcessors / numInjectors; + final int numWorkersPerInjector = 1; /** @@ -200,9 +193,6 @@ public class GraphManagerShardConsistencyIT { final AtomicLong writeCounter = new AtomicLong(); - //min stop time the min delta + 1 cache cycle timeout - final long minExecutionTime = 10000; - log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector, numInjectors ); @@ -212,13 +202,15 @@ public class GraphManagerShardConsistencyIT { + //create multiple instances of injectors. This simulates multiple nodes, so that we can ensure we're not + //sharing state in our guice DI for ( Injector injector : injectors ) { final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class ); for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = executor - .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + .submit( new Worker( gmf, generator, workerWriteLimit, writeCounter ) ); futures.add( future ); } @@ -328,7 +320,9 @@ public class GraphManagerShardConsistencyIT { //we're done if ( compactedCount >= expectedShardCount ) { - log.info( "All compactions complete, sleeping" ); + log.info( "All compactions complete. Compacted shards are {}. Expected at least expectedShardCount" ); + + assertEquals("Compacted should match expected", expectedShardCount, compactedCount); // final Object mutex = new Object(); // @@ -385,16 +379,13 @@ public class GraphManagerShardConsistencyIT { private final GraphManagerFactory factory; private final EdgeGenerator generator; private final long writeLimit; - private final long minExecutionTime; private final AtomicLong writeCounter; - private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, - final long minExecutionTime, final AtomicLong writeCounter ) { + private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, final AtomicLong writeCounter ) { this.factory = factory; this.generator = generator; this.writeLimit = writeLimit; - this.minExecutionTime = minExecutionTime; this.writeCounter = writeCounter; } @@ -404,10 +395,9 @@ public class GraphManagerShardConsistencyIT { GraphManager manager = factory.createEdgeManager( scope ); - final long startTime = System.currentTimeMillis(); - + long i; - for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) { + for ( i = 0; i < writeLimit ; i++ ) { Edge edge = generator.newEdge(); @@ -428,6 +418,8 @@ public class GraphManagerShardConsistencyIT { } } + log.info( "Completed writing {} edges on worker", i ); + return true; } @@ -528,12 +520,12 @@ public class GraphManagerShardConsistencyIT { /** * Create a new edge to persiste */ - public Edge newEdge(); + Edge newEdge(); /** * Perform the search returning an observable edge */ - public Observable<Edge> doSearch( final GraphManager manager ); + Observable<Edge> doSearch( final GraphManager manager ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index dae99b1..0fc2eb1 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -36,7 +36,6 @@ import org.apache.usergrid.persistence.core.util.IdGenerator; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByIdType; -import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl; import org.apache.usergrid.persistence.model.entity.Id; @@ -526,7 +525,7 @@ public class NodeShardAllocationTest { final Iterator<ShardEntryGroup> result = - approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta ); + approximation.getShardsLocal( scope, Optional.<Shard>absent(), directedEdgeMeta ); assertTrue( "Shards present", result.hasNext() ); @@ -616,7 +615,7 @@ public class NodeShardAllocationTest { final Iterator<ShardEntryGroup> result = - approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta ); + approximation.getShardsLocal( scope, Optional.<Shard>absent(), directedEdgeMeta ); ShardEntryGroup shardEntryGroup = result.next(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java index 57996ec..96feb26 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java @@ -102,7 +102,7 @@ public class NodeShardCacheTest { /** * Simulate returning no shards at all. */ - when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) ) + when( allocation.getShardsLocal( same( scope ), same( max ), same( directedEdgeMeta ) ) ) //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() { @@ -191,7 +191,7 @@ public class NodeShardCacheTest { /** * Simulate returning no shards at all. */ - when( allocation.getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) ) + when( allocation.getShardsLocal( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) ) //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/101e9f96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java index 71947f8..bcaf637 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java @@ -135,135 +135,6 @@ public class NodeShardApproximationTest { } - @Ignore("outdated and no longer relevant test") - @Test - public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException { - - - NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization(); - - final NodeShardApproximation approximation = - new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() ); - - - final int increments = 1000000; - final int workers = Runtime.getRuntime().availableProcessors() * 2; - - final Id id = IdGenerator.createId( "test" ); - final String type = "type"; - final String type2 = "subType"; - - final Shard shard = new Shard( 10000, 0, true ); - - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 ); - - ExecutorService executor = Executors.newFixedThreadPool( workers ); - - List<Future<Long>> futures = new ArrayList<>( workers ); - - for ( int i = 0; i < workers; i++ ) { - - final Future<Long> future = executor.submit( new Callable<Long>() { - @Override - public Long call() throws Exception { - - for ( int i = 0; i < increments; i++ ) { - approximation.increment( scope, shard, 1, directedEdgeMeta ); - } - - return 0l; - } - } ); - - futures.add( future ); - } - - - for ( Future<Long> future : futures ) { - future.get(); - } - - waitForFlush( approximation ); - //get our count. It should be accurate b/c we only have 1 instance - - final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta ); - final long expected = workers * increments; - - - assertEquals( expected, returnedCount ); - - //test we get nothing with the other type - - final long emptyCount = - approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ) ); - - - assertEquals( 0, emptyCount ); - } - - - @Ignore("outdated and no longer relevant test") - @Test - public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException { - - - NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization(); - - final NodeShardApproximation approximation = - new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() ); - - - final int increments = 1000000; - final int workers = Runtime.getRuntime().availableProcessors() * 2; - - final Id id = IdGenerator.createId( "test" ); - final String type = "type"; - final String type2 = "subType"; - - final AtomicLong shardIdCounter = new AtomicLong(); - - - final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 ); - - - ExecutorService executor = Executors.newFixedThreadPool( workers ); - - List<Future<Shard>> futures = new ArrayList<>( workers ); - - for ( int i = 0; i < workers; i++ ) { - - final Future<Shard> future = executor.submit( new Callable<Shard>() { - @Override - public Shard call() throws Exception { - - final long threadShardId = shardIdCounter.incrementAndGet(); - - final Shard shard = new Shard( threadShardId, 0, true ); - - for ( int i = 0; i < increments; i++ ) { - approximation.increment( scope, shard, 1, directedEdgeMeta ); - } - - return shard; - } - } ); - - futures.add( future ); - } - - - for ( Future<Shard> future : futures ) { - final Shard shardId = future.get(); - - waitForFlush( approximation ); - - final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta ); - - assertEquals( increments, returnedCount ); - } - } - - private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException { approximation.beginFlush(); @@ -277,346 +148,4 @@ public class NodeShardApproximationTest { } - /** - * These are created b/c we can't use Mockito. It OOM's with keeping track of all the mock invocations - */ - - private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization { - - private Counter copy = new Counter(); - - - @Override - public MutationBatch flush( final Counter counter ) { - copy.merge( counter ); - return new TestMutationBatch(); - } - - - @Override - public long getCount( final ShardKey key ) { - return copy.get( key ); - } - - - @Override - public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - /** - * Simple test mutation to no-op during tests - */ - private - static class TestMutationBatch implements MutationBatch { - - @Override - public <K, C> ColumnListMutation<C> withRow( final ColumnFamily<K, C> columnFamily, final K rowKey ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public <K> void deleteRow( final Iterable<? extends ColumnFamily<K, ?>> columnFamilies, final K rowKey ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void discardMutations() { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void mergeShallow( final MutationBatch other ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean isEmpty() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public int getRowCount() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Map<ByteBuffer, Set<String>> getRowKeys() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch pinToHost( final Host host ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setConsistencyLevel( final ConsistencyLevel consistencyLevel ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withConsistencyLevel( final ConsistencyLevel consistencyLevel ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withRetryPolicy( final RetryPolicy retry ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch usingWriteAheadLog( final WriteAheadLog manager ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch lockCurrentTimestamp() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setTimeout( final long timeout ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch setTimestamp( final long timestamp ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withTimestamp( final long timestamp ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public MutationBatch withAtomicBatch( final boolean condition ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public ByteBuffer serialize() throws Exception { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void deserialize( final ByteBuffer data ) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OperationResult<Void> execute() throws ConnectionException { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public ListenableFuture<OperationResult<Void>> executeAsync() throws ConnectionException { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - private static class TestGraphFig implements GraphFig { - - @Override - public int getScanPageSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public int getRepairConcurrentSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public double getShardRepairChance() { - return 0; - } - - - @Override - public long getShardSize() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - - - - @Override - public int getShardAuditWorkerCount() { - return 0; - } - - - @Override - public int getShardAuditWorkerQueueSize() { - return 0; - } - - - @Override - public long getCounterFlushCount() { - return 100000l; - } - - - @Override - public long getCounterFlushInterval() { - return 30000l; - } - - - @Override - public int getCounterFlushQueueSize() { - return 10000; - } - - - @Override - public String getShardWriteConsistency() { - return null; - } - - - @Override - public String getShardReadConsistency() { - return null; - } - - - @Override - public String getShardAuditConsistency() { - return null; - } - - - @Override - public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void removePropertyChangeListener( final PropertyChangeListener propertyChangeListener ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OptionState[] getOptions() { - return new OptionState[0]; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public OptionState getOption( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public String getKeyByMethod( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Object getValueByMethod( final String s ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Properties filterOptions( final Properties properties ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Map<String, Object> filterOptions( final Map<String, Object> stringObjectMap ) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void override( final String s, final String s2 ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean setOverrides( final Overrides overrides ) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Overrides getOverrides() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public void bypass( final String s, final String s2 ) { - //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean setBypass( final Bypass bypass ) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Bypass getBypass() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public Class getFigInterface() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - - @Override - public boolean isSingleton() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - } - - - private static class TestTimeService implements TimeService { - - @Override - public long getCurrentTime() { - return System.currentTimeMillis(); - } - } }
