This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e1a0db798aedc6fcbb02e3076d545581bad28b0e Merge: 406a859 4d42c18 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed May 20 08:44:06 2020 +0200 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 18 ++ .../cassandra/config/DatabaseDescriptor.java | 31 +++ src/java/org/apache/cassandra/db/LegacyLayout.java | 18 +- .../db/compaction/CompactionIterator.java | 5 +- .../db/partitions/AbstractBTreePartition.java | 5 +- .../db/partitions/ImmutableBTreePartition.java | 2 +- .../cassandra/db/partitions/PartitionUpdate.java | 26 ++- .../db/transform/DuplicateRowChecker.java | 139 ++++++++++++ .../org/apache/cassandra/service/ReadCallback.java | 4 +- .../cassandra/service/SnapshotVerbHandler.java | 5 + .../org/apache/cassandra/service/StorageProxy.java | 54 +++++ .../cassandra/service/StorageProxyMBean.java | 13 ++ .../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++ .../cassandra/distributed/impl/Instance.java | 4 +- .../upgrade/MixedModeReadRepairTest.java | 85 +++++++ .../distributed/upgrade/UpgradeTestBase.java | 3 +- .../org/apache/cassandra/db/LegacyLayoutTest.java | 39 +++- .../db/compaction/CompactionIteratorTest.java | 80 ++++++- .../db/partition/PartitionUpdateTest.java | 144 ++++++++++++ .../db/transform/DuplicateRowCheckerTest.java | 246 +++++++++++++++++++++ 21 files changed, 1092 insertions(+), 18 deletions(-) diff --cc CHANGES.txt index 46625b3,b875ae1..3506589 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,7 -1,5 +1,8 @@@ -3.0.21 +3.11.7 + * Fix CQL formatting of read command restrictions for slow query log (CASSANDRA-15503) + * Allow sstableloader to use SSL on the native port (CASSANDRA-14904) +Merged from 3.0: + * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789) * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674) * Fix Debian init start/stop (CASSANDRA-15770) * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242) diff --cc src/java/org/apache/cassandra/config/Config.java index 7f28546,6003bd1..322f1f5 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -397,12 -362,29 +397,30 @@@ public class Confi } /** + * If true, when rows with duplicate clustering keys are detected during a read or compaction + * a snapshot will be taken. In the read case, each a snapshot request will be issued to each + * replica involved in the query, for compaction the snapshot will be created locally. + * These are limited at the replica level so that only a single snapshot per-day can be taken + * via this method. + * + * This requires check_for_duplicate_rows_during_reads and/or check_for_duplicate_rows_during_compaction + * below to be enabled + */ + public volatile boolean snapshot_on_duplicate_row_detection = false; - + /** + * If these are enabled duplicate keys will get logged, and if snapshot_on_duplicate_row_detection + * is enabled, the table will get snapshotted for offline investigation + */ + public volatile boolean check_for_duplicate_rows_during_reads = true; + public volatile boolean check_for_duplicate_rows_during_compaction = true; + - public static boolean isClientMode() - { - return isClientMode; - } - ++ /** + * Client mode means that the process is a pure client, that uses C* code base but does + * not read or write local C* database files. 
+ * + * @deprecated migrate to {@link DatabaseDescriptor#clientInitialization(boolean)} + */ + @Deprecated public static void setClientMode(boolean clientMode) { isClientMode = clientMode; diff --cc src/java/org/apache/cassandra/db/LegacyLayout.java index 09f9cfa,37cc935..4ec0c30 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@@ -28,8 -28,9 +28,10 @@@ import java.util.stream.Collectors import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.SuperColumnCompatibility; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.utils.AbstractIterator; + + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 9aba938,b132d90..4460d4d --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@@ -17,14 -17,17 +17,16 @@@ */ package org.apache.cassandra.db.compaction; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Ordering; import org.apache.cassandra.config.CFMetaData; + + import org.apache.cassandra.db.transform.DuplicateRowChecker; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.PurgeFunction; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; @@@ -104,8 -106,8 +106,9 @@@ public class CompactionIterator extend ? 
EmptyIterators.unfilteredPartition(controller.cfs.metadata, false) : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug + merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec)); - this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec)); + merged = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec)); + this.compacted = DuplicateRowChecker.duringCompaction(merged, type); } public boolean isForThrift() diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index befbfbb,2cd9e97..34d6d46 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@@ -271,12 -284,46 +271,12 @@@ public abstract class AbstractBTreePart } } - public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator - { - private Iterator<Unfiltered> iterator; - - protected SliceableIterator(ColumnFilter selection, boolean isReversed) - { - super(selection, isReversed); - } - - protected Unfiltered computeNext() - { - if (iterator == null) - iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder); - if (!iterator.hasNext()) - return endOfData(); - return iterator.next(); - } - - public Iterator<Unfiltered> slice(Slice slice) - { - return sliceIterator(selection, slice, isReverseOrder, current, staticRow); - } - } - - public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) - { - return new SliceableIterator(columns, reversed); - } - - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() - { - return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false); - } - protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity) { - return build(iterator, initialRowCapacity, true); + return build(iterator, initialRowCapacity, true, null); } - protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered) + protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver) { CFMetaData metadata = iterator.metadata(); PartitionColumns columns = iterator.columns(); diff --cc src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 3409079,3560e90..4aca6d2 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@@ -217,10 -214,28 +219,30 @@@ public class PartitionUpdate extends Ab * Warning: this method does not close the provided iterator, it is up to * the caller to close it. 
*/ - public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator) + public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter) { - return fromIterator(iterator, true, null); ++ ++ return fromIterator(iterator, filter, true, null); + } + + private static final NoSpamLogger rowMergingLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + /** + * Removes duplicate rows from incoming iterator, to be used when we can't trust the underlying iterator (like when reading legacy sstables) + */ - public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator) ++ public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator iterator, ColumnFilter filter) + { - return fromIterator(iterator, false, (a, b) -> { ++ return fromIterator(iterator, filter, false, (a, b) -> { + CFMetaData cfm = iterator.metadata(); + rowMergingLogger.warn(String.format("Merging rows from pre 3.0 iterator for partition key: %s", + cfm.getKeyValidator().getString(iterator.partitionKey().getKey()))); + return Rows.merge(a, b, FBUtilities.nowInSeconds()); + }); + } + - private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver) ++ private static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver) + { + iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter); - Holder holder = build(iterator, 16); + Holder holder = build(iterator, 16, ordered, quickResolver); MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo; return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false); } @@@ -911,7 -767,7 +933,7 @@@ try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key)) { assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families - return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata())); - return PartitionUpdate.fromPre30Iterator(iterator); ++ return PartitionUpdate.fromPre30Iterator(iterator, ColumnFilter.all(iterator.metadata())); } } diff --cc src/java/org/apache/cassandra/service/ReadCallback.java index 3ef2fac,71eb0bc..b312852 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@@ -34,7 -32,10 +34,8 @@@ import org.apache.cassandra.concurrent. import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; -import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.RequestFailureReason; + import org.apache.cassandra.db.transform.DuplicateRowChecker; -import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; @@@ -144,8 -143,8 +146,8 @@@ public class ReadCallback implements IA PartitionIterator result = blockfor == 1 ? 
resolver.getData() : resolver.resolve(); if (logger.isTraceEnabled()) - logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime)); - return result; + return DuplicateRowChecker.duringRead(result, endpoints); } public int blockFor() diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java index 8678dde,047934c..cdf07f4 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@@ -65,5 -67,14 +67,16 @@@ public interface StorageProxyMBea /** Returns each live node's schema version */ public Map<String, List<String>> getSchemaVersions(); + public int getNumberOfTables(); ++ + void enableSnapshotOnDuplicateRowDetection(); + void disableSnapshotOnDuplicateRowDetection(); + boolean getSnapshotOnDuplicateRowDetectionEnabled(); + + boolean getCheckForDuplicateRowsDuringReads(); + void enableCheckForDuplicateRowsDuringReads(); + void disableCheckForDuplicateRowsDuringReads(); + boolean getCheckForDuplicateRowsDuringCompaction(); + void enableCheckForDuplicateRowsDuringCompaction(); + void disableCheckForDuplicateRowsDuringCompaction(); } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 47ad405,d23eec0..5bb449e --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -99,8 -99,11 +99,9 @@@ import org.apache.cassandra.tools.NodeT import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; + import org.apache.cassandra.utils.DiagnosticSnapshotService; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; @@@ -649,9 -698,11 +650,10 @@@ public class Instance extends IsolatedE () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES), () -> BufferPool.shutdownLocalCleaner(1L, MINUTES), - () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES), () -> Ref.shutdownReferenceReaper(1L, MINUTES), () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), - () -> SSTableReader.shutdownBlocking(1L, MINUTES) + () -> SSTableReader.shutdownBlocking(1L, MINUTES), + () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES) ); error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), diff --cc test/unit/org/apache/cassandra/db/LegacyLayoutTest.java index fc7c4c4,0bb2459..1bc3af6 --- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java +++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java @@@ -24,8 -24,9 +24,10 @@@ import java.nio.file.Files import java.nio.file.Path; import java.nio.file.Paths; +import org.junit.AfterClass; import org.apache.cassandra.db.filter.ColumnFilter; + import org.apache.cassandra.db.marshal.MapType; + import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Cell; @@@ -382,4 -372,39 
+384,39 @@@ public class LegacyLayoutTes LegacyLayout.fromUnfilteredRowIterator(null, p.unfilteredIterator()); LegacyLayout.serializedSizeAsLegacyPartition(null, p.unfilteredIterator(), VERSION_21); } - } + + @Test + public void testCellGrouper() + { + // CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck)) + CFMetaData cfm = CFMetaData.Builder.create("ks", "table") + .addPartitionKey("pk", Int32Type.instance) + .addClusteringColumn("ck", Int32Type.instance) + .addRegularColumn("v", MapType.getInstance(UTF8Type.instance, UTF8Type.instance, true)) + .build(); + SerializationHelper helper = new SerializationHelper(cfm, MessagingService.VERSION_22, SerializationHelper.Flag.LOCAL, ColumnFilter.all(cfm)); + LegacyLayout.CellGrouper cg = new LegacyLayout.CellGrouper(cfm, helper); + - Slice.Bound startBound = Slice.Bound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); - Slice.Bound endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); ++ ClusteringBound startBound = ClusteringBound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); ++ ClusteringBound endBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); + LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v"))); + LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v"))); + LegacyLayout.LegacyRangeTombstone lrt = new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(2, 1588598040)); + assertTrue(cg.addAtom(lrt)); + + // add a real cell + LegacyLayout.LegacyCell cell = new LegacyLayout.LegacyCell(LegacyLayout.LegacyCell.Kind.REGULAR, - new LegacyLayout.LegacyCellName(new Clustering(ByteBufferUtil.bytes(2)), ++ new LegacyLayout.LegacyCellName(Clustering.make(ByteBufferUtil.bytes(2)), + cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), + ByteBufferUtil.bytes("g")), + ByteBufferUtil.bytes("v"), 3, Integer.MAX_VALUE, 0); + assertTrue(cg.addAtom(cell)); + + // add legacy range tombstone where collection name is null for the end bound (this gets translated to a row tombstone) - startBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); - endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); ++ startBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); ++ endBound = ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] {ByteBufferUtil.bytes(2)}); + start = new LegacyLayout.LegacyBound(startBound, false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v"))); + end = new LegacyLayout.LegacyBound(endBound, false, null); + assertTrue(cg.addAtom(new LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(1, 1588598040)))); + } + } diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java index dc5fd06,549a94d..58c5a00 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java @@@ -17,379 -17,118 +17,455 @@@ */ package org.apache.cassandra.db.compaction; + import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued; 
-import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.iter; + import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow; ++import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.rows; import static org.junit.Assert.*; + import java.net.InetAddress; import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.collect.*; import org.junit.Test; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.ISSTableScanner; - import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.net.*; ++import org.apache.cassandra.net.IMessageSink; ++import org.apache.cassandra.net.MessageIn; ++import org.apache.cassandra.net.MessageOut; ++import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.utils.ByteBufferUtil; + import org.apache.cassandra.utils.FBUtilities; - public class CompactionIteratorTest + public class CompactionIteratorTest extends CQLTester { + + private static final int NOW = 1000; + private static final int GC_BEFORE = 100; + private static final String KSNAME = "CompactionIteratorTest"; + private static final String CFNAME = "Integer1"; + + static final DecoratedKey kk; + static final CFMetaData metadata; + private static final int RANGE = 1000; + private static final int COUNT = 100; + + Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>(); + + static { + DatabaseDescriptor.daemonInitialization(); + + kk = Util.dk("key"); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KSNAME, + KeyspaceParams.simple(1), + metadata = SchemaLoader.standardCFMD(KSNAME, + CFNAME, + 1, + UTF8Type.instance, + Int32Type.instance, + Int32Type.instance)); + } + + // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests. 
+ + @Test + public void testGcCompactionSupersedeLeft() + { + testCompaction(new String[] { + "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]" + }, new String[] { + "7<[160] 15[180] [160]<30 40[120]" + }, + 3); + } + + @Test + public void testGcCompactionSupersedeMiddle() + { + testCompaction(new String[] { + "5<=[140] 10[150] [140]<40 60[150]" + }, new String[] { + "7<=[160] 15[180] [160]<=30 40[120]" + }, + 3); + } + + @Test + public void testGcCompactionSupersedeRight() + { + testCompaction(new String[] { + "9<=[140] 10[150] [140]<40 60[150]" + }, new String[] { + "7<[160] 15[180] [160]<30 40[120]" + }, + 3); + } + + @Test + public void testGcCompactionSwitchInSuperseded() + { + testCompaction(new String[] { + "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]" + }, new String[] { + "7<[160] 15[180] [160]<30 40[120]" + }, + 5); + } + + @Test + public void testGcCompactionBoundaries() + { + testCompaction(new String[] { + "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90" + }, new String[] { + "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100" + }, + 7); + } + + @Test + public void testGcCompactionMatches() + { + testCompaction(new String[] { + "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130" + }, new String[] { + "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130" + }, + 5); + } + + @Test + public void testGcCompactionRowDeletion() + { + testCompaction(new String[] { + "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]" + }, new String[] { + "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]" + }, + "25[160] 30[170] 50[120]"); + } + + @Test + public void testGcCompactionPartitionDeletion() + { + testCompaction(new String[] { + "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]" + }, new String[] { + // Dxx| stands for partition deletion at time xx + "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]" + }, + "30[170]"); + } + + void testCompaction(String[] inputs, String[] tombstones, String expected) + { + testNonGcCompaction(inputs, tombstones); + + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + List<List<Unfiltered>> inputLists = parse(inputs, generator); + List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator); + List<Unfiltered> result = compact(inputLists, tombstoneLists); + System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds"); + generator.verifyValid(result); + verifyEquivalent(inputLists, result, tombstoneLists, generator); + List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1); + if (!expectedResult.equals(result)) + fail("Expected " + expected + ", got " + generator.str(result)); + } + + void testCompaction(String[] inputs, String[] tombstones, int expectedCount) + { + testNonGcCompaction(inputs, tombstones); + + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + List<List<Unfiltered>> inputLists = parse(inputs, generator); + List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator); + List<Unfiltered> result = compact(inputLists, tombstoneLists); + System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds"); + generator.verifyValid(result); + verifyEquivalent(inputLists, result, tombstoneLists, generator); + if (size(result) > expectedCount) + fail("Expected compaction with " + expectedCount + " elements, got " + size(result) 
+ ": " + generator.str(result)); + } + + int testNonGcCompaction(String[] inputs, String[] tombstones) + { + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + List<List<Unfiltered>> inputLists = parse(inputs, generator); + List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator); + List<Unfiltered> result = compact(inputLists, Collections.emptyList()); + System.out.println("Non-GC compaction resulted in " + size(result) + " Unfiltereds"); + generator.verifyValid(result); + verifyEquivalent(inputLists, result, tombstoneLists, generator); + return size(result); + } + + private static int size(List<Unfiltered> data) + { + return data.stream().mapToInt(x -> x instanceof RangeTombstoneBoundaryMarker ? 2 : 1).sum(); + } + + private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator) + { + // sources + tombstoneSources must be the same as result + tombstoneSources + List<Unfiltered> expected = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList()); + List<Unfiltered> actual = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList()); + if (!expected.equals(actual)) + { + System.out.println("Equivalence test failure between sources:"); + for (List<Unfiltered> partition : sources) + generator.dumpList(partition); + System.out.println("and compacted " + generator.str(result)); + System.out.println("with tombstone sources:"); + for (List<Unfiltered> partition : tombstoneSources) + generator.dumpList(partition); + System.out.println("expected " + generator.str(expected)); + System.out.println("got " + generator.str(actual)); + fail("Failed equivalence test."); + } + } + + private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator) + { + return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x -> parse(x, generator))); + } + + private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator) + { + Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input); + if (m.lookingAt()) + { + int del = Integer.parseInt(m.group(1)); + input = input.substring(m.end()); + List<Unfiltered> list = generator.parse(input, NOW - 1); + deletionTimes.put(list, new DeletionTime(del, del)); + return list; + } + else + return generator.parse(input, NOW - 1); + } + + private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources) + { + List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk)))); + Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>(); + transformedSources.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk))); + try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE); + CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION, + Lists.transform(content, x -> new Scanner(x)), + controller, NOW, null)) + { + List<Unfiltered> result = new ArrayList<>(); + assertTrue(iter.hasNext()); + try (UnfilteredRowIterator partition = iter.next()) + { + Iterators.addAll(result, partition); + } + assertFalse(iter.hasNext()); + return result; + } + } + + private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key) + { + return 
UnfilteredRowsGenerator.source(list, metadata, key, deletionTimes.getOrDefault(list, DeletionTime.LIVE)); + } + + NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator, + List<DecoratedKey> keys, int pcount, int rcount) + { + NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>(); + for (int i = 0; i < pcount; ++i) + { + DecoratedKey key = keys.get(rand.nextInt(keys.size())); + map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1)); + } + return map; + } + + @Test + public void testRandom() + { + UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false); + for (int seed = 1; seed < 100; ++seed) + { + Random rand = new Random(seed); + List<List<Unfiltered>> sources = new ArrayList<>(); + for (int i = 0; i < 10; ++i) + sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15)); + int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum(); + List<List<Unfiltered>> tombSources = new ArrayList<>(); + for (int i = 0; i < 10; ++i) + sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15)); + List<Unfiltered> result = compact(sources, tombSources); + verifyEquivalent(sources, result, tombSources, generator); + assertTrue(size(result) < srcSz); + } + } + + class Controller extends CompactionController + { + private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources; + + public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore) + { + super(cfs, Collections.emptySet(), gcBefore); + this.tombstoneSources = tombstoneSources; + } + + @Override + public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly) + { + assert tombstoneOnly; + return tombstoneSources.get(key); + } + } + + class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner + { + Iterator<UnfilteredRowIterator> iter; + + Scanner(Iterable<UnfilteredRowIterator> content) + { + iter = content.iterator(); + } + + @Override + public boolean isForThrift() + { + return false; + } + + @Override + public CFMetaData metadata() + { + return metadata; + } + + @Override + public boolean hasNext() + { + return iter.hasNext(); + } + + @Override + public UnfilteredRowIterator next() + { + return iter.next(); + } + + @Override + public long getLengthInBytes() + { + return 0; + } + + @Override + public long getCurrentPosition() + { + return 0; + } + + @Override + public long getBytesScanned() + { + return 0; + } + + @Override + public long getCompressedLengthInBytes() + { + return 0; + } + + @Override + public String getBackingFiles() + { + return null; + } + } ++ + @Test + public void duplicateRowsTest() throws Throwable + { + System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0"); + // Create a table and insert some data. The actual rows read in the test will be synthetic + // but this creates an sstable on disk to be snapshotted. 
+ createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))"); + for (int i = 0; i < 10; i++) + execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i); + flush(); + + DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true); - ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + CFMetaData metadata = getCurrentColumnFamilyStore().metadata; + + final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>(); + IMessageSink sink = new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + sentMessages.put(to, message); + return false; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return false; + } + }; + MessagingService.instance().addMessageSink(sink); + + // no duplicates + sentMessages.clear(); - iterate(cfs, iter(metadata, - false, - makeRow(metadata,0, 0), - makeRow(metadata,0, 1), - makeRow(metadata,0, 2))); ++ iterate(makeRow(metadata,0, 0), ++ makeRow(metadata,0, 1), ++ makeRow(metadata,0, 2)); + assertCommandIssued(sentMessages, false); + + // now test with a duplicate row and see that we issue a snapshot command + sentMessages.clear(); - iterate(cfs, iter(metadata, - false, - makeRow(metadata, 0, 0), - makeRow(metadata, 0, 1), - makeRow(metadata, 0, 1))); ++ iterate(makeRow(metadata, 0, 0), ++ makeRow(metadata, 0, 1), ++ makeRow(metadata, 0, 1)); + assertCommandIssued(sentMessages, true); + } + - private void iterate(ColumnFamilyStore cfs, UnfilteredPartitionIterator partitions) ++ private void iterate(Unfiltered...unfiltereds) + { - - try (CompactionController controller = new CompactionController(getCurrentColumnFamilyStore(), Integer.MAX_VALUE); - ISSTableScanner scanner = scanner(cfs, partitions); ++ ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); ++ DecoratedKey key = cfs.metadata.partitioner.decorateKey(ByteBufferUtil.bytes("key")); ++ try (CompactionController controller = new CompactionController(cfs, Integer.MAX_VALUE); ++ UnfilteredRowIterator rows = rows(metadata, key, false, unfiltereds); ++ ISSTableScanner scanner = new Scanner(Collections.singletonList(rows)); + CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION, + Collections.singletonList(scanner), + controller, FBUtilities.nowInSeconds(), null)) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + partition.forEachRemaining(u -> {}); + } + } + } + } - - private ISSTableScanner scanner(final ColumnFamilyStore cfs, final UnfilteredPartitionIterator partitions) - { - - return new ISSTableScanner() - { - public long getLengthInBytes() { return 0; } - - public long getCurrentPosition() { return 0; } - - public String getBackingFiles() { return cfs.getLiveSSTables().iterator().next().toString(); } - - public boolean isForThrift() { return false; } - - public CFMetaData metadata() { return cfs.metadata; } - - public void close() { } - - public boolean hasNext() { return partitions.hasNext(); } - - public UnfilteredRowIterator next() { return partitions.next(); } - }; - } } diff --cc test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java index 0330b65,2bd685c..df23e4f --- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java +++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java @@@ -17,11 -17,34 +17,35 @@@ */ package org.apache.cassandra.db.partition; +import org.apache.cassandra.UpdateBuilder; + import java.util.ArrayList; + import 
java.util.Collections; + import java.util.List; + + import com.google.common.collect.Lists; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.db.Clustering; + import org.apache.cassandra.db.DecoratedKey; + import org.apache.cassandra.db.DeletionTime; + import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.PartitionUpdate; + import org.apache.cassandra.db.rows.BTreeRow; + import org.apache.cassandra.db.rows.BufferCell; + import org.apache.cassandra.db.rows.Cell; + import org.apache.cassandra.db.rows.CellPath; + import org.apache.cassandra.db.rows.EncodingStats; + import org.apache.cassandra.db.rows.Row; + import org.apache.cassandra.db.rows.RowAndDeletionMergeIterator; + import org.apache.cassandra.db.rows.Rows; + import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.dht.Murmur3Partitioner; + import org.apache.cassandra.io.sstable.ISSTableScanner; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.junit.Test; @@@ -84,4 -112,121 +111,121 @@@ public class PartitionUpdateTest extend update = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), "key0").buildUpdate(); Assert.assertEquals(0, update.operationCount()); } + + /** + * Makes sure we merge duplicate rows, see CASSANDRA-15789 + */ + @Test + public void testDuplicate() + { + createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))"); + CFMetaData cfm = currentTableMetadata(); + + DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); + + List<Row> rows = new ArrayList<>(); + Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); - builder.newRow(new Clustering(ByteBufferUtil.bytes(2))); ++ builder.newRow(Clustering.make(ByteBufferUtil.bytes(2))); + builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647)); + - Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g"))); ++ Cell c = BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g"))); + builder.addCell(c); + + Row r = builder.build(); + rows.add(r); + - builder.newRow(new Clustering(ByteBufferUtil.bytes(2))); ++ builder.newRow(Clustering.make(ByteBufferUtil.bytes(2))); + builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false)); + r = builder.build(); + rows.add(r); + + RowAndDeletionMergeIterator rmi = new RowAndDeletionMergeIterator(cfm, + dk, + DeletionTime.LIVE, + ColumnFilter.all(cfm), + Rows.EMPTY_STATIC_ROW, + false, + EncodingStats.NO_STATS, + rows.iterator(), + Collections.emptyIterator(), + true); + - PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi); ++ PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi, ColumnFilter.all(cfm)); + pu.iterator(); + + Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk); + m.add(pu); + m.apply(); + getCurrentColumnFamilyStore().forceBlockingFlush(); + + SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next(); + int count = 0; + try (ISSTableScanner scanner = sst.getScanner()) + 
{ + while (scanner.hasNext()) + { + try (UnfilteredRowIterator iter = scanner.next()) + { + while (iter.hasNext()) + { + iter.next(); + count++; + } + } + } + } + assertEquals(1, count); + } + + /** + * Makes sure we don't create duplicates when merging 2 partition updates + */ + @Test + public void testMerge() + { + createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY (pk, ck))"); + CFMetaData cfm = currentTableMetadata(); + + DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); + + Row.Builder builder = BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds()); - builder.newRow(new Clustering(ByteBufferUtil.bytes(2))); ++ builder.newRow(Clustering.make(ByteBufferUtil.bytes(2))); + builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), new DeletionTime(2, 1588586647)); - Cell c = BufferCell.live(cfm, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g"))); ++ Cell c = BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g"))); + builder.addCell(c); + Row r = builder.build(); + + PartitionUpdate p1 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2); + p1.add(r); + - builder.newRow(new Clustering(ByteBufferUtil.bytes(2))); ++ builder.newRow(Clustering.make(ByteBufferUtil.bytes(2))); + builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 1), false)); + r = builder.build(); + PartitionUpdate p2 = new PartitionUpdate(cfm, dk, cfm.partitionColumns(), 2); + p2.add(r); + + Mutation m = new Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk); + m.add(PartitionUpdate.merge(Lists.newArrayList(p1, p2))); + m.apply(); + + getCurrentColumnFamilyStore().forceBlockingFlush(); + + SSTableReader sst = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next(); + int count = 0; + try (ISSTableScanner scanner = sst.getScanner()) + { + while (scanner.hasNext()) + { + try (UnfilteredRowIterator iter = scanner.next()) + { + while (iter.hasNext()) + { + iter.next(); + count++; + } + } + } + } + assertEquals(1, count); + } } diff --cc test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java index 0000000,78a0c8c..432bce3 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java +++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java @@@ -1,0 -1,240 +1,246 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.db.transform; + + import java.net.InetAddress; + import java.nio.ByteBuffer; + import java.util.Collections; + import java.util.HashMap; + import java.util.Iterator; + + import com.google.common.collect.Iterators; + import org.junit.*; + + import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.db.*; + import org.apache.cassandra.db.marshal.AbstractType; + import org.apache.cassandra.db.partitions.*; + import org.apache.cassandra.db.rows.*; + import org.apache.cassandra.net.*; + import org.apache.cassandra.utils.DiagnosticSnapshotService; + import org.apache.cassandra.utils.FBUtilities; + + import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertTrue; + + public class DuplicateRowCheckerTest extends CQLTester + { + ColumnFamilyStore cfs; + CFMetaData metadata; + static HashMap<InetAddress, MessageOut> sentMessages; + + @BeforeClass + public static void setupMessaging() + { + sentMessages = new HashMap<>(); + IMessageSink sink = new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + sentMessages.put(to, message); + return false; + } + + public boolean allowIncomingMessage(MessageIn message, int id) + { + return false; + } + }; + MessagingService.instance().addMessageSink(sink); + } + + @Before + public void setup() throws Throwable + { + DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true); + System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", "0"); + // Create a table and insert some data. The actual rows read in the test will be synthetic + // but this creates an sstable on disk to be snapshotted. 
+ createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))"); + for (int i = 0; i < 10; i++) + execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", "key", i, i, i); + getCurrentColumnFamilyStore().forceBlockingFlush(); + + metadata = getCurrentColumnFamilyStore().metadata; + cfs = getCurrentColumnFamilyStore(); + sentMessages.clear(); + } + + @Test + public void noDuplicates() + { + // no duplicates + iterate(iter(metadata, + false, + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 2))); + assertCommandIssued(sentMessages, false); + } + + @Test + public void singleDuplicateForward() + { + + iterate(iter(metadata, + false, + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1))); + assertCommandIssued(sentMessages, true); + } + + @Test + public void singleDuplicateReverse() + { + iterate(iter(metadata, + true, + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1))); + assertCommandIssued(sentMessages, true); + } + + @Test + public void multipleContiguousForward() + { + iterate(iter(metadata, + false, + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1))); + assertCommandIssued(sentMessages, true); + } + + @Test + public void multipleContiguousReverse() + { + iterate(iter(metadata, + true, + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 1))); + assertCommandIssued(sentMessages, true); + } + + @Test + public void multipleDisjointForward() + { + iterate(iter(metadata, + false, + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 2), + makeRow(metadata, 0, 2))); + assertCommandIssued(sentMessages, true); + } + + @Test + public void multipleDisjointReverse() + { + iterate(iter(metadata, + true, + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 0), + makeRow(metadata, 0, 1), + makeRow(metadata, 0, 2), + makeRow(metadata, 0, 2))); + assertCommandIssued(sentMessages, true); + } + + public static void assertCommandIssued(HashMap<InetAddress, MessageOut> sent, boolean isExpected) + { + assertEquals(isExpected, !sent.isEmpty()); + if (isExpected) + { + assertEquals(1, sent.size()); + assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress())); + SnapshotCommand command = (SnapshotCommand) sent.get(FBUtilities.getBroadcastAddress()).payload; + assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX)); + } + } + + private void iterate(UnfilteredPartitionIterator iter) + { + try (PartitionIterator partitions = applyChecker(iter)) + { + while (partitions.hasNext()) + { + try (RowIterator partition = partitions.next()) + { + partition.forEachRemaining(u -> {}); + } + } + } + } + + @SuppressWarnings("unchecked") + private static <T> ByteBuffer decompose(AbstractType<?> type, T value) + { + return ((AbstractType<T>) type).decompose(value); + } + + public static Row makeRow(CFMetaData metadata, Object... 
clusteringValues) + { + ByteBuffer[] clusteringByteBuffers = new ByteBuffer[clusteringValues.length]; + for (int i = 0; i < clusteringValues.length; i++) + clusteringByteBuffers[i] = decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]); + - return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), LivenessInfo.create(metadata, 0, 0)); ++ return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), LivenessInfo.create(0, 0)); ++ } ++ ++ public static UnfilteredRowIterator rows(CFMetaData metadata, ++ DecoratedKey key, ++ boolean isReversedOrder, ++ Unfiltered... unfiltereds) ++ { ++ Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds); ++ return new AbstractUnfilteredRowIterator(metadata, ++ key, ++ DeletionTime.LIVE, ++ metadata.partitionColumns(), ++ Rows.EMPTY_STATIC_ROW, ++ isReversedOrder, ++ EncodingStats.NO_STATS) ++ { ++ protected Unfiltered computeNext() ++ { ++ return iterator.hasNext() ? iterator.next() : endOfData(); ++ } ++ }; + } + + private static PartitionIterator applyChecker(UnfilteredPartitionIterator unfiltered) + { + int nowInSecs = 0; + return DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs), + Collections.singletonList(FBUtilities.getBroadcastAddress())); + } + + public static UnfilteredPartitionIterator iter(CFMetaData metadata, boolean isReversedOrder, Unfiltered... unfiltereds) + { + DecoratedKey key = metadata.partitioner.decorateKey(bytes("key")); - Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds); - - UnfilteredRowIterator rowIter = new AbstractUnfilteredRowIterator(metadata, - key, - DeletionTime.LIVE, - metadata.partitionColumns(), - Rows.EMPTY_STATIC_ROW, - isReversedOrder, - EncodingStats.NO_STATS) - { - protected Unfiltered computeNext() - { - return iterator.hasNext() ? iterator.next() : endOfData(); - } - }; - ++ UnfilteredRowIterator rowIter = rows(metadata, key, isReversedOrder, unfiltereds); + return new SingletonUnfilteredPartitionIterator(rowIter, false); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
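
For operators reading this change: the three options introduced in Config.java above should be settable from cassandra.yaml under the same names, assuming the usual one-to-one mapping between Config field names and yaml keys (that mapping is an assumption of this sketch, not something this commit changes). A minimal sketch using the defaults from this patch:

    # Warn when rows with duplicate clustering keys are detected during
    # reads or compaction (both checks are enabled by default in this patch).
    check_for_duplicate_rows_during_reads: true
    check_for_duplicate_rows_during_compaction: true

    # Additionally take a diagnostic snapshot for offline investigation.
    # Off by default; per the Config comment above, snapshots taken this way
    # are limited to roughly one per day on each replica.
    snapshot_on_duplicate_row_detection: false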
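
The StorageProxyMBean additions expose the same switches at runtime over JMX. Below is a minimal client sketch, not part of this patch: the host, port, the org.apache.cassandra.db:type=StorageProxy object name, and the attribute name derived from the boolean getter are assumptions based on how Cassandra 3.x normally registers this MBean.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ToggleDuplicateRowChecks
    {
        public static void main(String[] args) throws Exception
        {
            // Assumed connection details: local node, default Cassandra JMX port 7199.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Assumed MBean name under which StorageProxy is registered in 3.x.
                ObjectName storageProxy = new ObjectName("org.apache.cassandra.db:type=StorageProxy");

                // Operation added by this patch (see StorageProxyMBean in the diff above).
                mbs.invoke(storageProxy, "enableSnapshotOnDuplicateRowDetection", new Object[0], new String[0]);

                // The boolean getter is exposed as a read-only attribute under standard MBean naming.
                Object enabled = mbs.getAttribute(storageProxy, "SnapshotOnDuplicateRowDetectionEnabled");
                System.out.println("snapshot_on_duplicate_row_detection enabled: " + enabled);
            }
        }
    }

Disabling is symmetric via disableSnapshotOnDuplicateRowDetection, and the read and compaction checks themselves are toggled with enableCheckForDuplicateRowsDuringReads / disableCheckForDuplicateRowsDuringReads and enableCheckForDuplicateRowsDuringCompaction / disableCheckForDuplicateRowsDuringCompaction, as declared in the MBean interface above.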