This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e1a0db798aedc6fcbb02e3076d545581bad28b0e
Merge: 406a859 4d42c18
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed May 20 08:44:06 2020 +0200

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |  18 ++
 .../cassandra/config/DatabaseDescriptor.java       |  31 +++
 src/java/org/apache/cassandra/db/LegacyLayout.java |  18 +-
 .../db/compaction/CompactionIterator.java          |   5 +-
 .../db/partitions/AbstractBTreePartition.java      |   5 +-
 .../db/partitions/ImmutableBTreePartition.java     |   2 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |  26 ++-
 .../db/transform/DuplicateRowChecker.java          | 139 ++++++++++++
 .../org/apache/cassandra/service/ReadCallback.java |   4 +-
 .../cassandra/service/SnapshotVerbHandler.java     |   5 +
 .../org/apache/cassandra/service/StorageProxy.java |  54 +++++
 .../cassandra/service/StorageProxyMBean.java       |  13 ++
 .../cassandra/utils/DiagnosticSnapshotService.java | 188 ++++++++++++++++
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../upgrade/MixedModeReadRepairTest.java           |  85 +++++++
 .../distributed/upgrade/UpgradeTestBase.java       |   3 +-
 .../org/apache/cassandra/db/LegacyLayoutTest.java  |  39 +++-
 .../db/compaction/CompactionIteratorTest.java      |  80 ++++++-
 .../db/partition/PartitionUpdateTest.java          | 144 ++++++++++++
 .../db/transform/DuplicateRowCheckerTest.java      | 246 +++++++++++++++++++++
 21 files changed, 1092 insertions(+), 18 deletions(-)

diff --cc CHANGES.txt
index 46625b3,b875ae1..3506589
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.21
 +3.11.7
 + * Fix CQL formatting of read command restrictions for slow query log 
(CASSANDRA-15503)
 + * Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
 +Merged from 3.0:
+  * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
   * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if 
IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
   * Fix Debian init start/stop (CASSANDRA-15770)
   * Fix infinite loop on index query paging in tables with clustering 
(CASSANDRA-14242)
diff --cc src/java/org/apache/cassandra/config/Config.java
index 7f28546,6003bd1..322f1f5
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -397,12 -362,29 +397,30 @@@ public class Confi
      }
  
      /**
+      * If true, when rows with duplicate clustering keys are detected during 
a read or compaction
+      * a snapshot will be taken. In the read case, each a snapshot request 
will be issued to each
+      * replica involved in the query, for compaction the snapshot will be 
created locally.
+      * These are limited at the replica level so that only a single snapshot 
per-day can be taken
+      * via this method.
+      *
+      * This requires check_for_duplicate_rows_during_reads and/or 
check_for_duplicate_rows_during_compaction
+      * below to be enabled
+      */
+     public volatile boolean snapshot_on_duplicate_row_detection = false;
 -
+     /**
+      * If these are enabled duplicate keys will get logged, and if 
snapshot_on_duplicate_row_detection
+      * is enabled, the table will get snapshotted for offline investigation
+      */
+     public volatile boolean check_for_duplicate_rows_during_reads = true;
+     public volatile boolean check_for_duplicate_rows_during_compaction = true;
+ 
 -    public static boolean isClientMode()
 -    {
 -        return isClientMode;
 -    }
 -
++    /**
 +     * Client mode means that the process is a pure client, that uses C* code 
base but does
 +     * not read or write local C* database files.
 +     *
 +     * @deprecated migrate to {@link 
DatabaseDescriptor#clientInitialization(boolean)}
 +     */
 +    @Deprecated
      public static void setClientMode(boolean clientMode)
      {
          isClientMode = clientMode;
diff --cc src/java/org/apache/cassandra/db/LegacyLayout.java
index 09f9cfa,37cc935..4ec0c30
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@@ -28,8 -28,9 +28,10 @@@ import java.util.stream.Collectors
  
  import org.apache.cassandra.cql3.ColumnIdentifier;
  import org.apache.cassandra.cql3.SuperColumnCompatibility;
 +import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.utils.AbstractIterator;
+ 
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterators;
  import com.google.common.collect.Lists;
  import com.google.common.collect.PeekingIterator;
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 9aba938,b132d90..4460d4d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -17,14 -17,17 +17,16 @@@
   */
  package org.apache.cassandra.db.compaction;
  
 -import java.util.List;
 -import java.util.UUID;
 +import java.util.*;
  import java.util.function.Predicate;
  
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import com.google.common.collect.Ordering;
  
  import org.apache.cassandra.config.CFMetaData;
+ 
+ import org.apache.cassandra.db.transform.DuplicateRowChecker;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.partitions.PurgeFunction;
  import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
  import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
@@@ -104,8 -106,8 +106,9 @@@ public class CompactionIterator extend
                                               ? 
EmptyIterators.unfilteredPartition(controller.cfs.metadata, false)
                                               : 
UnfilteredPartitionIterators.merge(scanners, nowInSec, listener());
          boolean isForThrift = merged.isForThrift(); // to stop capture of 
iterator in Purger, which is confusing for debug
 +        merged = Transformation.apply(merged, new GarbageSkipper(controller, 
nowInSec));
-         this.compacted = Transformation.apply(merged, new Purger(isForThrift, 
controller, nowInSec));
+         merged = Transformation.apply(merged, new Purger(isForThrift, 
controller, nowInSec));
+         this.compacted = DuplicateRowChecker.duringCompaction(merged, type);
      }
  
      public boolean isForThrift()
