Repository: cassandra Updated Branches: refs/heads/trunk 28b838ed6 -> d6a701ea1
Incremental repair not streaming correct sstables Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13328 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6a701ea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6a701ea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6a701ea Branch: refs/heads/trunk Commit: d6a701ea11c919938cb09b0fca2ea0ec7ad2123b Parents: 28b838e Author: Blake Eggleston <[email protected]> Authored: Tue Mar 14 15:37:57 2017 -0700 Committer: Blake Eggleston <[email protected]> Committed: Thu Mar 16 15:41:54 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamSession.java | 21 ++- .../io/sstable/SSTableRewriterTest.java | 4 +- .../cassandra/streaming/StreamSessionTest.java | 132 +++++++++++++++++++ 4 files changed, 151 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a162aeb..4f856e2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Incremental repair not streaming correct sstables (CASSANDRA-13328) * Upgrade the jna version to 4.3.0 (CASSANDRA-13300) * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132) * Remove config option index_interval (CASSANDRA-10671) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index b7db2b2..7ee99db 100644 --- 
a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -26,6 +26,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.*; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; @@ -320,7 +322,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental); + List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, pendingRepair); try { addTransferFiles(sections); @@ -362,7 +364,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber } @VisibleForTesting - public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental) + public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, UUID pendingRepair) { Refs<SSTableReader> refs = new Refs<>(); try @@ -375,6 +377,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber refs.addAll(cfStore.selectAndReference(view -> { Set<SSTableReader> sstables = Sets.newHashSet(); SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); + Predicate<SSTableReader> predicate; + if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) + { + predicate = Predicates.alwaysTrue(); + } + else + { + predicate = s -> s.isPendingRepair() && 
s.getSSTableMetadata().pendingRepair.equals(pendingRepair); + } + for (Range<PartitionPosition> keyRange : keyRanges) { // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end). @@ -383,10 +395,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even // including keyRange.left will still exclude any key having the token of the original token range, and so we're // still actually selecting what we wanted. - for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree)) + for (SSTableReader sstable : Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree), predicate)) { - if (!isIncremental || !sstable.isRepaired()) - sstables.add(sstable); + sstables.add(sstable); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 41a6828..b8595af 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -789,7 +789,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( Collections.singleton(new Range<Token>(firstToken, firstToken)), - Collections.singleton(cfs), 0L, false); + Collections.singleton(cfs), 0L, null); assertEquals(1, sectionsBeforeRewrite.size()); for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) section.ref.release(); @@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase 
while (!done.get()) { Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false); + List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, null); if (sections.size() != 1) failed.set(true); for (StreamSession.SSTableStreamingSections section : sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java new file mode 100644 index 0000000..8d388ab --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.UUIDGen; + +public class StreamSessionTest +{ + private String keyspace = null; + private static final String table = "tbl"; + + private TableMetadata tbm; + private ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() throws Exception + { + SchemaLoader.prepareServer(); + } + + @Before + public void createKeyspace() throws Exception + { + keyspace = String.format("ks_%s", System.currentTimeMillis()); + tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build(); + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm); + cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id); + } + + private SSTableReader createSSTable(Runnable queryable) + { + Set<SSTableReader> before = cfs.getLiveSSTables(); + queryable.run(); + cfs.forceBlockingFlush(); + 
Set<SSTableReader> after = cfs.getLiveSSTables(); + + Set<SSTableReader> diff = Sets.difference(after, before); + assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size(); + return diff.iterator().next(); + } + + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException + { + Descriptor descriptor = sstable.descriptor; + descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair); + sstable.reloadSSTableMetadata(); + + } + + private Set<SSTableReader> selectReaders(UUID pendingRepair) + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); + List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges, + Lists.newArrayList(cfs), + ActiveRepairService.UNREPAIRED_SSTABLE, + pendingRepair); + Set<SSTableReader> sstables = new HashSet<>(); + for (StreamSession.SSTableStreamingSections section: sections) + { + sstables.add(section.ref.get()); + } + return sstables; + } + + @Test + public void incrementalSSTableSelection() throws Exception + { + // make 4 sstables: 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired + SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table))); + SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table))); + SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table))); + SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table))); + + + UUID pendingRepair =
UUIDGen.getTimeUUID(); + long repairedAt = System.currentTimeMillis(); + mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); + mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); + + // no pending repair should return all sstables + Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR)); + + // a pending repair arg should only return sstables with the same pending repair id + Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair)); + } +}
