This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8358e19840d352475a5831d130ff3c43a11f2f4e
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Fri May 8 18:12:55 2020 +0200

    Fix legacy handling of RangeTombstone with collection ones
    
    When a multi-row range tombstone interacts with a a collection tombstone
    within one of a covered row, the resulting range tombstone in the legacy
    format will start in the middle of the row and extend past said row and
    it needs special handling.
    
    Before this commit, the code deserializing that RT was making it
    artificially start at the end of the row (in which the collection
    tombstone is), but that means that when `LegacyLayout.CellGrouper`
    encountered it, it decided the row was finished, even if it was not,
    leading to potential row duplication.
    
    The patch solves this by:
    1. making that problematic tombstone start at the beginning of the row
    instead of its end (to avoid code deciding the row is over).
    2. modify `UnfilteredDeserializer` to 'split' that range tombstone into
    a row tombstone for the row it covers, which is handled as a normal row
    tombstone, and push the rest of the range tombstone (that starts after
    the row and extends to the original end of the RT) to be handled after
    that row is fully "grouped".
    
    The patch also removes the possibility of getting an empty row from
    `LegacyLayout#getNextRow` to avoid theoretical problems with that.
    
    Patch by Sylvain Lebresne; reviewed by Marcus Eriksson & Aleksey
    Yeschenko for CASSANDRA-15805
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/LegacyLayout.java |  99 ++++++++++++----
 .../cassandra/db/UnfilteredDeserializer.java       | 129 ++++++++++++++++-----
 .../upgrade/MixedModeRangeTombstoneTest.java       |  73 ++++++++++++
 4 files changed, 252 insertions(+), 50 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cdb9ad0..46b3f56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fix duplicated row on 2.x upgrades when multi-rows range tombstones 
interact with collection ones (CASSANDRA-15805)
  * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to 
avoid race conditions (CASSANDRA-15667)
  * EmptyType doesn't override writeValue so could attempt to write bytes when 
expected not to (CASSANDRA-15790)
  * Fix index queries on partition key columns when some partitions contains 
only static data (CASSANDRA-13666)
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java 
b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 37cc935..39dd54a 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -1115,7 +1115,7 @@ public abstract class LegacyLayout
         return true;
     }
 