diff --cc 
src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index befbfbb,2cd9e97..34d6d46
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@@ -271,12 -284,46 +271,12 @@@ public abstract class AbstractBTreePart
          }
      }
  
 -    public class SliceableIterator extends AbstractIterator implements 
SliceableUnfilteredRowIterator
 -    {
 -        private Iterator<Unfiltered> iterator;
 -
 -        protected SliceableIterator(ColumnFilter selection, boolean 
isReversed)
 -        {
 -            super(selection, isReversed);
 -        }
 -
 -        protected Unfiltered computeNext()
 -        {
 -            if (iterator == null)
 -                iterator = unfilteredIterator(selection, Slices.ALL, 
isReverseOrder);
 -            if (!iterator.hasNext())
 -                return endOfData();
 -            return iterator.next();
 -        }
 -
 -        public Iterator<Unfiltered> slice(Slice slice)
 -        {
 -            return sliceIterator(selection, slice, isReverseOrder, current, 
staticRow);
 -        }
 -    }
 -
 -    public SliceableUnfilteredRowIterator 
sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
 -    {
 -        return new SliceableIterator(columns, reversed);
 -    }
 -
 -    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
 -    {
 -        return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
 -    }
 -
      protected static Holder build(UnfilteredRowIterator iterator, int 
initialRowCapacity)
      {
-         return build(iterator, initialRowCapacity, true);
+         return build(iterator, initialRowCapacity, true, null);
      }
  
-     protected static Holder build(UnfilteredRowIterator iterator, int 
initialRowCapacity, boolean ordered)
+     protected static Holder build(UnfilteredRowIterator iterator, int 
initialRowCapacity, boolean ordered, BTree.Builder.QuickResolver<Row> 
quickResolver)
      {
          CFMetaData metadata = iterator.metadata();
          PartitionColumns columns = iterator.columns();
diff --cc src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3409079,3560e90..4aca6d2
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@@ -217,10 -214,28 +219,30 @@@ public class PartitionUpdate extends Ab
       * Warning: this method does not close the provided iterator, it is up to
       * the caller to close it.
       */
 -    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
 +    public static PartitionUpdate fromIterator(UnfilteredRowIterator 
iterator, ColumnFilter filter)
      {
 -        return fromIterator(iterator, true,  null);
++
++        return fromIterator(iterator, filter, true,  null);
+     }
+ 
+     private static final NoSpamLogger rowMergingLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+     /**
+      * Removes duplicate rows from incoming iterator, to be used when we 
can't trust the underlying iterator (like when reading legacy sstables)
+      */
 -    public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator 
iterator)
++    public static PartitionUpdate fromPre30Iterator(UnfilteredRowIterator 
iterator, ColumnFilter filter)
+     {
 -        return fromIterator(iterator, false, (a, b) -> {
++        return fromIterator(iterator, filter, false, (a, b) -> {
+             CFMetaData cfm = iterator.metadata();
+             rowMergingLogger.warn(String.format("Merging rows from pre 3.0 
iterator for partition key: %s",
+                                                 
cfm.getKeyValidator().getString(iterator.partitionKey().getKey())));
+             return Rows.merge(a, b, FBUtilities.nowInSeconds());
+         });
+     }
+ 
 -    private static PartitionUpdate fromIterator(UnfilteredRowIterator 
iterator, boolean ordered, BTree.Builder.QuickResolver<Row> quickResolver)
++    private static PartitionUpdate fromIterator(UnfilteredRowIterator 
iterator, ColumnFilter filter, boolean ordered, 
BTree.Builder.QuickResolver<Row> quickResolver)
+     {
 +        iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, 
filter);
-         Holder holder = build(iterator, 16);
+         Holder holder = build(iterator, 16, ordered, quickResolver);
          MutableDeletionInfo deletionInfo = (MutableDeletionInfo) 
holder.deletionInfo;
          return new PartitionUpdate(iterator.metadata(), 
iterator.partitionKey(), holder, deletionInfo, false);
      }
@@@ -911,7 -767,7 +933,7 @@@
              try (UnfilteredRowIterator iterator = 
LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
              {
                  assert iterator != null; // This is only used in mutation, 
and mutation have never allowed "null" column families
-                 return PartitionUpdate.fromIterator(iterator, 
ColumnFilter.all(iterator.metadata()));
 -                return PartitionUpdate.fromPre30Iterator(iterator);
++                return PartitionUpdate.fromPre30Iterator(iterator, 
ColumnFilter.all(iterator.metadata()));
              }
          }
  
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index 3ef2fac,71eb0bc..b312852
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -34,7 -32,10 +34,8 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.partitions.PartitionIterator;
 -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 -import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.db.transform.DuplicateRowChecker;
 -import org.apache.cassandra.db.transform.Transformation;
  import org.apache.cassandra.exceptions.ReadFailureException;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.exceptions.UnavailableException;
@@@ -144,8 -143,8 +146,8 @@@ public class ReadCallback implements IA
  
          PartitionIterator result = blockfor == 1 ? resolver.getData() : 
resolver.resolve();
          if (logger.isTraceEnabled())
 -            logger.trace("Read: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +            logger.trace("Read: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
-         return result;
+         return DuplicateRowChecker.duringRead(result, endpoints);
      }
  
      public int blockFor()
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 8678dde,047934c..cdf07f4
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -65,5 -67,14 +67,16 @@@ public interface StorageProxyMBea
      /** Returns each live node's schema version */
      public Map<String, List<String>> getSchemaVersions();
  
 +    public int getNumberOfTables();
++
+     void enableSnapshotOnDuplicateRowDetection();
+     void disableSnapshotOnDuplicateRowDetection();
+     boolean getSnapshotOnDuplicateRowDetectionEnabled();
+ 
+     boolean getCheckForDuplicateRowsDuringReads();
+     void enableCheckForDuplicateRowsDuringReads();
+     void disableCheckForDuplicateRowsDuringReads();
+     boolean getCheckForDuplicateRowsDuringCompaction();
+     void enableCheckForDuplicateRowsDuringCompaction();
+     void disableCheckForDuplicateRowsDuringCompaction();
  }
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 47ad405,d23eec0..5bb449e
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -99,8 -99,11 +99,9 @@@ import org.apache.cassandra.tools.NodeT
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
  import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 -import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.concurrent.Ref;
