This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit af097240bca91b2ff52788bf1828bb9c4d405a4a
Merge: d9b38a1 ae326ee
Author: Mick Semb Wever <m...@apache.org>
AuthorDate: Tue Mar 10 08:59:59 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |  1 +
 .../cassandra/serializers/MapSerializer.java       |  4 ++
 .../cassandra/serializers/SetSerializer.java       |  4 ++
 .../cassandra/db/commitlog/CommitLogTest.java      | 44 +++++++++++++++++++++-
 4 files changed, 52 insertions(+), 1 deletion(-)

diff --cc CHANGES.txt
index 77b9fdf,246627b..9a10106
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,6 -1,5 +1,7 @@@
 -2.2.17
 +3.0.21
 + * Run evictFromMembership in GossipStage (CASSANDRA-15592)
 +Merged from 2.2:
+  * Fix Commit log replays when static column clustering keys are collections 
(CASSANDRA-14365)
   * Fix Red Hat init script on newer systemd versions (CASSANDRA-15273)
   * Allow EXTRA_CLASSPATH to work on tar/source installations (CASSANDRA-15567)
  
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index b8f68ed,c883cbd..479a090
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -41,9 -39,12 +41,10 @@@ import org.junit.runners.Parameterized.
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.config.Config.DiskFailurePolicy;
  import org.apache.cassandra.db.*;
  import 
org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
  import org.apache.cassandra.db.compaction.CompactionManager;
@@@ -71,8 -67,9 +72,9 @@@ public class CommitLogTes
  {
      private static final String KEYSPACE1 = "CommitLogTest";
      private static final String KEYSPACE2 = "CommitLogTestNonDurable";
 -    private static final String CF1 = "Standard1";
 -    private static final String CF2 = "Standard2";
 -    private static final String CF3 = "Custom1";
 +    private static final String STANDARD1 = "Standard1";
 +    private static final String STANDARD2 = "Standard2";
++    private static final String CUSTOM1 = "Custom1";
  
      public CommitLogTest(ParameterizedClass commitLogCompression)
      {
@@@ -98,20 -95,28 +100,30 @@@
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {
 +        // Disable durable writes for system keyspaces to prevent system 
mutations, e.g. sstable_activity,
 +        // to end up in CL segments and cause unexpected results in this test 
wrt counting CL segments,
 +        // see CASSANDRA-12854
 +        KeyspaceParams.DEFAULT_LOCAL_DURABLE_WRITES = false;
 +
          SchemaLoader.prepareServer();
++
+         CFMetaData custom = CFMetaData.compile(String.format("CREATE TABLE 
\"%s\" (" +
+                                                              "k int," +
+                                                              "c1 
frozen<map<text, text>>," +
+                                                              "c2 
frozen<set<text>>," +
+                                                              "s int static," +
+                                                              "PRIMARY KEY (k, 
c1, c2)" +
 -                                                             ");", 
CF3),KEYSPACE1);
++                                                             ");", CUSTOM1), 
KEYSPACE1);
++
          SchemaLoader.createKeyspace(KEYSPACE1,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2),
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD1, 0, AsciiType.instance, BytesType.instance),
-                                     SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD2, 0, AsciiType.instance, BytesType.instance));
++                                    SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD2, 0, AsciiType.instance, BytesType.instance),
+                                     custom);
          SchemaLoader.createKeyspace(KEYSPACE2,
 -                                    false,
 -                                    true,
 -                                    SimpleStrategy.class,
 -                                    KSMetaData.optsWithRF(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, 
CF2));
 -
 +                                    KeyspaceParams.simpleTransient(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD1, 0, AsciiType.instance, BytesType.instance),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD2, 0, AsciiType.instance, BytesType.instance));
          CompactionManager.instance.disableAutoCompaction();
      }
  
@@@ -548,182 -460,29 +560,184 @@@
      @Test
      public void testTruncateWithoutSnapshot() throws ExecutionException, 
