Change shard deletion from physically removing the shard to marking it as deleted and filtering marked shards out on read. This lets Usergrid keep proper shard end times and 're-activate' a shard by flipping the deleted flag when past data is re-written to a collection.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/42c5c4b8 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/42c5c4b8 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/42c5c4b8 Branch: refs/heads/master Commit: 42c5c4b8760b78ea1e8ef9b8d583a92f2a45e1d4 Parents: dbdd243 Author: Michael Russo <[email protected]> Authored: Sat Aug 27 23:24:01 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Sat Aug 27 23:24:01 2016 -0700 ---------------------------------------------------------------------- .../astyanax/MultiRowShardColumnIterator.java | 27 ++ .../persistence/core/shard/SmartShard.java | 9 +- .../impl/shard/NodeShardAllocation.java | 5 +- .../graph/serialization/impl/shard/Shard.java | 17 +- .../impl/shard/impl/EdgeSearcher.java | 17 +- .../shard/impl/NodeShardAllocationImpl.java | 9 +- .../impl/shard/impl/NodeShardCacheImpl.java | 24 +- .../shard/impl/ShardGroupCompactionImpl.java | 2 +- .../impl/shard/impl/ShardGroupDeletionImpl.java | 24 +- .../impl/ShardedEdgeSerializationImpl.java | 106 +++-- .../shard/impl/serialize/ShardSerializer.java | 11 +- .../graph/GraphManagerShardConsistencyIT.java | 396 +++++++++++++++++-- .../impl/shard/NodeShardAllocationTest.java | 4 +- .../impl/shard/NodeShardCacheTest.java | 4 +- 14 files changed, 559 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java index 
f3e8d4c..04266e1 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java @@ -127,6 +127,23 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { // advance to the next shard currentShard = currentShardIterator.next(); + // handle marked deleted shards + if( currentShard.isDeleted() && currentShardIterator.hasNext()){ + + if(logger.isTraceEnabled()) { + logger.trace("Shard is marked deleted, advancing to next - {}", currentShard); + } + + currentShard = currentShardIterator.next(); + }else if ( currentShard.isDeleted() && !currentShardIterator.hasNext()){ + + if(logger.isTraceEnabled()) { + logger.trace("Shard is marked deleted, and there is no more - {}", currentShard); + } + + return false; + } + if(logger.isTraceEnabled()){ logger.trace("Shard after advance: {}", currentShard); @@ -221,6 +238,16 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { } } + // skip over shards that are marked deleted + if( currentShard.isDeleted() && currentShardIterator.hasNext() ){ + + if(logger.isTraceEnabled()){ + logger.trace("Shard is marked deleted - {}", currentShard); + } + + currentShard = currentShardIterator.next(); + } + if(logger.isTraceEnabled()){ logger.trace("all shards when starting: {}", rowKeysWithShardEnd); http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java index 39ddc35..dd6df34 100644 --- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java @@ -25,13 +25,15 @@ public class SmartShard<R, T> { final ScopedRowKey<R> rowKey; final T shardEnd; final long shardIndex; + final boolean isDeleted; - public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd){ + public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd, final boolean isDeleted){ this.rowKey = rowKey; this.shardIndex = shardIndex; this.shardEnd = shardEnd; + this.isDeleted = isDeleted; } @@ -48,12 +50,15 @@ public class SmartShard<R, T> { return shardIndex; } + public boolean isDeleted(){ + return isDeleted; + } @Override public String toString(){ - return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }"; + return "Shard { rowKey="+rowKey + ", shardIndex="+shardIndex+", shardEnd="+shardEnd+", isDeleted="+isDeleted+" }"; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java index fcc0fc3..49fde9b 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java @@ -24,8 +24,6 @@ import java.util.Iterator; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import 
com.google.common.base.Optional; - /** * Interface used to create and retrieve shards @@ -38,11 +36,10 @@ public interface NodeShardAllocation { * Get all shards for the given info. If none exist, a default shard should be allocated. The nodeId is the source node * * @param scope The application scope - * @param maxShardId The max value to start seeking from. Values <= this will be returned if specified * @param directedEdgeMeta The directed edge metadata to use * @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated */ - public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta ); + public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta); /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java index 6394703..f92c37a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java @@ -32,8 +32,9 @@ public class Shard implements Comparable<Shard> { private final long shardIndex; private final long createdTime; - private final boolean compacted; + private boolean compacted; private Optional<DirectedEdge> shardEnd; + private boolean deleted; public Shard( final long shardIndex, final long createdTime, final boolean compacted ) { @@ -41,6 +42,7 @@ public class 
Shard implements Comparable<Shard> { this.createdTime = createdTime; this.compacted = compacted; this.shardEnd = Optional.absent(); + this.deleted = false; } @@ -67,6 +69,10 @@ public class Shard implements Comparable<Shard> { return compacted; } + public void setCompacted(final boolean compacted){ + this.compacted = compacted; + } + /** * Returns true if this is the minimum shard @@ -84,6 +90,14 @@ public class Shard implements Comparable<Shard> { return shardEnd; } + public boolean isDeleted(){ + return deleted; + } + + public void setDeleted( final boolean deleted){ + this.deleted = deleted; + } + /** * Compare the shards based on the timestamp first, then the created time second @@ -174,6 +188,7 @@ public class Shard implements Comparable<Shard> { }else{ string.append("null"); } + string.append(", isDeleted=").append(deleted); string.append(" }"); return string.toString(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index 917e943..eb90866 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -80,6 +80,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum public List<ScopedRowKey<R>> getRowKeys() { + if(logger.isTraceEnabled()) { + logger.trace("Shards: {}", shards); + } + List<ScopedRowKey<R>> rowKeys = new 
ArrayList<>(shards.size()); for(Shard shard : shards){ @@ -89,12 +93,18 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum rowKeys.add( rowKey ); } + if(logger.isTraceEnabled()) { + logger.trace("Resulting Shards: {}", rowKeys); + } return rowKeys; } public List<SmartShard> getRowKeysWithShardEnd(){ + if(logger.isTraceEnabled()) { + logger.trace("Shards: {}", shards); + } final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size()); @@ -111,9 +121,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum shardEnd = null; } - rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd)); + rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd, shard.isDeleted())); } + if(logger.isTraceEnabled()) { + logger.trace("Resulting Smart Shards: {}", rowKeysWithShardEnd); + } + + return rowKeysWithShardEnd; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java index 47b630f..cf9a51c 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java @@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.Collections; import java.util.Iterator; +import 
com.google.common.base.Optional; import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.netflix.astyanax.MutationBatch; @@ -78,11 +78,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { @Override - public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId, - final DirectedEdgeMeta directedEdgeMeta ) { + public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope, + final DirectedEdgeMeta directedEdgeMeta) { ValidationUtils.validateApplicationScope( scope ); - Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" ); GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta ); Iterator<Shard> existingShards; @@ -93,7 +92,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation { } else { - existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta ); + existingShards = edgeShardSerialization.getShardMetaData( scope, Optional.absent(), directedEdgeMeta ); /** * We didn't get anything out of cassandra, so we need to create the minimum shard http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java index 0a259f0..8e3be4f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java @@ -21,42 +21,26 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import com.google.inject.Singleton; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.core.consistency.TimeService; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.graph.GraphFig; import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; import 
org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.cache.Weigher; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -164,7 +148,7 @@ public class NodeShardCacheImpl implements NodeShardCache { } else { - entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta )); + entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta )); } @@ -330,7 +314,7 @@ public class NodeShardCacheImpl implements NodeShardCache { final Iterator<ShardEntryGroup> edges = - nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ); + nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta ); final CacheEntry cacheEntry = new CacheEntry( edges ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 0853adb..8db491f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -346,7 +346,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { //Overwrite our shard index with a newly created one that has been marked as compacted Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true ); - compactedShard.setShardEnd(targetShard.getShardEnd()); + compactedShard.setShardEnd(Optional.absent()); if(logger.isTraceEnabled()) { logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard); http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java index 2fc0d50..7d91eda 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java @@ -191,21 +191,35 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion { return DeleteResult.NO_OP; } + if(shard.isDeleted()){ + if(logger.isTraceEnabled()){ + logger.trace("Shard {} already deleted. 
Short circuiting.", shard); + } + return DeleteResult.NO_OP; + } + + shard.setDeleted(true); + + final MutationBatch setShardDeletedFlagMutation = + edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta ); + + /* Previously the below was used for actually deleting the shard vs. a marking strategy with read filtering - final MutationBatch shardRemovalMutation = - edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta ); + final MutationBatch shardRemovalMutation = + edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta ); + */ if ( rollup == null ) { - rollup = shardRemovalMutation; + rollup = setShardDeletedFlagMutation; } else { - rollup.mergeShallow( shardRemovalMutation ); + rollup.mergeShallow( setShardDeletedFlagMutation ); } result = DeleteResult.DELETED; - logger.info( "Removing shard {} in group {}", shard, shardEntryGroup ); + logger.info( "{} - Marking shard {} as deleted in group {}", Thread.currentThread().getName(), shard, shardEntryGroup ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 65a6f40..55eb172 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -43,15 +43,7 @@ import 
org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.SearchByIdType; import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; -import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators @@ -88,12 +80,15 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected final GraphFig graphFig; protected final EdgeShardStrategy writeEdgeShardStrategy; protected final TimeService timeService; + protected final EdgeShardSerialization edgeShardSerialization; + @Inject public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig, final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy, - final TimeService timeService ) { + final TimeService timeService, + final EdgeShardSerialization 
edgeShardSerialization ) { checkNotNull( "keyspace required", keyspace ); @@ -101,6 +96,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { checkNotNull( "consistencyFig required", graphFig ); checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy ); checkNotNull( "timeService required", timeService ); + checkNotNull( "edgeShardSerialization required", edgeShardSerialization ); + this.keyspace = keyspace; @@ -108,6 +105,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.graphFig = graphFig; this.writeEdgeShardStrategy = writeEdgeShardStrategy; this.timeService = timeService; + this.edgeShardSerialization = edgeShardSerialization; } @@ -119,7 +117,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { GraphValidation.validateEdge( markedEdge ); ValidationUtils.verifyTimeUuid( timestamp, "timestamp" ); - return new SourceWriteOp( columnFamilies, markedEdge ) { + return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -144,7 +142,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { ValidationUtils.verifyTimeUuid( timestamp, "timestamp" ); - return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) { + return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -168,7 +166,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { ValidationUtils.verifyTimeUuid( timestamp, "timestamp" ); - return new TargetWriteOp( columnFamilies, markedEdge ) { + return new TargetWriteOp( columnFamilies, markedEdge, targetEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -194,7 +192,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { ValidationUtils.verifyTimeUuid( timestamp, "timestamp" ); - return 
new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) { + return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -219,7 +217,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { ValidationUtils.verifyTimeUuid( timestamp, "timestamp" ); - return new EdgeVersions( columnFamilies, markedEdge ) { + return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -238,7 +236,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final MarkedEdge markedEdge, final Collection<Shard> shards, final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) { - return new SourceWriteOp( columnFamilies, markedEdge ) { + return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -258,7 +256,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final Collection<Shard> shards, final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) { - return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) { + return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -279,7 +277,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final MarkedEdge markedEdge, final Collection<Shard> shards, final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) { - return new TargetWriteOp( columnFamilies, markedEdge ) { + return new TargetWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -300,7 +298,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) { - return new TargetSourceTypeWriteOp( 
columnFamilies, markedEdge ) { + return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -320,7 +318,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final MarkedEdge markedEdge, final Collection<Shard> shards, final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) { - return new EdgeVersions( columnFamilies, markedEdge ) { + return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) { @Override void writeEdge( final MutationBatch batch, @@ -696,6 +694,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { */ protected abstract boolean isDeleted(); + /** + * Get the directed edge meta for the op + */ + protected abstract DirectedEdgeMeta getDirectedEdgeMeta(); + /** * Write the edge with the given data @@ -725,6 +728,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { for ( Shard shard : shards ) { final R rowKey = getRowKey( shard ); writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted ); + + // if an edge is being written to this shard, un-delete it in case it was previously marked + // don't un-delete if the edge write is to actually remove an edge + // Usergrid allows entities to be written with a UUID generated from the past (time) + if(shard.isDeleted() && !isDeleted) { + logger.info("Shard is deleted. 
Un-deleting as new data is being written to the shard - {}", shard); + shard.setDeleted(false); + batch.mergeShallow(edgeShardSerialization.writeShardMeta(scope, shard, getDirectedEdgeMeta())); + } + } @@ -745,12 +758,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { private final String type; private final boolean isDeleted; private final DirectedEdge directedEdge; + private final DirectedEdgeMeta directedEdgeMeta; /** * Write the source write operation */ - private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) { + private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge, + final DirectedEdgeMeta directedEdgeMeta ) { this.columnFamily = edgeColumnFamilies.getSourceNodeCfName(); this.sourceNodeId = markedEdge.getSourceNode(); @@ -759,6 +774,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.isDeleted = markedEdge.isDeleted(); this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() ); + this.directedEdgeMeta = directedEdgeMeta; } @@ -784,6 +800,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected boolean isDeleted() { return isDeleted; } + + @Override + protected DirectedEdgeMeta getDirectedEdgeMeta() { + return directedEdgeMeta; + } } @@ -798,12 +819,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { private Id targetId; private final boolean isDeleted; private final DirectedEdge directedEdge; + private final DirectedEdgeMeta directedEdgeMeta; /** * Write the source write operation */ - private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) { + private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge, + final DirectedEdgeMeta directedEdgeMeta ) { this.columnFamily = 
edgeColumnFamilies.getSourceNodeTargetTypeCfName(); this.sourceNodeId = markedEdge.getSourceNode(); @@ -813,6 +836,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.isDeleted = markedEdge.isDeleted(); this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() ); + this.directedEdgeMeta = directedEdgeMeta; } @@ -838,6 +862,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected boolean isDeleted() { return isDeleted; } + + @Override + protected DirectedEdgeMeta getDirectedEdgeMeta() { + return directedEdgeMeta; + } } @@ -853,12 +882,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { private final String type; private final boolean isDeleted; private final DirectedEdge directedEdge; + private final DirectedEdgeMeta directedEdgeMeta; /** * Write the source write operation */ - private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) { + private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge, + final DirectedEdgeMeta directedEdgeMeta ) { this.columnFamily = edgeColumnFamilies.getTargetNodeCfName(); this.targetNode = markedEdge.getTargetNode(); @@ -867,6 +898,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.isDeleted = markedEdge.isDeleted(); this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() ); + + this.directedEdgeMeta = directedEdgeMeta; } @@ -892,6 +925,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected boolean isDeleted() { return isDeleted; } + + @Override + protected DirectedEdgeMeta getDirectedEdgeMeta() { + return directedEdgeMeta; + } } @@ -909,12 +947,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final boolean isDeleted; final DirectedEdge directedEdge; + final DirectedEdgeMeta 
directedEdgeMeta; /** * Write the source write operation */ - private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) { + private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge, + final DirectedEdgeMeta directedEdgeMeta ) { this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName(); this.targetNode = markedEdge.getTargetNode(); @@ -924,6 +964,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.isDeleted = markedEdge.isDeleted(); this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() ); + + this.directedEdgeMeta = directedEdgeMeta; } @@ -949,6 +991,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected boolean isDeleted() { return isDeleted; } + + @Override + protected DirectedEdgeMeta getDirectedEdgeMeta() { + return directedEdgeMeta; + } } @@ -967,11 +1014,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final boolean isDeleted; final Long edgeVersion; + final DirectedEdgeMeta directedEdgeMeta; + /** * Write the source write operation */ - private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) { + private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge, + final DirectedEdgeMeta directedEdgeMeta ) { this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions(); this.targetNode = markedEdge.getTargetNode(); @@ -981,6 +1031,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { this.isDeleted = markedEdge.isDeleted(); this.edgeVersion = markedEdge.getTimestamp(); + this.directedEdgeMeta = directedEdgeMeta; } @@ -1006,6 +1057,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { protected boolean isDeleted() { return isDeleted; } + + @Override + protected DirectedEdgeMeta 
getDirectedEdgeMeta() { + return directedEdgeMeta; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java index 8ab6288..b47f7cc 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java @@ -58,6 +58,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> { } composite.addComponent( shard.isCompacted(), BOOLEAN_SERIALIZER); + composite.addComponent( shard.isDeleted(), BOOLEAN_SERIALIZER); return composite.serialize(); } @@ -67,7 +68,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> { public Shard fromByteBuffer( final ByteBuffer byteBuffer ) { DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer ); - Preconditions.checkArgument( composite.size() == 5, "Composite should 5 elements" ); + Preconditions.checkArgument( composite.size() == 5 || composite.size() == 6, "Composite should 5 elements" ); final byte version = composite.get(0, BYTE_SERIALIZER); @@ -76,9 +77,15 @@ public class ShardSerializer extends AbstractSerializer<Shard> { final DirectedEdge shardEnd = composite.get( 3, EDGE_SERIALIZER); final boolean isCompacted = composite.get( 4, BOOLEAN_SERIALIZER); - final Shard shard = new Shard(shardIndex, shardCreated, isCompacted); 
shard.setShardEnd(Optional.fromNullable(shardEnd)); + + if( composite.size() == 6){ + final boolean isDeleted = composite.get( 5, BOOLEAN_SERIALIZER); + shard.setDeleted(isDeleted); + } + + return shard; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index 0d6a27e..6e6abd8 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -22,12 +22,7 @@ package org.apache.usergrid.persistence.graph; import java.io.ByteArrayOutputStream; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -136,7 +131,7 @@ public class GraphManagerShardConsistencyIT { // get the system property of the UUID to use. 
If one is not set, use the defualt String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" ); - scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) ); + scope = new ApplicationScopeImpl( IdGenerator.createId(UUIDGenerator.newTimeUUID(), "test" ) ); reporter = @@ -149,19 +144,24 @@ public class GraphManagerShardConsistencyIT { @After - public void tearDown() { + public void tearDown() throws Exception { reporter.stop(); reporter.report(); - if(writeExecutor != null){ + if(writeExecutor != null && !writeExecutor.isShutdown()){ writeExecutor.shutdownNow(); + writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); } - if(deleteExecutor != null){ + if(deleteExecutor != null && !deleteExecutor.isShutdown()){ deleteExecutor.shutdownNow(); + deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } + Thread.sleep(3000); + } @@ -251,7 +251,7 @@ public class GraphManagerShardConsistencyIT { for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = - writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) ); futures.add( future ); } @@ -299,7 +299,9 @@ public class GraphManagerShardConsistencyIT { @Override public void onSuccess( @Nullable final Long result ) { logger.info( "Successfully ran the read, re-running" ); - writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) ); + if( !writeExecutor.isShutdown() ) { + writeExecutor.submit(new ReadWorker(gmf, generator, writeCount, readMeter)); + } } @@ -383,11 +385,12 @@ public class GraphManagerShardConsistencyIT { } - //now continue reading everything for 30 seconds + //now continue reading everything for 30 seconds to make sure things are OK Thread.sleep(30000); - writeExecutor.shutdownNow(); + //writeExecutor.shutdownNow(); + 
//writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); } @@ -494,7 +497,7 @@ public class GraphManagerShardConsistencyIT { for ( int i = 0; i < numWorkersPerInjector; i++ ) { Future<Boolean> future = - deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) ); + deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) ); futures.add( future ); } @@ -563,14 +566,18 @@ public class GraphManagerShardConsistencyIT { .filter(markedEdge -> { // if it's already been marked let's filter, move on as async deleteEdge() - logger.trace("edge already marked, may indicated a problem with gm.deleteEdge(): {}", markedEdge); - return !markedEdge.isDeleted(); + if(markedEdge.isDeleted()) { + logger.info("Edge already marked, but gm.deleteEdge() is async, Edge: {}", markedEdge); + } + //return !markedEdge.isDeleted(); + return true; }) .flatMap( edge -> manager.markEdge( edge )) .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last(); totalDeleted += count; - Thread.sleep( 500 ); + logger.info("Sleeping 250ms second because deleteEdge() is async."); + Thread.sleep( 250 ); } @@ -591,7 +598,9 @@ public class GraphManagerShardConsistencyIT { @Override public void onSuccess( @Nullable final Long result ) { logger.info( "Successfully ran the read, re-running" ); - deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) ); + if( !deleteExecutor.isShutdown() ) { + deleteExecutor.submit(new ReadWorker(gmf, generator, 0, readMeter)); + } } @@ -641,7 +650,16 @@ public class GraphManagerShardConsistencyIT { logger.info( "Shard size for group is {}", group.getReadShards() ); - shardCount += group.getReadShards().size(); + Collection<Shard> shards = group.getReadShards(); + + for(Shard shard: shards){ + + if(!shard.isDeleted()){ + shardCount++; + } + } + + //shardCount += group.getReadShards().size(); } @@ -657,12 +675,337 @@ public class 
GraphManagerShardConsistencyIT { } //now that we have finished deleting and shards are removed, shutdown - deleteExecutor.shutdownNow(); + //deleteExecutor.shutdownNow(); + //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); - Thread.sleep( 3000 ); // sleep before the next test } + + @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async + @Category(StressTest.class) + public void writeThousandsDeleteWriteAgain() + throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException { + + ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 2000 ); + + + final Id sourceId = IdGenerator.createId( "sourceDeleteRewrite" ); + final String deleteEdgeType = "testDeleteRewrite"; + + final List<Edge> edges = new ArrayList<>(); + + final EdgeGenerator generator = new EdgeGenerator() { + + + @Override + public Edge newEdge() { + Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDeleteRewrite" ) ); + + edges.add(edge); + + return edge; + } + + + @Override + public Observable<MarkedEdge> doSearch( final GraphManager manager ) { + return manager.loadEdgesFromSource( + new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.<Edge>absent(), false ) ); + } + }; + + + final int numInjectors = 3; + + /** + * create injectors. This way all the caches are independent of one another. This is the same as + * multiple nodes if there are multiple injectors + */ + final List<Injector> injectors = createInjectors( numInjectors ); + + + final GraphFig graphFig = getInstance( injectors, GraphFig.class ); + + final long shardSize = graphFig.getShardSize(); + + + //we don't want to starve the cass runtime since it will be on the same box. 
Only take 50% of processing + // power for writes + final int numProcessors = Runtime.getRuntime().availableProcessors() / 2; + + final int numWorkersPerInjector = numProcessors / numInjectors; + + final long numberOfEdges = shardSize * TARGET_NUM_SHARDS; + + + final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors; + + createDeleteExecutor( numWorkersPerInjector ); + + + final AtomicLong writeCounter = new AtomicLong(); + + + //min stop time the min delta + 1 cache cycle timeout + final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout(); + + + logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector, + numInjectors ); + + + final List<Future<Boolean>> futures = new ArrayList<>(); + + + for ( Injector injector : injectors ) { + final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class ); + + + for ( int i = 0; i < numWorkersPerInjector; i++ ) { + Future<Boolean> future = + deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) ); + + futures.add( future ); + } + } + + /** + * Wait for all writes to complete + */ + for ( Future<Boolean> future : futures ) { + future.get(); + } + + // now get all our shards + final NodeShardCache cache = getInstance( injectors, NodeShardCache.class ); + + final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType ); + + final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class ); + + + final long writeCount = writeCounter.get(); + final Meter readMeter = registry.meter( "readThroughput-deleteTestRewrite" ); + + + //check our shard state + + + final Iterator<ShardEntryGroup> existingShardGroups = + cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + int shardCount = 0; + + while ( existingShardGroups.hasNext() ) { + final ShardEntryGroup group = 
existingShardGroups.next(); + + shardCount++; + + logger.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() ); + } + + + logger.info( "Found {} shard groups", shardCount ); + + + //now mark and delete all the edges + + + final GraphManager manager = gmf.createEdgeManager( scope ); + + //sleep occasionally to stop pushing cassandra over + + long count = Long.MAX_VALUE; + + Thread.sleep(3000); // let's make sure everything is written + + long totalDeleted = 0; + + // now do the deletes + while(count != 0) { + + logger.info("total deleted: {}", totalDeleted); + if(count != Long.MAX_VALUE) { // count starts with Long.MAX + logger.info("deleted {} entities, continuing until count is 0", count); + } + //take 1000 then sleep + count = generator.doSearch( manager ).take( 1000 ) + .filter(markedEdge -> { + + // if it's already been marked let's filter, move on as async deleteEdge() + if(markedEdge.isDeleted()){ + logger.info("Edge already marked, gm.deleteEdge() is Async, Edge: {}", markedEdge); + } + //return !markedEdge.isDeleted(); + return true; + }) + .flatMap( edge -> manager.markEdge( edge )) + .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last(); + + totalDeleted += count; + logger.info("Sleeping 250ms second because deleteEdge() is an async process"); + Thread.sleep( 250 ); + } + + + logger.info("Sleeping before starting the read"); + Thread.sleep(6000); // let the edge readers start + + // loop with a reader until our shards are gone + + + GraphManager gm = gmf.createEdgeManager( scope ); + + + + + //do a read to eventually trigger our group compaction. Take 2 pages of columns + long returnedEdgeCount = generator.doSearch( gm ) + + .doOnNext( edge -> readMeter.mark() ) + + .countLong().toBlocking().last(); + + logger.info( "Completed reading {} edges", returnedEdgeCount ); + + if ( 0 != returnedEdgeCount ) { + + //logger.warn( "Unexpected edge count returned!!! 
Expected {} but was {}", 0, + // returnedEdgeCount ); + + fail("Unexpected edge count returned!!! Expected 0 but was "+ returnedEdgeCount ); + } + + logger.info("Got expected read count of 0"); + + + //we have to get it from the cache, because this will trigger the compaction process + final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + + ShardEntryGroup group = null; + + while ( groups.hasNext() ) { + + group = groups.next(); + logger.info( "Shard size for group is {}", group.getReadShards() ); + Collection<Shard> shards = group.getReadShards(); + + for(Shard shard: shards){ + if(!shard.isDeleted()){ + shardCount++; + } + } + } + + + // we're done, 1 shard remains, we have a group, and it's our default shard + if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) { + logger.info( "All compactions complete," ); + + } + + + Thread.sleep( 2000 ); + + + logger.info( "Re-Writing same edges", workerWriteLimit, numWorkersPerInjector, + numInjectors ); + + + int edgeCount = 0; + if ( edges.size() > 0) { + + for (Edge edge : edges) { + + Edge returned = gm.writeEdge(edge).toBlocking().last(); + + assertNotNull("Returned has a version", returned.getTimestamp()); + + edgeCount++; + + writeMeter.mark(); + + writeCounter.incrementAndGet(); + + + if (edgeCount % 100 == 0) { + logger.info("wrote: " + edgeCount); + } + + + } + logger.info("Re-wrote total: {}", edgeCount); + } + + int retries = 2; + while ( retries > 0 ) { + + + //do a read to eventually trigger our group compaction. Take 2 pages of columns + returnedEdgeCount = generator.doSearch( gm ) + + .doOnNext( edge -> readMeter.mark() ) + + .countLong().toBlocking().last(); + + logger.info( "Completed reading {} edges", returnedEdgeCount ); + + if ( edgeCount != returnedEdgeCount ) { + logger.warn( "Unexpected edge count returned!!! 
Expected {} but was {}", edgeCount, + returnedEdgeCount ); + } + + retries--; + + if( returnedEdgeCount == edgeCount ){ + logger.info("Got expected read count of {}", edgeCount); + break; + + } + } + + //we have to get it from the cache, because this will trigger the compaction process + final Iterator<ShardEntryGroup> finalgroups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta ); + + ShardEntryGroup finalgroup = null; + + // reset the shard count + shardCount = 0; + while ( finalgroups.hasNext() ) { + + finalgroup = finalgroups.next(); + + logger.info( "Shard size for group is {}", finalgroup.getReadShards() ); + + Collection<Shard> shards = finalgroup.getReadShards(); + + for(Shard shard: shards){ + + if(!shard.isDeleted()){ + shardCount++; + } + } + + } + + + // we're done, 1 shard remains, we have a group, and it's our default shard + if ( shardCount > 1 ) { + logger.info( "All done. Final shard count: {}", shardCount ); + assertEquals(1,1); + + }else{ + fail("There should be more than 1 shard"); + } + + //deleteExecutor.shutdownNow(); + //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + + } + + + private class Worker implements Callable<Boolean> { private final GraphManagerFactory factory; private final EdgeGenerator generator; @@ -671,8 +1014,8 @@ public class GraphManagerShardConsistencyIT { private final AtomicLong writeCounter; - private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, - final long minExecutionTime, final AtomicLong writeCounter ) { + private Worker(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, + final long minExecutionTime, final AtomicLong writeCounter) { this.factory = factory; this.generator = generator; this.writeLimit = writeLimit; @@ -699,7 +1042,6 @@ public class GraphManagerShardConsistencyIT { assertNotNull( "Returned has a version", returned.getTimestamp() ); - writeMeter.mark(); 
writeCounter.incrementAndGet(); @@ -738,7 +1080,7 @@ public class GraphManagerShardConsistencyIT { GraphManager gm = factory.createEdgeManager( scope ); - while ( true ) { + while ( !Thread.currentThread().isInterrupted() ) { //do a read to eventually trigger our group compaction. Take 2 pages of columns @@ -757,6 +1099,8 @@ public class GraphManagerShardConsistencyIT { assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount ); } + + return 0L; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java index 00406c0..8a9dcfe 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java @@ -564,7 +564,7 @@ public class NodeShardAllocationTest { final Iterator<ShardEntryGroup> result = - approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta ); + approximation.getShards( scope, directedEdgeMeta ); assertTrue( "Shards present", result.hasNext() ); @@ -663,7 +663,7 @@ public class NodeShardAllocationTest { final Iterator<ShardEntryGroup> result = - approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta ); + approximation.getShards( scope, directedEdgeMeta ); ShardEntryGroup shardEntryGroup = result.next(); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java index 3b717a4..ecf0091 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java @@ -102,7 +102,7 @@ public class NodeShardCacheTest { /** * Simulate returning no shards at all. */ - when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) ) + when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) ) //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() { @@ -191,7 +191,7 @@ public class NodeShardCacheTest { /** * Simulate returning no shards at all. */ - when( allocation.getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) ) + when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) ) //use "thenAnswer" so we always return the value, even if it's invoked more than 1 time. .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