@@@ -649,9 -698,11 +650,10 @@@ public class Instance extends IsolatedE
                                  () -> 
ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
                                  () -> 
PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
                                  () -> BufferPool.shutdownLocalCleaner(1L, 
MINUTES),
 -                                () -> 
StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, 
MINUTES),
                                  () -> 
Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
-                                 () -> SSTableReader.shutdownBlocking(1L, 
MINUTES)
+                                 () -> SSTableReader.shutdownBlocking(1L, 
MINUTES),
+                                 () -> 
DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
              );
              error = parallelRun(error, executor,
                                  () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES),
diff --cc test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
index fc7c4c4,0bb2459..1bc3af6
--- a/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
+++ b/test/unit/org/apache/cassandra/db/LegacyLayoutTest.java
@@@ -24,8 -24,9 +24,10 @@@ import java.nio.file.Files
  import java.nio.file.Path;
  import java.nio.file.Paths;
  
 +import org.junit.AfterClass;
  import org.apache.cassandra.db.filter.ColumnFilter;
+ import org.apache.cassandra.db.marshal.MapType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
  import org.apache.cassandra.db.rows.BufferCell;
  import org.apache.cassandra.db.rows.Cell;
@@@ -382,4 -372,39 +384,39 @@@ public class LegacyLayoutTes
          LegacyLayout.fromUnfilteredRowIterator(null, p.unfilteredIterator());
          LegacyLayout.serializedSizeAsLegacyPartition(null, 
p.unfilteredIterator(), VERSION_21);
      }
- }
+ 
+     @Test
+     public void testCellGrouper()
+     {
+         // CREATE TABLE %s (pk int, ck int, v map<text, text>, PRIMARY KEY 
(pk, ck))
+         CFMetaData cfm = CFMetaData.Builder.create("ks", "table")
+                                            .addPartitionKey("pk", 
Int32Type.instance)
+                                            .addClusteringColumn("ck", 
Int32Type.instance)
+                                            .addRegularColumn("v", 
MapType.getInstance(UTF8Type.instance, UTF8Type.instance, true))
+                                            .build();
+         SerializationHelper helper = new SerializationHelper(cfm, 
MessagingService.VERSION_22, SerializationHelper.Flag.LOCAL, 
ColumnFilter.all(cfm));
+         LegacyLayout.CellGrouper cg = new LegacyLayout.CellGrouper(cfm, 
helper);
+ 
 -        Slice.Bound startBound = 
Slice.Bound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
 -        Slice.Bound endBound = 
Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
++        ClusteringBound startBound = 
ClusteringBound.create(ClusteringPrefix.Kind.INCL_START_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
++        ClusteringBound endBound = 
ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
+         LegacyLayout.LegacyBound start = new 
LegacyLayout.LegacyBound(startBound, false, 
cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, 
false, cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         LegacyLayout.LegacyRangeTombstone lrt = new 
LegacyLayout.LegacyRangeTombstone(start, end, new DeletionTime(2, 1588598040));
+         assertTrue(cg.addAtom(lrt));
+ 
+         // add a real cell
+         LegacyLayout.LegacyCell cell = new 
LegacyLayout.LegacyCell(LegacyLayout.LegacyCell.Kind.REGULAR,
 -                                                                   new 
LegacyLayout.LegacyCellName(new Clustering(ByteBufferUtil.bytes(2)),
++                                                                   new 
LegacyLayout.LegacyCellName(Clustering.make(ByteBufferUtil.bytes(2)),
+                                                                               
                     cfm.getColumnDefinition(ByteBufferUtil.bytes("v")),
+                                                                               
                     ByteBufferUtil.bytes("g")),
+                                                                    
ByteBufferUtil.bytes("v"), 3, Integer.MAX_VALUE, 0);
+         assertTrue(cg.addAtom(cell));
+ 
+         // add legacy range tombstone where collection name is null for the 
end bound (this gets translated to a row tombstone)
 -        startBound = 
Slice.Bound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
 -        endBound = Slice.Bound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, 
new ByteBuffer[] {ByteBufferUtil.bytes(2)});
++        startBound = 
ClusteringBound.create(ClusteringPrefix.Kind.EXCL_START_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
++        endBound = 
ClusteringBound.create(ClusteringPrefix.Kind.EXCL_END_BOUND, new ByteBuffer[] 
{ByteBufferUtil.bytes(2)});
+         start = new LegacyLayout.LegacyBound(startBound, false, 
cfm.getColumnDefinition(ByteBufferUtil.bytes("v")));
+         end = new LegacyLayout.LegacyBound(endBound, false, null);
+         assertTrue(cg.addAtom(new LegacyLayout.LegacyRangeTombstone(start, 
end, new DeletionTime(1, 1588598040))));
+     }
+ }
diff --cc 
test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index dc5fd06,549a94d..58c5a00
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@@ -17,379 -17,118 +17,455 @@@
   */
  package org.apache.cassandra.db.compaction;
  
+ import static 
org.apache.cassandra.db.transform.DuplicateRowCheckerTest.assertCommandIssued;
 -import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.iter;
+ import static 
org.apache.cassandra.db.transform.DuplicateRowCheckerTest.makeRow;
++import static org.apache.cassandra.db.transform.DuplicateRowCheckerTest.rows;
  import static org.junit.Assert.*;
  
+ import java.net.InetAddress;
  import java.util.*;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +import com.google.common.collect.*;
  
  import org.junit.Test;
  
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.marshal.Int32Type;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
- import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.net.*;
++import org.apache.cassandra.net.IMessageSink;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessageOut;
++import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
  
- public class CompactionIteratorTest
+ public class CompactionIteratorTest extends CQLTester
  {
 +
 +    private static final int NOW = 1000;
 +    private static final int GC_BEFORE = 100;
 +    private static final String KSNAME = "CompactionIteratorTest";
 +    private static final String CFNAME = "Integer1";
 +
 +    static final DecoratedKey kk;
 +    static final CFMetaData metadata;
 +    private static final int RANGE = 1000;
 +    private static final int COUNT = 100;
 +
 +    Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>();
 +
 +    static {
 +        DatabaseDescriptor.daemonInitialization();
 +
 +        kk = Util.dk("key");
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KSNAME,
 +                                    KeyspaceParams.simple(1),
 +                                    metadata = 
SchemaLoader.standardCFMD(KSNAME,
 +                                                                         
CFNAME,
 +                                                                         1,
 +                                                                         
UTF8Type.instance,
 +                                                                         
Int32Type.instance,
 +                                                                         
Int32Type.instance));
 +    }
 +
 +    // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the 
syntax used in these tests.
 +
 +    @Test
 +    public void testGcCompactionSupersedeLeft()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSupersedeMiddle()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<40 60[150]"
 +        }, new String[] {
 +            "7<=[160] 15[180] [160]<=30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSupersedeRight()
 +    {
 +        testCompaction(new String[] {
 +            "9<=[140] 10[150] [140]<40 60[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        3);
 +    }
 +
 +    @Test
 +    public void testGcCompactionSwitchInSuperseded()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120]"
 +        },
 +        5);
 +    }
 +
 +    @Test
 +    public void testGcCompactionBoundaries()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] 
[120]<90"
 +        }, new String[] {
 +            "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] 
[130]<100"
 +        },
 +        7);
 +    }
 +
 +    @Test
 +    public void testGcCompactionMatches()
 +    {
 +        testCompaction(new String[] {
 +            "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] 
[120]<90 120<=[100] [100]<130"
 +        }, new String[] {
 +            "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] 
[110]<100 120<=[100] [100]<130"
 +        },
 +        5);
 +    }
 +
 +    @Test
 +    public void testGcCompactionRowDeletion()
 +    {
 +        testCompaction(new String[] {
 +            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
 +        }, new String[] {
 +            "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 
50[150D100]"
 +        },
 +        "25[160] 30[170] 50[120]");
 +    }
 +
 +    @Test
 +    public void testGcCompactionPartitionDeletion()
 +    {
 +        testCompaction(new String[] {
 +            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
 +        }, new String[] {
 +            // Dxx| stands for partition deletion at time xx
 +            "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 
50[150D100]"
 +        },
 +        "30[170]");
 +    }
 +
 +    void testCompaction(String[] inputs, String[] tombstones, String expected)
 +    {
 +        testNonGcCompaction(inputs, tombstones);
 +
 +        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> inputLists = parse(inputs, generator);
 +        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
 +        List<Unfiltered> result = compact(inputLists, tombstoneLists);
 +        System.out.println("GC compaction resulted in " + size(result) + " 
Unfiltereds");
 +        generator.verifyValid(result);
 +        verifyEquivalent(inputLists, result, tombstoneLists, generator);
 +        List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
 +        if (!expectedResult.equals(result))
 +            fail("Expected " + expected + ", got " + generator.str(result));
 +    }
 +
 +    void testCompaction(String[] inputs, String[] tombstones, int 
expectedCount)
 +    {
 +        testNonGcCompaction(inputs, tombstones);
 +
 +        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> inputLists = parse(inputs, generator);
 +        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
 +        List<Unfiltered> result = compact(inputLists, tombstoneLists);
 +        System.out.println("GC compaction resulted in " + size(result) + " 
Unfiltereds");
 +        generator.verifyValid(result);
 +        verifyEquivalent(inputLists, result, tombstoneLists, generator);
 +        if (size(result) > expectedCount)
 +            fail("Expected compaction with " + expectedCount + " elements, 
got " + size(result) + ": " + generator.str(result));
 +    }
 +
 +    int testNonGcCompaction(String[] inputs, String[] tombstones)
 +    {
 +        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
 +        List<List<Unfiltered>> inputLists = parse(inputs, generator);
 +        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
 +        List<Unfiltered> result = compact(inputLists, 
Collections.emptyList());
 +        System.out.println("Non-GC compaction resulted in " + size(result) + 
" Unfiltereds");
 +        generator.verifyValid(result);
 +        verifyEquivalent(inputLists, result, tombstoneLists, generator);
 +        return size(result);
 +    }
 +
 +    private static int size(List<Unfiltered> data)
 +    {
 +        return data.stream().mapToInt(x -> x instanceof 
RangeTombstoneBoundaryMarker ? 2 : 1).sum();
 +    }
 +
 +    private void verifyEquivalent(List<List<Unfiltered>> sources, 
List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, 
UnfilteredRowsGenerator generator)
 +    {
 +        // sources + tombstoneSources must be the same as result + 
tombstoneSources
 +        List<Unfiltered> expected = compact(Iterables.concat(sources, 
tombstoneSources), Collections.emptyList());
 +        List<Unfiltered> actual = 
compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), 
Collections.emptyList());
 +        if (!expected.equals(actual))
 +        {
 +            System.out.println("Equivalence test failure between sources:");
 +            for (List<Unfiltered> partition : sources)
 +                generator.dumpList(partition);
 +            System.out.println("and compacted " + generator.str(result));
 +            System.out.println("with tombstone sources:");
 +            for (List<Unfiltered> partition : tombstoneSources)
 +                generator.dumpList(partition);
 +            System.out.println("expected " + generator.str(expected));
 +            System.out.println("got " + generator.str(actual));
 +            fail("Failed equivalence test.");
 +        }
 +    }
 +
 +    private List<List<Unfiltered>> parse(String[] inputs, 
UnfilteredRowsGenerator generator)
 +    {
 +        return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x 
-> parse(x, generator)));
 +    }
 +
 +    private List<Unfiltered> parse(String input, UnfilteredRowsGenerator 
generator)
 +    {
 +        Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input);
 +        if (m.lookingAt())
 +        {
 +            int del = Integer.parseInt(m.group(1));
 +            input = input.substring(m.end());
 +            List<Unfiltered> list = generator.parse(input, NOW - 1);
 +            deletionTimes.put(list, new DeletionTime(del, del));
 +            return list;
 +        }
 +        else
 +            return generator.parse(input, NOW - 1);
 +    }
 +
 +    private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, 
Iterable<List<Unfiltered>> tombstoneSources)
 +    {
 +        List<Iterable<UnfilteredRowIterator>> content = 
ImmutableList.copyOf(Iterables.transform(sources, list -> 
ImmutableList.of(listToIterator(list, kk))));
 +        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources 
= new TreeMap<>();
 +        transformedSources.put(kk, Iterables.transform(tombstoneSources, list 
-> listToIterator(list, kk)));
 +        try (CompactionController controller = new 
Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
 +             CompactionIterator iter = new 
CompactionIterator(OperationType.COMPACTION,
 +                                                              
Lists.transform(content, x -> new Scanner(x)),
 +                                                              controller, 
NOW, null))
 +        {
 +            List<Unfiltered> result = new ArrayList<>();
 +            assertTrue(iter.hasNext());
 +            try (UnfilteredRowIterator partition = iter.next())
 +            {
 +                Iterators.addAll(result, partition);
 +            }
 +            assertFalse(iter.hasNext());
 +            return result;
 +        }
 +    }
 +
 +    private UnfilteredRowIterator listToIterator(List<Unfiltered> list, 
DecoratedKey key)
 +    {
 +        return UnfilteredRowsGenerator.source(list, metadata, key, 
deletionTimes.getOrDefault(list, DeletionTime.LIVE));
 +    }
 +
 +    NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, 
UnfilteredRowsGenerator generator,
 +                                                                 
List<DecoratedKey> keys, int pcount, int rcount)
 +    {
 +        NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>();
 +        for (int i = 0; i < pcount; ++i)
 +        {
 +            DecoratedKey key = keys.get(rand.nextInt(keys.size()));
 +            map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 
5, x -> NOW - 1));
 +        }
 +        return map;
 +    }
 +
 +    @Test
 +    public void testRandom()
 +    {
 +        UnfilteredRowsGenerator generator = new 
UnfilteredRowsGenerator(metadata.comparator, false);
 +        for (int seed = 1; seed < 100; ++seed)
 +        {
 +            Random rand = new Random(seed);
 +            List<List<Unfiltered>> sources = new ArrayList<>();
 +            for (int i = 0; i < 10; ++i)
 +                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW 
- 5, x -> NOW - 15));
 +            int srcSz = 
sources.stream().mapToInt(CompactionIteratorTest::size).sum();
 +            List<List<Unfiltered>> tombSources = new ArrayList<>();
 +            for (int i = 0; i < 10; ++i)
 +                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW 
- 5, x -> NOW - 15));
 +            List<Unfiltered> result = compact(sources, tombSources);
 +            verifyEquivalent(sources, result, tombSources, generator);
 +            assertTrue(size(result) < srcSz);
 +        }
 +    }
 +
 +    class Controller extends CompactionController
 +    {
 +        private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> 
tombstoneSources;
 +
 +        public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, 
Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
 +        {
 +            super(cfs, Collections.emptySet(), gcBefore);
 +            this.tombstoneSources = tombstoneSources;
 +        }
 +
 +        @Override
 +        public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey 
key, boolean tombstoneOnly)
 +        {
 +            assert tombstoneOnly;
 +            return tombstoneSources.get(key);
 +        }
 +    }
 +
 +    class Scanner extends AbstractUnfilteredPartitionIterator implements 