InterruptedException, IOException
      {
 -        boolean prev = DatabaseDescriptor.isAutoSnapshot();
 -        DatabaseDescriptor.setAutoSnapshot(false);
 -        ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
 -        ColumnFamilyStore cfs2 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
 +        boolean originalState = DatabaseDescriptor.isAutoSnapshot();
 +        try
 +        {
 +            CommitLog.instance.resetUnsafe(true);
 +            boolean prev = DatabaseDescriptor.isAutoSnapshot();
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +            ColumnFamilyStore cfs2 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
 +            new RowUpdateBuilder(cfs1.metadata, 0, 
"k").clustering("bytes").add("val", 
ByteBuffer.allocate(100)).build().applyUnsafe();
 +            cfs1.truncateBlocking();
 +            DatabaseDescriptor.setAutoSnapshot(prev);
 +            Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
 +                          .clustering("bytes")
 +                          .add("val", 
ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
 +                          .build();
 +
 +            for (int i = 0 ; i < 5 ; i++)
 +                CommitLog.instance.add(m2);
 +
 +            assertEquals(2, CommitLog.instance.activeSegments());
 +            ReplayPosition position = CommitLog.instance.getContext();
 +            for (Keyspace ks : Keyspace.system())
 +                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 +                    
CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, 
ReplayPosition.NONE, position);
 +            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, 
ReplayPosition.NONE, position);
 +            assertEquals(1, CommitLog.instance.activeSegments());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
 +    }
  
 -        final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 
0);
 -        rm1.apply();
 -        cfs1.truncateBlocking();
 -        DatabaseDescriptor.setAutoSnapshot(prev);
 -        final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm2.add("Standard2", Util.cellname("c1"), 
ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
 +    @Test
 +    public void testTruncateWithoutSnapshotNonDurable() throws IOException
 +    {
 +        boolean originalState = DatabaseDescriptor.getAutoSnapshot();
 +        try
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
 +            
Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
 +
 +            ColumnFamilyStore cfs = 
notDurableKs.getColumnFamilyStore("Standard1");
 +            new RowUpdateBuilder(cfs.metadata, 0, "key1")
 +                .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .applyUnsafe();
 +
 +            assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
 +                            
.cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
 +
 +            cfs.truncateBlocking();
 +
 +            Util.assertEmpty(Util.cmd(cfs).columns("val").build());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
 +    }
 +
 +    @Test
 +    public void testUnwriteableFlushRecovery() throws ExecutionException, 
InterruptedException, IOException
 +    {
 +        CommitLog.instance.resetUnsafe(true);
 +
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        DiskFailurePolicy oldPolicy = 
DatabaseDescriptor.getDiskFailurePolicy();
 +        try
 +        {
 +            DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
 +
 +            for (int i = 0 ; i < 5 ; i++)
 +            {
 +                new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                    .clustering("c" + i).add("val", ByteBuffer.allocate(100))
 +                    .build()
 +                    .apply();
 +
 +                if (i == 2)
 +                {
 +                    try (Closeable c = Util.markDirectoriesUnwriteable(cfs))
 +                    {
 +                        cfs.forceBlockingFlush();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        // expected. Cause (after some wrappings) should be a 
write error
 +                        while (!(t instanceof FSWriteError))
 +                            t = t.getCause();
 +                    }
 +                }
 +                else
 +                    cfs.forceBlockingFlush();
 +            }
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
 +        }
 +
 +        CommitLog.instance.sync(true, true);
 +        System.setProperty("cassandra.replayList", KEYSPACE1 + "." + 
STANDARD1);
 +        // Currently we don't attempt to re-flush a memtable that failed, 
thus make sure data is replayed by commitlog.
 +        // If retries work subsequent flushes should clear up error and this 
should change to expect 0.
 +        Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
++        System.clearProperty("cassandra.replayList");
 +    }
 +
 +    public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, 
Memtable> flushAction, boolean performCompaction)
 +            throws ExecutionException, InterruptedException, IOException
 +    {
 +        CommitLog.instance.resetUnsafe(true);
 +
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
  
          for (int i = 0 ; i < 5 ; i++)
 -            CommitLog.instance.add(rm2);
 +        {
 +            new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                .clustering("c" + i).add("val", ByteBuffer.allocate(100))
 +                .build()
 +                .apply();
 +
 +            Memtable current = 
cfs.getTracker().getView().getCurrentMemtable();
 +            if (i == 2)
 +                current.makeUnflushable();
 +
 +            flushAction.accept(cfs, current);
 +        }
 +        if (performCompaction)
 +            cfs.forceMajorCompaction();
 +        // Make sure metadata saves and reads fine
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +            reader.reloadSSTableMetadata();
 +
 +        CommitLog.instance.sync(true, true);
 +        System.setProperty("cassandra.replayList", KEYSPACE1 + "." + 
STANDARD1);
 +        // In the absence of error, this should be 0 because 
forceBlockingFlush/forceRecycleAllSegments would have
 +        // persisted all data in the commit log. Because we know there was an 
error, there must be something left to
 +        // replay.
 +        Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false));