-    private static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData 
metadata)
+    static Comparator<LegacyAtom> legacyAtomComparator(CFMetaData metadata)
     {
         return (o1, o2) ->
         {
@@ -1373,8 +1373,24 @@ public abstract class LegacyLayout
             this.hasValidCells = false;
         }
 
+        /**
+         * Try adding the provided atom to the currently grouped row.
+         *
+         * @param atom the new atom to try to add. This <b>must</b> be a "row" 
atom, that is either a cell or a legacy
+         *             range tombstone that covers only one row (row deletion) 
or a subset of it (collection
+         *             deletion). Meaning that legacy range tombstone covering 
multiple rows (that should be handled as
+         *             legit range tombstone in the new storage engine) should 
be handled separately. Atoms should also
+         *             be provided in proper clustering order.
+         * @return {@code true} if the provided atom has been "consumed" by 
this grouper (this does _not_ mean the
+         *          atom has been "used" by the grouper as the grouper will 
skip some shadowed atoms for instance, just
+         *          that {@link #getRow()} shouldn't be called just yet if 
there is more atom in the atom iterator we're
+         *          grouping). {@code false} otherwise, that is if the row 
currently built by this grouper is done
+         *          _without_ the provided atom being "consumed" (and so 
{@link #getRow()} should be called and the
+         *          grouper resetted, after which the provided atom should be 
provided again).
+         */
         public boolean addAtom(LegacyAtom atom)
         {
+            assert atom.isRowAtom(metadata) : "Unexpected non in-row legacy 
range tombstone " + atom;
             return atom.isCell()
                  ? addCell(atom.asCell())
                  : addRangeTombstone(atom.asRangeTombstone());
@@ -1472,11 +1488,16 @@ public abstract class LegacyLayout
         private boolean addRangeTombstone(LegacyRangeTombstone tombstone)
         {
             if (tombstone.isRowDeletion(metadata))
+            {
                 return addRowTombstone(tombstone);
-            else if (tombstone.isCollectionTombstone())
-                return addCollectionTombstone(tombstone);
+            }
             else
-                return addGenericRangeTombstone(tombstone);
+            {
+                // The isRowAtom() assertion back in addAtom would have 
already triggered otherwise, but spelling it
+                // out nonetheless.
+                assert tombstone.isCollectionTombstone();
+                return addCollectionTombstone(tombstone);
+            }
         }
 
         private boolean addRowTombstone(LegacyRangeTombstone tombstone)
@@ -1545,24 +1566,32 @@ public abstract class LegacyLayout
             return true;
         }
 
-        private boolean addGenericRangeTombstone(LegacyRangeTombstone 
tombstone)
+        /**
+         * Whether the provided range tombstone starts strictly after the 
current row of the cell grouper (if no row is
+         * currently started, this return false).
+         */
+        public boolean startsAfterCurrentRow(LegacyRangeTombstone 
rangeTombstone)
         {
-            /*
-             * We can see a non-collection, non-row deletion in two scenarios:
-             *
-             * 1. Most commonly, the tombstone's start bound is bigger than 
current row's clustering, which means that
-             *    the current row is over, and we should move on to the next 
row or RT;
-             *
-             * 2. Less commonly, the tombstone's start bound is smaller than 
current row's clustering, which means that
-             *    we've crossed an index boundary and are seeing a non-closed 
RT from the previous block, repeated;
-             *    we should ignore it and stay in the current row.
-             *
-             *  In either case, clustering should be non-null, or we shouldn't 
have gotten to this method at all
-             *  However, to be absolutely SURE we're in case two above, we 
check here.
-             */
-            return clustering != null && 
metadata.comparator.compare(clustering, tombstone.start.bound.clustering()) > 0;
+            return clustering != null && 
metadata.comparator.compare(rangeTombstone.start.bound, clustering) > 0;
+        }
+
+        /**
+         * The clustering of the current row of the cell grouper, or {@code 
null} if no row is currently started.
+         */
+        public Clustering currentRowClustering()
+        {
+            return clustering;
         }
 
+        /**
+         * Generates the row currently grouped by this grouper and reset it 
for the following row.
+         * <p>
+         * Note that the only correct way to call this is when either all the 
atom we're trying to group has been
+         * consumed, or when {@link #addAtom(LegacyAtom)} returns {@code 
false}.
+         *
+         * @return the current row that has been grouped, or {@code null} in 
the rare case where all the atoms
+         * "consumed" by {@link #addAtom(LegacyAtom)} for this row were 
skipped (we skip atoms under a few conditions).
+         */
         public Row getRow()
         {
             if (!hasValidCells && invalidLivenessInfo != null)
@@ -1718,6 +1747,12 @@ public abstract class LegacyLayout
 
         public LegacyCell asCell();
         public LegacyRangeTombstone asRangeTombstone();
+
+        /**
+         * Whether the atom is one that becomes part of a {@link Row} in the 
new storage engine, meaning it is either
+         * as cell or a legacy range tombstone that covers a single row, or 
parts of one.
+         */
+        public boolean isRowAtom(CFMetaData metadata);
     }
 
     /**
@@ -1835,6 +1870,12 @@ public abstract class LegacyLayout
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public boolean isRowAtom(CFMetaData metaData)
+        {
+            return true;
+        }
+
         public boolean isCounter()
         {
             return kind == Kind.COUNTER;
@@ -1889,9 +1930,9 @@ public abstract class LegacyLayout
             if ((start.collectionName == null) != (stop.collectionName == 
null))
             {
                 if (start.collectionName == null)
-                    stop = new LegacyBound(stop.bound, stop.isStatic, null);
+                    stop = new 
LegacyBound(Slice.Bound.inclusiveEndOf(stop.bound.values), stop.isStatic, null);
                 else
-                    start = new LegacyBound(start.bound, start.isStatic, null);
+                    start = new 
LegacyBound(Slice.Bound.inclusiveStartOf(start.bound.values), start.isStatic, 
null);
             }
             else if (!Objects.equals(start.collectionName, 
stop.collectionName))
             {
@@ -1918,11 +1959,21 @@ public abstract class LegacyLayout
             return new LegacyRangeTombstone(newStart, stop, deletionTime);
         }
 
+        public LegacyRangeTombstone withNewStart(Slice.Bound newStart)
+        {
+            return withNewStart(new LegacyBound(newStart, start.isStatic, 
null));
+        }
+
         public LegacyRangeTombstone withNewEnd(LegacyBound newStop)
         {
             return new LegacyRangeTombstone(start, newStop, deletionTime);
         }
 
+        public LegacyRangeTombstone withNewEnd(Slice.Bound newEnd)
+        {
+            return withNewEnd(new LegacyBound(newEnd, stop.isStatic, null));
+        }
+
         public boolean isCell()
         {
             return false;
@@ -1943,6 +1994,12 @@ public abstract class LegacyLayout
             return this;
         }
 
+        @Override
+        public boolean isRowAtom(CFMetaData metadata)
+        {
+            return isCollectionTombstone() || isRowDeletion(metadata);
+        }
+
         public boolean isCollectionTombstone()
         {
             return start.collectionName != null;
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java 
b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 62ad76a..2d270bc 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -480,16 +480,7 @@ public abstract class UnfilteredDeserializer
                 this.helper = helper;
                 this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
                 this.tombstoneTracker = new 
TombstoneTracker(partitionDeletion);
-                this.atoms = new AtomIterator(atomReader);
-            }
-
-            private boolean isRow(LegacyLayout.LegacyAtom atom)
-            {
-                if (atom.isCell())
-                    return true;
-
-                LegacyLayout.LegacyRangeTombstone tombstone = 
atom.asRangeTombstone();
-                return tombstone.isCollectionTombstone() || 
tombstone.isRowDeletion(metadata);
+                this.atoms = new AtomIterator(atomReader, metadata);
             }
 
             public boolean hasNext()
@@ -515,7 +506,7 @@ public abstract class UnfilteredDeserializer
                             if (tombstoneTracker.isShadowed(atom))
                                 continue;
 
-                            if (isRow(atom))
+                            if (atom.isRowAtom(metadata))
                                 next = readRow(atom);
                             else
                                 
tombstoneTracker.openNew(atom.asRangeTombstone());
@@ -539,13 +530,57 @@ public abstract class UnfilteredDeserializer
                                                  ? 
LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
                                                  : this.grouper;
                 grouper.reset();
+                // We know the first atom is not shadowed and is a "row" atom, 
so can be added blindly.
                 grouper.addAtom(first);
-                // As long as atoms are part of the same row, consume them. 
Note that the call to addAtom() uses
-                // atoms.peek() so that the atom is only consumed (by next) if 
it's part of the row (addAtom returns true)
-                while (atoms.hasNext() && grouper.addAtom(atoms.peek()))
+
+                // We're less sure about the next atoms. In particular, 
CellGrouper want to make sure we only pass it
+                // "row" atoms (it's the only type it knows how to handle) so 
we should handle anything else.
+                while (atoms.hasNext())
                 {
-                    atoms.next();
+                    // Peek, but don't consume the next atom just yet
+                    LegacyLayout.LegacyAtom atom = atoms.peek();
+                    // First, that atom may be shadowed in which case we can 
simply ignore it. Note that this handles
+                    // the case of repeated RT start marker after we've 
crossed an index boundary, which could well
+                    // appear in the middle of a row (CASSANDRA-14008).
+                    if (!tombstoneTracker.hasClosingMarkerBefore(atom) && 
tombstoneTracker.isShadowed(atom))
+                    {
+                        atoms.next(); // consume the atom since we only peeked 
it so far
+                        continue;
+                    }
+
+                    // Second, we should only pass "row" atoms to the cell 
grouper
+                    if (atom.isRowAtom(metadata))
+                    {
+                        if (!grouper.addAtom(atom))
+                            break; // done with the row; don't consume the atom
+                        atoms.next(); // the grouper "accepted" the atom, 
consume it since we only peeked above
+                    }
+                    else
+                    {
+                        LegacyLayout.LegacyRangeTombstone rt = 
(LegacyLayout.LegacyRangeTombstone) atom;
+                        // This means we have a non-row range tombstone. 
Unfortunately, that does not guarantee the
+                        // current row is finished (though it may), because 
due to the logic within LegacyRangeTombstone
+                        // constructor, we can get an out-of-order RT that 
includes on the current row (even if it is
+                        // already started) and extends past it.
+
+                        // So first, evacuate the easy case of the range 
tombstone simply starting after the current
+                        // row, in which case we're done with the current row 
(but don't consume the new RT yet so it
+                        // gets handled as any other non-row RT).
+                        if (grouper.startsAfterCurrentRow(rt))
+                            break;
+
+                        // Otherwise, we "split" the RT in 2: the part 
covering the current row, which is now an
+                        // inRowAtom and can be passed to the grouper, and the 
part after that, which we push back into
+                        // the iterator for later processing.
+                        Clustering currentRow = grouper.currentRowClustering();
+                        atoms.next(); // consume since we had only just peeked 
it so far and we're using it
+                        
atoms.pushOutOfOrder(rt.withNewStart(Slice.Bound.exclusiveStartOf(currentRow)));
+                        // Note: in theory the withNewStart is a no-op here, 
but not taking any risk
+                        
grouper.addAtom(rt.withNewStart(Slice.Bound.inclusiveStartOf(currentRow))
+                                          
.withNewEnd(Slice.Bound.inclusiveEndOf(currentRow)));
+                    }
                 }
+
                 return grouper.getRow();
             }
 
@@ -583,51 +618,87 @@ public abstract class UnfilteredDeserializer
             private static class AtomIterator implements 
PeekingIterator<LegacyLayout.LegacyAtom>
             {
                 private final Supplier<LegacyLayout.LegacyAtom> atomReader;
-                private boolean isDone;
+                private boolean readerExhausted;
                 private LegacyLayout.LegacyAtom next;
 
-                private AtomIterator(Supplier<LegacyLayout.LegacyAtom> 
atomReader)
+                private final Comparator<LegacyLayout.LegacyAtom> 
atomComparator;
+                // May temporarily store atoms that needs to be handler later 
than when they were deserialized.
+                // Lazily initialized since it is used infrequently.
+                private Queue<LegacyLayout.LegacyAtom> outOfOrderAtoms;
+
+                private AtomIterator(Supplier<LegacyLayout.LegacyAtom> 
atomReader, CFMetaData metadata)
                 {
                     this.atomReader = atomReader;
+                    this.atomComparator = 
LegacyLayout.legacyAtomComparator(metadata);
                 }
 
                 public boolean hasNext()
                 {
-                    if (isDone)
-                        return false;
+                    if (readerExhausted)
+                        return hasOutOfOrderAtoms(); // We have to return out 
of order atoms when reader exhausts
 
+                    // Note that next() and peek() assumes that next has been 
set by this method, so we do it even if
+                    // we have some outOfOrderAtoms stacked up.
                     if (next == null)
-                    {
                         next = atomReader.get();
-                        if (next == null)
-                        {
-                            isDone = true;
-                            return false;
-                        }
-                    }
-                    return true;
+
+                    readerExhausted = next == null;
+                    return !readerExhausted || hasOutOfOrderAtoms();
                 }
 
                 public LegacyLayout.LegacyAtom next()
                 {
                     if (!hasNext())
                         throw new UnsupportedOperationException();
+
+                    if (hasOutOrderAtomBeforeNext())
+                        return outOfOrderAtoms.poll();
+
                     LegacyLayout.LegacyAtom toReturn = next;
                     next = null;
                     return toReturn;
                 }
 
+                private boolean hasOutOfOrderAtoms()
+                {
+                    return outOfOrderAtoms != null && 
!outOfOrderAtoms.isEmpty();
+                }
+
+                private boolean hasOutOrderAtomBeforeNext()
+                {
+                    // Note that if outOfOrderAtoms is null, the first 
condition will be false, so we can save a null
+                    // check on calling `outOfOrderAtoms.peek()` in the right 
branch.
+                    return hasOutOfOrderAtoms()
+                           && (next == null || 
atomComparator.compare(outOfOrderAtoms.peek(), next) <= 0);
+                }
+
                 public LegacyLayout.LegacyAtom peek()
                 {
                     if (!hasNext())
                         throw new UnsupportedOperationException();
+                    if (hasOutOrderAtomBeforeNext())
+                        return outOfOrderAtoms.peek();
                     return next;
                 }
 
+                /**
+                 * Push back an atom in the iterator assuming said atom sorts 
strictly _after_ the atom returned by
+                 * the last next() call (meaning the pushed atom fall in the 
part of the iterator that has not been
+                 * returned yet, not before). The atom will then be returned 
by the iterator in proper order.
+                 */
+                public void pushOutOfOrder(LegacyLayout.LegacyAtom atom)
+                {
+                    if (outOfOrderAtoms == null)
+                        outOfOrderAtoms = new PriorityQueue<>(atomComparator);
+                    outOfOrderAtoms.offer(atom);
+                }
+
                 public void clearState()
                 {
                     this.next = null;
-                    this.isDone = false;
+                    this.readerExhausted = false;
+                    if (outOfOrderAtoms != null)
+                        outOfOrderAtoms.clear();
                 }
 
                 public void remove()
@@ -685,7 +756,7 @@ public abstract class UnfilteredDeserializer
                     if (partitionDeletion.deletes(timestamp))
                         return true;
 
-                    SortedSet<LegacyLayout.LegacyRangeTombstone> 
coveringTombstones = isRow(atom) ? openTombstones : 
openTombstones.tailSet(atom.asRangeTombstone());
+                    SortedSet<LegacyLayout.LegacyRangeTombstone> 
coveringTombstones = atom.isRowAtom(metadata) ? openTombstones : 
openTombstones.tailSet(atom.asRangeTombstone());
                     return Iterables.any(coveringTombstones, tombstone -> 
tombstone.deletionTime.deletes(timestamp));
                 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
new file mode 100644
index 0000000..e4b3a17
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRangeTombstoneTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * Tests related to the handle of range tombstones during 2.x to 3.x upgrades.
+ */
+public class MixedModeRangeTombstoneTest extends UpgradeTestBase
+{
+    /**
+     * Tests the interaction of range tombstones covering multiple rows and 
collection tombsones within the covered
+     * rows.
+     *
+     * <p>This test reproduces the issue of CASSANDRA-15805.
+     */
+    @Test
+    public void multiRowsRangeTombstoneAndCollectionTombstoneInteractionTest() 
throws Throwable {
+        String tableName = DistributedTestBase.KEYSPACE + ".t";
+        String schema = "CREATE TABLE " + tableName + " (" +
+                        "  k int," +
+                        "  c1 text," +
+                        "  c2 text," +
+                        "  a text," +
+                        "  b set<text>," +
+                        "  c text," +
+                        "  PRIMARY KEY((k), c1, c2)" +
+                        " )";
+
+
+        new TestCase()
+        .nodes(2)
+        .upgrade(Versions.Major.v22, Versions.Major.v30)
+        .setup(cluster -> {
+            cluster.schemaChange(schema);
+            cluster.coordinator(1).execute(format("DELETE FROM %s USING 
TIMESTAMP 1 WHERE k = 0 AND c1 = 'A'", tableName), ConsistencyLevel.ALL);
+            cluster.coordinator(1).execute(format("INSERT INTO %s(k, c1, c2, 
a, b, c) VALUES (0, 'A', 'X', 'foo', {'whatever'}, 'bar') USING TIMESTAMP 2", 
tableName), ConsistencyLevel.ALL);
+            cluster.coordinator(1).execute(format("DELETE b FROM %s USING 
TIMESTAMP 3 WHERE k = 0 AND c1 = 'A' and c2 = 'X'", tableName), 
ConsistencyLevel.ALL);
+            cluster.get(1).flush(DistributedTestBase.KEYSPACE);
+            cluster.get(2).flush(DistributedTestBase.KEYSPACE);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> {
+            assertRows(cluster.coordinator(node).execute(format("SELECT * FROM 
%s", tableName), ConsistencyLevel.ALL),
+                       row(0, "A", "X", "foo", null, "bar"));
+        })
+        .run();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to