ISSTableScanner
 +    {
 +        Iterator<UnfilteredRowIterator> iter;
 +
 +        Scanner(Iterable<UnfilteredRowIterator> content)
 +        {
 +            iter = content.iterator();
 +        }
 +
 +        @Override
 +        public boolean isForThrift()
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        public CFMetaData metadata()
 +        {
 +            return metadata;
 +        }
 +
 +        @Override
 +        public boolean hasNext()
 +        {
 +            return iter.hasNext();
 +        }
 +
 +        @Override
 +        public UnfilteredRowIterator next()
 +        {
 +            return iter.next();
 +        }
 +
 +        @Override
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getBytesScanned()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public long getCompressedLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        @Override
 +        public String getBackingFiles()
 +        {
 +            return null;
 +        }
 +    }
++
+     @Test
+     public void duplicateRowsTest() throws Throwable
+     {
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", 
"0");
+         // Create a table and insert some data. The actual rows read in the 
test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, 
PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", 
"key", i, i, i);
+         flush();
+ 
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
 -        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+         CFMetaData metadata = getCurrentColumnFamilyStore().metadata;
+ 
+         final HashMap<InetAddress, MessageOut> sentMessages = new HashMap<>();
+         IMessageSink sink = new IMessageSink()
+         {
+             public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress to)
+             {
+                 sentMessages.put(to, message);
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return false;
+             }
+         };
+         MessagingService.instance().addMessageSink(sink);
+ 
+         // no duplicates
+         sentMessages.clear();
 -        iterate(cfs, iter(metadata,
 -                          false,
 -                          makeRow(metadata,0, 0),
 -                          makeRow(metadata,0, 1),
 -                          makeRow(metadata,0, 2)));
++        iterate(makeRow(metadata,0, 0),
++                makeRow(metadata,0, 1),
++                makeRow(metadata,0, 2));
+         assertCommandIssued(sentMessages, false);
+ 
+         // now test with a duplicate row and see that we issue a snapshot 
command
+         sentMessages.clear();
 -        iterate(cfs, iter(metadata,
 -                          false,
 -                          makeRow(metadata, 0, 0),
 -                          makeRow(metadata, 0, 1),
 -                          makeRow(metadata, 0, 1)));
++        iterate(makeRow(metadata, 0, 0),
++                makeRow(metadata, 0, 1),
++                makeRow(metadata, 0, 1));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
 -    private void iterate(ColumnFamilyStore cfs, UnfilteredPartitionIterator 
partitions)
++    private void iterate(Unfiltered...unfiltereds)
+     {
 -
 -        try (CompactionController controller = new 
CompactionController(getCurrentColumnFamilyStore(), Integer.MAX_VALUE);
 -             ISSTableScanner scanner = scanner(cfs, partitions);
++        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
++        DecoratedKey key = 
cfs.metadata.partitioner.decorateKey(ByteBufferUtil.bytes("key"));
++        try (CompactionController controller = new CompactionController(cfs, 
Integer.MAX_VALUE);
++             UnfilteredRowIterator rows = rows(metadata, key, false, 
unfiltereds);
++             ISSTableScanner scanner = new 
Scanner(Collections.singletonList(rows));
+              CompactionIterator iter = new 
CompactionIterator(OperationType.COMPACTION,
+                                                               
Collections.singletonList(scanner),
+                                                               controller, 
FBUtilities.nowInSeconds(), null))
+         {
+             while (iter.hasNext())
+             {
+                 try (UnfilteredRowIterator partition = iter.next())
+                 {
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
 -
 -    private ISSTableScanner scanner(final ColumnFamilyStore cfs, final 
UnfilteredPartitionIterator partitions)
 -    {
 -
 -        return new ISSTableScanner()
 -        {
 -            public long getLengthInBytes() { return 0; }
 -
 -            public long getCurrentPosition() { return 0; }
 -
 -            public String getBackingFiles() { return 
cfs.getLiveSSTables().iterator().next().toString(); }
 -
 -            public boolean isForThrift() { return false; }
 -
 -            public CFMetaData metadata() { return cfs.metadata; }
 -
 -            public void close() { }
 -
 -            public boolean hasNext() { return partitions.hasNext(); }
 -
 -            public UnfilteredRowIterator next() { return partitions.next(); }
 -        };
 -    }
  }
diff --cc test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 0330b65,2bd685c..df23e4f
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@@ -17,11 -17,34 +17,35 @@@
   */
  package org.apache.cassandra.db.partition;
  
 +import org.apache.cassandra.UpdateBuilder;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import com.google.common.collect.Lists;
+ 
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.Clustering;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.DeletionTime;
+ import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.filter.ColumnFilter;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.db.rows.BTreeRow;
+ import org.apache.cassandra.db.rows.BufferCell;
+ import org.apache.cassandra.db.rows.Cell;
+ import org.apache.cassandra.db.rows.CellPath;
+ import org.apache.cassandra.db.rows.EncodingStats;
+ import org.apache.cassandra.db.rows.Row;
+ import org.apache.cassandra.db.rows.RowAndDeletionMergeIterator;
+ import org.apache.cassandra.db.rows.Rows;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.junit.Test;
  
@@@ -84,4 -112,121 +111,121 @@@ public class PartitionUpdateTest extend
          update = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), 
"key0").buildUpdate();
          Assert.assertEquals(0, update.operationCount());
      }
+ 
+     /**
+      * Makes sure we merge duplicate rows, see CASSANDRA-15789
+      */
+     @Test
+     public void testDuplicate()
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, 
PRIMARY KEY (pk, ck))");
+         CFMetaData cfm = currentTableMetadata();
+ 
+         DecoratedKey dk = 
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ 
+         List<Row> rows = new ArrayList<>();
+         Row.Builder builder = 
BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         
builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 
new DeletionTime(2, 1588586647));
+ 
 -        Cell c = BufferCell.live(cfm, 
cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, 
ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
++        Cell c = 
BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, 
ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+         builder.addCell(c);
+ 
+         Row r = builder.build();
+         rows.add(r);
+ 
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 
1), false));
+         r = builder.build();
+         rows.add(r);
+ 
+         RowAndDeletionMergeIterator rmi = new RowAndDeletionMergeIterator(cfm,
+                                                                           dk,
+                                                                           
DeletionTime.LIVE,
+                                                                           
ColumnFilter.all(cfm),
+                                                                           
Rows.EMPTY_STATIC_ROW,
+                                                                           
false,
+                                                                           
EncodingStats.NO_STATS,
+                                                                           
rows.iterator(),
+                                                                           
Collections.emptyIterator(),
+                                                                           
true);
+ 
 -        PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi);
