Repository: usergrid Updated Branches: refs/heads/USERGRID-909 101e9f96d -> a3cf062ce
Updated tests Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a3cf062c Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a3cf062c Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a3cf062c Branch: refs/heads/USERGRID-909 Commit: a3cf062ce75c9480d17c45e5437c845d7882dd7e Parents: 101e9f9 Author: Todd Nine <[email protected]> Authored: Thu Aug 27 16:04:51 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu Aug 27 16:04:51 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/graph/GraphFig.java | 5 +++-- .../shard/impl/NodeShardAllocationImpl.java | 23 +++++++++++++++++++- .../shard/impl/ShardGroupCompactionImpl.java | 2 +- .../graph/GraphManagerShardConsistencyIT.java | 10 ++++++--- .../impl/shard/NodeShardAllocationTest.java | 3 +++ .../graph/src/test/resources/log4j.properties | 4 +++- 6 files changed, 39 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 0517bf9..73587de 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -92,11 +92,12 @@ public interface GraphFig extends GuicyFig { @Key( SHARD_SIZE ) long getShardSize(); - @Default( "10" ) + @Default( "1" ) +// @Default( "10" ) @Key( SHARD_AUDIT_WORKERS ) int getShardAuditWorkerCount(); - @Default( "1000" ) + @Default( "1" ) @Key( SHARD_AUDIT_QUEUE_SIZE ) int getShardAuditWorkerQueueSize(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index eced1a8..60e0376 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -260,6 +260,25 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { throw new RuntimeException( "Unable to connect to casandra", e ); } + //now re-load again, if our shard we allocated is not the compaction target, then we delete it + final Iterator<ShardEntryGroup> currentStateIterator = + getCurrentStateIterator( scope, lastLoadedShardEntryGroup, directedEdgeMeta ); + + + if ( !currentStateIterator.hasNext() ) { + logger.warn( "Could not read our shard entries. Our state is unknown, short circuiting" ); + return false; + } + + final ShardEntryGroup currentStateGroup = currentStateIterator.next(); + + //we're not the target of compaction, remove ourselves + if ( !newShard.equals( currentStateGroup.getCompactionTarget() ) ) { + logger.debug("Our proposed shard is not the comaction target. Removing shard {}", newShard); + this.edgeShardSerialization.removeShardMeta( scope, newShard, directedEdgeMeta ); + } + + return true; } @@ -314,10 +333,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { return Collections.<ShardEntryGroup>emptyList().iterator(); } + logger.debug( "Loading current shard state for shards starting at {}", start ); + final Iterator<Shard> shards = this.edgeShardSerialization .getShardMetaDataLocal( scope, Optional.fromNullable( start ), directedEdgeMeta ); - if(!shards.hasNext()){ + if ( !shards.hasNext() ) { return Collections.<ShardEntryGroup>emptyList().iterator(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index c1a70e2..f33181f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -322,7 +322,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { catch ( RejectedExecutionException ree ) { //ignore, if this happens we don't care, we're saturated, we can check later - LOG.error( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); + LOG.info( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group ); return Futures.immediateFuture( AuditResult.NOT_CHECKED ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index b1ac52e..5b240f9 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -173,17 +173,21 @@ public class GraphManagerShardConsistencyIT { final int numWorkersPerInjector = 1; + final long expectedShardCount = 4; + /** - * Do 4x shard size so we should have approximately 4 shards + * Do 4x expected shard size so we have 4 shards */ - final long numberOfEdges = shardSize * 4; + final long numberOfEdges = shardSize * expectedShardCount; + + final long workerWriteLimit = numberOfEdges / numWorkersPerInjector; - final long expectedShardCount = numberOfEdges/shardSize; + final ListeningExecutorService http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index 0fc2eb1..dc36f69 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -440,6 +440,9 @@ public class NodeShardAllocationTest { .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ), any( Collection.class ) ) ).thenReturn( edgeIterator ); + //mock up empty shard read return + when(edgeShardSerialization.getShardMetaDataLocal( same(scope), any(Optional.class), any(DirectedEdgeMeta.class) )).thenReturn( Collections.singleton( + futureShard).iterator() ); final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/a3cf062c/stack/corepersistence/graph/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties index 608ee03..d0dede1 100644 --- a/stack/corepersistence/graph/src/test/resources/log4j.properties +++ b/stack/corepersistence/graph/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ # # suppress inspection "UnusedProperty" for whole file -log4j.rootLogger=INFO,stdout +log4j.rootLogger=WARN,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout @@ -38,3 +38,5 @@ log4j.logger.cassandra.db=ERROR #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE +log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE +
