Fixes issue that caused shards to get removed prematurely.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9bc22410 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9bc22410 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9bc22410 Branch: refs/heads/master Commit: 9bc22410dd785482f715faec4cbfeae0e712a502 Parents: 784fe51 Author: Michael Russo <[email protected]> Authored: Wed Nov 18 15:48:07 2015 -0800 Committer: Michael Russo <[email protected]> Committed: Wed Nov 18 15:48:07 2015 -0800 ---------------------------------------------------------------------- .../impl/EdgeSerializationImpl.java | 61 ++++++++++++++++++-- .../shard/impl/EdgeShardSerializationImpl.java | 3 +- .../shard/impl/ShardGroupColumnIterator.java | 4 +- .../shard/impl/ShardGroupCompactionImpl.java | 2 +- .../impl/shard/EdgeShardSerializationTest.java | 44 ++++++++++++++ 5 files changed, 106 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java index 9e25946..0f4d722 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java @@ -26,15 +26,15 @@ import java.util.UUID; import javax.inject.Inject; +import com.google.common.base.Optional; import 
org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.consistency.TimeService; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; -import org.apache.usergrid.persistence.graph.GraphFig; -import org.apache.usergrid.persistence.graph.MarkedEdge; -import org.apache.usergrid.persistence.graph.SearchByEdge; -import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.SearchByIdType; +import org.apache.usergrid.persistence.graph.*; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization; import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta; import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies; @@ -303,6 +303,15 @@ public class EdgeSerializationImpl implements EdgeSerialization { protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) { return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards ); } + + @Override + protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) { + + final SearchByEdge searchFullRange = new SimpleSearchByEdge( + search.sourceNode(), search.getType(),search.targetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()); + + return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, searchFullRange, readShards ); + } }; } @@ -329,6 +338,15 @@ public class EdgeSerializationImpl implements EdgeSerialization { protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) { return 
shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards ); } + + @Override + protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) { + + final SearchByEdgeType searchFullRange = new SimpleSearchByEdgeType( + edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent(), false ); + + return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, searchFullRange, readShards ); + } }; } @@ -357,6 +375,17 @@ public class EdgeSerializationImpl implements EdgeSerialization { return shardedEdgeSerialization .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards ); } + + @Override + protected Iterator<MarkedEdge> getIteratorFullRange (final Collection<Shard> readShards) { + + final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType( + edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + edgeType.getIdType(), Optional.absent(), false ); + + return shardedEdgeSerialization + .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards); + } }; } @@ -382,6 +411,17 @@ public class EdgeSerializationImpl implements EdgeSerialization { protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) { return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards ); } + + @Override + protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) { + + final SearchByEdgeType edgeTypeFullRange = new SimpleSearchByEdgeType( + edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.absent(), false ); + + return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeTypeFullRange, readShards ); + } + }; } @@ -411,6 +451,17 @@ public class EdgeSerializationImpl implements EdgeSerialization { return 
shardedEdgeSerialization .getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards ); } + + @Override + protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) { + + final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType( + edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + edgeType.getIdType(), Optional.absent(), false ); + + return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards); + + } }; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java index f107307..2f83d6f 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java @@ -156,7 +156,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization { final MutationBatch batch = keyspace.prepareMutationBatch(); - batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() ); + batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey ) + .deleteColumn( shard.getShardIndex() ); return batch; } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java index 72b617f..9604e63 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java @@ -117,6 +117,8 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> { */ protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards ); + protected abstract Iterator<MarkedEdge> getIteratorFullRange( Collection<Shard> readShards ); + public boolean advance() { @@ -141,7 +143,7 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> { logger.trace( "Our shard is empty, we need to perform an audit on shard group {}", group ); //fire and forget if we miss it here, we'll get it next read - shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIterator( group.getReadShards() ) ); + shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIteratorFullRange( group.getReadShards() ) ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java index 7dd0521..e663d5a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java @@ -150,7 +150,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction { .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." ); if(LOG.isDebugEnabled()) { - LOG.debug("Compacting shard group. count is {} ", countAudits.get()); + LOG.debug("Compacting shard group. 
Audit count is {} ", countAudits.get()); } final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java index 2641ed7..1f8bfa9 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java @@ -40,6 +40,8 @@ import com.google.common.base.Optional; import com.google.inject.Inject; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; @@ -52,6 +54,8 @@ import static org.mockito.Mockito.when; @UseModules({ TestGraphModule.class }) public class EdgeShardSerializationTest { + private static final Logger log = LoggerFactory.getLogger( EdgeShardSerializationTest.class ); + @Inject @Rule @@ -203,4 +207,44 @@ public class EdgeShardSerializationTest { assertFalse( results.hasNext() ); } + + @Test + public void sameShardIndexRemoval() throws ConnectionException { + + final Id now = IdGenerator.createId( "test" ); + + final long timestamp = 2000L; + + final Shard shard1 = new Shard( 1000L, timestamp, false ); + final Shard shard2 = new 
Shard( shard1.getShardIndex(), timestamp * 2, true ); + + + final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" ); + MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta ); + batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) ); + batch.execute(); + + + Iterator<Shard> results = + edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta ); + + // Latest timestamp comes first + assertEquals( shard2, results.next() ); + + // This should now not remove anything + edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute(); + + + // Get iterator again + results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta ); + + // We should still have shard2 stored + assertEquals( shard2, results.next() ); + + + + } + + + }