++        PartitionUpdate pu = PartitionUpdate.fromPre30Iterator(rmi, 
ColumnFilter.all(cfm));
+         pu.iterator();
+ 
+         Mutation m = new 
Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+         m.add(pu);
+         m.apply();
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         SSTableReader sst = 
getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+         int count = 0;
+         try (ISSTableScanner scanner = sst.getScanner())
+         {
+             while (scanner.hasNext())
+             {
+                 try (UnfilteredRowIterator iter = scanner.next())
+                 {
+                     while (iter.hasNext())
+                     {
+                         iter.next();
+                         count++;
+                     }
+                 }
+             }
+         }
+         assertEquals(1, count);
+     }
+ 
+     /**
+      * Makes sure we don't create duplicates when merging 2 partition updates
+      */
+     @Test
+     public void testMerge()
+     {
+         createTable("CREATE TABLE %s (pk int, ck int, v map<text, text>, 
PRIMARY KEY (pk, ck))");
+         CFMetaData cfm = currentTableMetadata();
+ 
+         DecoratedKey dk = 
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ 
+         Row.Builder builder = 
BTreeRow.unsortedBuilder(FBUtilities.nowInSeconds());
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         
builder.addComplexDeletion(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 
new DeletionTime(2, 1588586647));
 -        Cell c = BufferCell.live(cfm, 
cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, 
ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
++        Cell c = 
BufferCell.live(cfm.getColumnDefinition(ByteBufferUtil.bytes("v")), 3, 
ByteBufferUtil.bytes("h"), CellPath.create(ByteBufferUtil.bytes("g")));
+         builder.addCell(c);
+         Row r = builder.build();
+ 
+         PartitionUpdate p1 = new PartitionUpdate(cfm, dk, 
cfm.partitionColumns(), 2);
+         p1.add(r);
+ 
 -        builder.newRow(new Clustering(ByteBufferUtil.bytes(2)));
++        builder.newRow(Clustering.make(ByteBufferUtil.bytes(2)));
+         builder.addRowDeletion(new Row.Deletion(new DeletionTime(1588586647, 
1), false));
+         r = builder.build();
+         PartitionUpdate p2 = new PartitionUpdate(cfm, dk, 
cfm.partitionColumns(), 2);
+         p2.add(r);
+ 
+         Mutation m = new 
Mutation(getCurrentColumnFamilyStore().keyspace.getName(), dk);
+         m.add(PartitionUpdate.merge(Lists.newArrayList(p1, p2)));
+         m.apply();
+ 
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         SSTableReader sst = 
getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+         int count = 0;
+         try (ISSTableScanner scanner = sst.getScanner())
+         {
+             while (scanner.hasNext())
+             {
+                 try (UnfilteredRowIterator iter = scanner.next())
+                 {
+                     while (iter.hasNext())
+                     {
+                         iter.next();
+                         count++;
+                     }
+                 }
+             }
+         }
+         assertEquals(1, count);
+     }
  }
