This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9bf1ab1a6a8393a1e5cc67041f6e85dd9065b9f4
Merge: 3f73c16 31b9078
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Mon Oct 5 14:20:16 2020 -0700

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../apache/cassandra/config/ColumnDefinition.java  |  23 ++++
 src/java/org/apache/cassandra/db/Columns.java      |  19 +++-
 .../apache/cassandra/db/SerializationHeader.java   |  17 ++-
 .../cassandra/db/UnknownColumnException.java       |  12 ++-
 .../apache/cassandra/db/filter/ColumnFilter.java   |   8 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   7 ++
 .../cassandra/db/rows/UnfilteredSerializer.java    |  20 +++-
 .../cassandra/distributed/test/SchemaTest.java     | 117 +++++++++++++++++++++
 .../distributed/test/SimpleReadWriteTest.java      |  87 ++++++++++++---
 test/unit/org/apache/cassandra/db/ColumnsTest.java |   2 +-
 11 files changed, 280 insertions(+), 33 deletions(-)

diff --cc CHANGES.txt
index b735ba5,1ea5184..99369fa
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,10 +1,18 @@@
 -3.0.23:
 - * Handle unexpected columns due to schema races (CASSANDRA-15899)
 +3.11.9
 + * Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
 + * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
   * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
 + * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
 +Merged from 3.0:
++ * Handle unexpected columns due to schema races (CASSANDRA-15899)
   * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
  
 -3.0.22:
 +3.11.8
 + * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
 + * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
 +Merged from 3.0:
 + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 57ff729,858c944..3c79539
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -481,11 -441,11 +481,11 @@@ public class ColumnFilte
                  }
              }
  
 -            if (hasSelection)
 +            if (hasQueried)
              {
-                 Columns statics = Columns.serializer.deserialize(in, metadata);
-                 Columns regulars = Columns.serializer.deserialize(in, metadata);
+                 Columns statics = Columns.serializer.deserializeStatics(in, metadata);
+                 Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
 -                selection = new PartitionColumns(statics, regulars);
 +                queried = new PartitionColumns(statics, regulars);
              }
  
             SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
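[Editorial sketch, not part of the patch] The hunk above replaces the kind-agnostic Columns.serializer.deserialize with deserializeStatics/deserializeRegulars, so the reader knows up front whether each serialized column set holds static or regular columns. That lets a name missing from the local schema (an in-flight schema change, CASSANDRA-15899) be reconstructed as a placeholder of the correct kind instead of aborting deserialization. A minimal sketch of the idea; resolveOrPlaceholder and the placeholder factory are assumed names for illustration (the real patch adds comparable support in ColumnDefinition.java, per the diffstat):

    // Sketch only: resolve a deserialized column name against the local schema,
    // falling back to a kind-correct placeholder for unknown names.
    static ColumnDefinition resolveOrPlaceholder(CFMetaData metadata, ByteBuffer name, boolean isStatic)
    {
        ColumnDefinition column = metadata.getColumnDefinition(name);
        if (column != null)
            return column;
        // assumed factory method; the unknown column becomes a placeholder that
        // downstream code can detect via isPlaceholder()
        return ColumnDefinition.placeholder(metadata, name, isStatic);
    }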
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 926f3ef,9e11f94..0890611
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@@ -19,18 -19,13 +19,18 @@@ package org.apache.cassandra.db.rows
  
  import java.io.IOException;
  
- import com.google.common.collect.Collections2;
  
 +import net.nicoulaj.compilecommand.annotations.Inline;
  import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.Row.Deletion;
  import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.WrappedException;
  
  /**
   * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@@ -230,37 -184,25 +230,42 @@@ public class UnfilteredSerialize
             Columns.serializer.serializeSubset(row.columns(), headerColumns, out);
 
         SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator();
 -        for (ColumnData data : row)
 -        {
 -            // We can obtain the column for data directly from data.column(). However, if the cell/complex data
 -            // originates from a sstable, the column we'll get will have the type used when the sstable was serialized,
 -            // and if that type have been recently altered, that may not be the type we want to serialize the column
 -            // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
 -            // happens if we don't do that.
 -            ColumnDefinition column = si.next(data.column());
 
 -            // we may have columns that the remote node isn't aware of due to inflight schema changes
 -            // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only
 -            // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
 -            if (column == null)
 -                continue;
 +        try
 +        {
 +            row.apply(cd -> {
 +                // We can obtain the column for data directly from data.column(). However, if the cell/complex data
 +                // originates from a sstable, the column we'll get will have the type used when the sstable was serialized,
 +                // and if that type have been recently altered, that may not be the type we want to serialize the column
 +                // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
 +                // happens if we don't do that.
 +                ColumnDefinition column = si.next(cd.column());
-                 assert column != null : cd.column.toString();
++
++                // we may have columns that the remote node isn't aware of due to inflight schema changes
++                // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only
++                // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
++                if (column == null)
++                    return;
 +
 +                try
 +                {
 +                    if (cd.column.isSimple())
 +                        Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header);
 +                    else
 +                        writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new WrappedException(e);
 +                }
 +            }, false);
 +        }
 +        catch (WrappedException e)
 +        {
 +            if (e.getCause() instanceof IOException)
 +                throw (IOException) e.getCause();
 
 -            if (data.column.isSimple())
 -                Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);
 -            else
 -                writeComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header, out);
 +            throw e;
          }
      }
  
@@@ -597,31 -488,15 +603,35 @@@
             builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE);
  
             Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
 -            for (ColumnDefinition column : columns)
 +
 +            final LivenessInfo livenessInfo = rowLiveness;
 +
 +            try
              {
 -                // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
 -                if (column.isPlaceholder())
 -                    throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
 -                if (column.isSimple())
 -                    readSimpleColumn(column, in, header, helper, builder, rowLiveness);
 -                else
 -                    readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
 +                columns.apply(column -> {
 +                    try
 +                    {
++                        // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
++                        if (column.isPlaceholder())
++                            throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
++
 +                        if (column.isSimple())
 +                            readSimpleColumn(column, in, header, helper, builder, livenessInfo);
 +                        else
 +                            readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, livenessInfo);
 +                    }
 +                    catch (IOException e)
 +                    {
 +                        throw new WrappedException(e);
 +                    }
 +                }, false);
 +            }
 +            catch (WrappedException e)
 +            {
 +                if (e.getCause() instanceof IOException)
 +                    throw (IOException) e.getCause();
 +
 +                throw e;
              }
  
              return builder.build();
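[Editorial sketch, not part of the patch] Both hunks above move per-column work into a lambda passed to row.apply / columns.apply. A lambda given to a Consumer-style callback cannot throw the checked IOException, so the patch tunnels it out through the unchecked org.apache.cassandra.utils.WrappedException and unwraps it at the call site. A self-contained sketch of the same idiom; everything except the WrappedException idea itself is an illustrative stand-in:

    import java.io.IOException;
    import java.util.List;
    import java.util.function.Consumer;

    public class ExceptionTunnelSketch
    {
        // stand-in for org.apache.cassandra.utils.WrappedException
        static final class WrappedException extends RuntimeException
        {
            WrappedException(Throwable cause) { super(cause); }
        }

        // stand-in for Row.apply / Columns.apply: a Consumer's body
        // is not allowed to throw checked exceptions
        static void apply(List<String> items, Consumer<String> fn)
        {
            for (String item : items)
                fn.accept(item);
        }

        static void serializeAll(List<String> items) throws IOException
        {
            try
            {
                apply(items, item -> {
                    try
                    {
                        serializeOne(item);            // may throw checked IOException
                    }
                    catch (IOException e)
                    {
                        throw new WrappedException(e); // tunnel it out unchecked
                    }
                });
            }
            catch (WrappedException e)
            {
                if (e.getCause() instanceof IOException)
                    throw (IOException) e.getCause(); // rethrow the original checked exception
                throw e;
            }
        }

        static void serializeOne(String item) throws IOException
        {
            if (item.isEmpty())
                throw new IOException("empty item");
        }
    }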
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 75e5ba9,17064fa..05f3458
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -116,11 -119,46 +119,47 @@@ public class SimpleReadWriteTest extend
          }
  
         Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
-         Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
++        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2"));
      }
  
+     /**
+      * If a node receives a mutation for a column it knows has been dropped, the write should succeed
+      */
      @Test
-     public void readWithSchemaDisagreement() throws Throwable
+     public void writeWithSchemaDisagreement2() throws Throwable
+     {
+         cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+         cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+         cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+         cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
+ 
+         for (int i=0; i<cluster.size(); i++)
+             cluster.get(i+1).flush(KEYSPACE);
+ 
+         // Introduce schema disagreement
+         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
+ 
+         // execute a write including the dropped column where the coordinator is not yet aware of the drop
+         // all nodes should process this without error
+         cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                            ConsistencyLevel.ALL);
+         // and flushing should also be fine
+         for (int i=0; i<cluster.size(); i++)
+             cluster.get(i+1).flush(KEYSPACE);
+         // the results of reads will vary depending on whether the coordinator has seen the schema change
+         // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
+         assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+                    rows(row(1,1,1,1), row(2,2,2,2)));
+         assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
+                    rows(row(1,1,1), row(2,2,2)));
+     }
+ 
+     /**
+      * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
+      */
+     @Test
+     public void writeWithInconsequentialSchemaDisagreement() throws Throwable
      {
          cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck 
int, v1 int, PRIMARY KEY (pk, ck))");
  