++        System.clearProperty("cassandra.replayList");
 +    }
 +
 +    BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) ->
 +    {
 +        try
 +        {
 +            cfs.forceBlockingFlush();
 +        }
 +        catch (Throwable t)
 +        {
 +            // expected after makeUnflushable. Cause (after some wrappings) 
should be a write error
 +            while (!(t instanceof FSWriteError))
 +                t = t.getCause();
 +            // Wait for started flushes to complete.
 +            cfs.switchMemtableIfCurrent(current);
 +        }
 +    };
 +
 +    BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) 
->
 +    {
 +        // Move to new commit log segment and try to flush all data. Also 
delete segments that no longer contain
 +        // flushed data.
 +        // This does not stop on errors and should retain segments for which 
flushing failed.
 +        CommitLog.instance.forceRecycleAllSegments();
 +
 +        // Wait for started flushes to complete.
 +        cfs.switchMemtableIfCurrent(current);
 +    };
  
 -        Assert.assertEquals(2, CommitLog.instance.activeSegments());
 -        ReplayPosition position = CommitLog.instance.getContext();
 -        for (Keyspace ks : Keyspace.system())
 -            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 -                
CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
 -        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, 
position);
 -        Assert.assertEquals(1, CommitLog.instance.activeSegments());
 +    @Test
 +    public void testOutOfOrderFlushRecovery() throws ExecutionException, 
InterruptedException, IOException
 +    {
 +        testOutOfOrderFlushRecovery(flush, false);
      }
  
      @Test
@@@ -733,15 -513,33 +747,43 @@@
      }
  
      @Test
 -    public void testRecoveryWithCollectionClusteringKeysStatic() throws 
Exception
 +    public void testOutOfOrderFlushRecoveryWithCompaction() throws 
ExecutionException, InterruptedException, IOException
 +    {
 +        testOutOfOrderFlushRecovery(flush, true);
 +    }
 +
 +    @Test
 +    public void testOutOfOrderLogDiscardWithCompaction() throws 
ExecutionException, InterruptedException, IOException
      {
 -        Mutation rm = new Mutation(KEYSPACE1, bytes(0));
 +        testOutOfOrderFlushRecovery(recycleSegments, true);
 +    }
+ 
 -        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1,CF3);
++    @Test
++    public void testRecoveryWithCollectionClusteringKeysStatic() throws 
Exception
++    {
+ 
 -        int clusterSize = cfm.comparator.clusteringPrefixSize();
 -        ByteBuffer[] elements = new ByteBuffer[clusterSize];
 -        for (int i = 0; i < clusterSize; i++)
 -            elements[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
++        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CUSTOM1);
++        RowUpdateBuilder rb = new RowUpdateBuilder(cfs.metadata, 0, 1);
+ 
 -        rm.add(CF3, CellNames.compositeSparse(elements, new 
ColumnIdentifier("s", true), true), bytes(1), 0);
++        rb.add("s", 2);
+ 
++        Mutation rm = rb.build();
+         CommitLog.instance.add(rm);
++
+         int replayed = 0;
+ 
+         try
+         {
+             
System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true");
+             replayed = CommitLog.instance.resetUnsafe(false);
+         }
+         finally
+         {
+             
System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY);
+         }
+ 
+         Assert.assertEquals(replayed, 1);
+ 
+     }
  }
 +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to