diff --cc 
test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
index 0000000,78a0c8c..432bce3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
+++ b/test/unit/org/apache/cassandra/db/transform/DuplicateRowCheckerTest.java
@@@ -1,0 -1,240 +1,246 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.transform;
+ 
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.*;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.marshal.AbstractType;
+ import org.apache.cassandra.db.partitions.*;
+ import org.apache.cassandra.db.rows.*;
+ import org.apache.cassandra.net.*;
+ import org.apache.cassandra.utils.DiagnosticSnapshotService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class DuplicateRowCheckerTest extends CQLTester
+ {
+     ColumnFamilyStore cfs;
+     CFMetaData metadata;
+     static HashMap<InetAddress, MessageOut> sentMessages;
+ 
+     @BeforeClass
+     public static void setupMessaging()
+     {
+         sentMessages = new HashMap<>();
+         IMessageSink sink = new IMessageSink()
+         {
+             public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress to)
+             {
+                 sentMessages.put(to, message);
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return false;
+             }
+         };
+         MessagingService.instance().addMessageSink(sink);
+     }
+ 
+     @Before
+     public void setup() throws Throwable
+     {
+         DatabaseDescriptor.setSnapshotOnDuplicateRowDetection(true);
+         System.setProperty("cassandra.diagnostic_snapshot_interval_nanos", 
"0");
+         // Create a table and insert some data. The actual rows read in the 
test will be synthetic
+         // but this creates an sstable on disk to be snapshotted.
+         createTable("CREATE TABLE %s (pk text, ck1 int, ck2 int, v int, 
PRIMARY KEY (pk, ck1, ck2))");
+         for (int i = 0; i < 10; i++)
+             execute("insert into %s (pk, ck1, ck2, v) values (?, ?, ?, ?)", 
"key", i, i, i);
+         getCurrentColumnFamilyStore().forceBlockingFlush();
+ 
+         metadata = getCurrentColumnFamilyStore().metadata;
+         cfs = getCurrentColumnFamilyStore();
+         sentMessages.clear();
+     }
+ 
+     @Test
+     public void noDuplicates()
+     {
+         // no duplicates
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, false);
+     }
+ 
+     @Test
+     public void singleDuplicateForward()
+     {
+ 
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void singleDuplicateReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleContiguousReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 1)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointForward()
+     {
+         iterate(iter(metadata,
+                      false,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     @Test
+     public void multipleDisjointReverse()
+     {
+         iterate(iter(metadata,
+                      true,
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 0),
+                      makeRow(metadata, 0, 1),
+                      makeRow(metadata, 0, 2),
+                      makeRow(metadata, 0, 2)));
+         assertCommandIssued(sentMessages, true);
+     }
+ 
+     public static void assertCommandIssued(HashMap<InetAddress, MessageOut> 
sent, boolean isExpected)
+     {
+         assertEquals(isExpected, !sent.isEmpty());
+         if (isExpected)
+         {
+             assertEquals(1, sent.size());
+             assertTrue(sent.containsKey(FBUtilities.getBroadcastAddress()));
+             SnapshotCommand command = (SnapshotCommand) 
sent.get(FBUtilities.getBroadcastAddress()).payload;
+             
assertTrue(command.snapshot_name.startsWith(DiagnosticSnapshotService.DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX));
+         }
+     }
+ 
+     private void iterate(UnfilteredPartitionIterator iter)
+     {
+         try (PartitionIterator partitions = applyChecker(iter))
+         {
+             while (partitions.hasNext())
+             {
+                 try (RowIterator partition = partitions.next())
+                 {
+                     partition.forEachRemaining(u -> {});
+                 }
+             }
+         }
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private static <T> ByteBuffer decompose(AbstractType<?> type, T value)
+     {
+         return ((AbstractType<T>) type).decompose(value);
+     }
+ 
+     public static Row makeRow(CFMetaData metadata, Object... clusteringValues)
+     {
+         ByteBuffer[] clusteringByteBuffers = new 
ByteBuffer[clusteringValues.length];
+         for (int i = 0; i < clusteringValues.length; i++)
+             clusteringByteBuffers[i] = 
decompose(metadata.clusteringColumns().get(i).type, clusteringValues[i]);
+ 
 -        return BTreeRow.noCellLiveRow(new Clustering(clusteringByteBuffers), 
LivenessInfo.create(metadata, 0, 0));
++        return BTreeRow.noCellLiveRow(Clustering.make(clusteringByteBuffers), 
LivenessInfo.create(0, 0));
++    }
++
++    public static UnfilteredRowIterator rows(CFMetaData metadata,
++                                             DecoratedKey key,
++                                             boolean isReversedOrder,
++                                             Unfiltered... unfiltereds)
++    {
++        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
++        return new AbstractUnfilteredRowIterator(metadata,
++                                                 key,
++                                                 DeletionTime.LIVE,
++                                                 metadata.partitionColumns(),
++                                                 Rows.EMPTY_STATIC_ROW,
++                                                 isReversedOrder,
++                                                 EncodingStats.NO_STATS)
++        {
++            protected Unfiltered computeNext()
++            {
++                return iterator.hasNext() ? iterator.next() : endOfData();
++            }
++        };
+     }
+ 
+     private static PartitionIterator applyChecker(UnfilteredPartitionIterator 
unfiltered)
+     {
+         int nowInSecs = 0;
+         return 
DuplicateRowChecker.duringRead(FilteredPartitions.filter(unfiltered, nowInSecs),
+                                               
Collections.singletonList(FBUtilities.getBroadcastAddress()));
+     }
+ 
+     public static UnfilteredPartitionIterator iter(CFMetaData metadata, 
boolean isReversedOrder, Unfiltered... unfiltereds)
+     {
+         DecoratedKey key = metadata.partitioner.decorateKey(bytes("key"));
 -        Iterator<Unfiltered> iterator = Iterators.forArray(unfiltereds);
 -
 -        UnfilteredRowIterator rowIter = new 
AbstractUnfilteredRowIterator(metadata,
 -                                                                          key,
 -                                                                          
DeletionTime.LIVE,
 -                                                                          
metadata.partitionColumns(),
 -                                                                          
Rows.EMPTY_STATIC_ROW,
 -                                                                          
isReversedOrder,
 -                                                                          
EncodingStats.NO_STATS)
 -        {
 -            protected Unfiltered computeNext()
 -            {
 -                return iterator.hasNext() ? iterator.next() : endOfData();
 -            }
 -        };
 -
++        UnfilteredRowIterator rowIter = rows(metadata, key, isReversedOrder, 
unfiltereds);
+         return new SingletonUnfilteredPartitionIterator(rowIter, false);
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to