This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit af097240bca91b2ff52788bf1828bb9c4d405a4a Merge: d9b38a1 ae326ee Author: Mick Semb Wever <m...@apache.org> AuthorDate: Tue Mar 10 08:59:59 2020 +0100 Merge branch 'cassandra-2.2' into cassandra-3.0 CHANGES.txt | 1 + .../cassandra/serializers/MapSerializer.java | 4 ++ .../cassandra/serializers/SetSerializer.java | 4 ++ .../cassandra/db/commitlog/CommitLogTest.java | 44 +++++++++++++++++++++- 4 files changed, 52 insertions(+), 1 deletion(-) diff --cc CHANGES.txt index 77b9fdf,246627b..9a10106 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,6 -1,5 +1,7 @@@ -2.2.17 +3.0.21 + * Run evictFromMembership in GossipStage (CASSANDRA-15592) +Merged from 2.2: + * Fix Commit log replays when static column clustering keys are collections (CASSANDRA-14365) * Fix Red Hat init script on newer systemd versions (CASSANDRA-15273) * Allow EXTRA_CLASSPATH to work on tar/source installations (CASSANDRA-15567) diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index b8f68ed,c883cbd..479a090 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -41,9 -39,12 +41,10 @@@ import org.junit.runners.Parameterized. 
import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.config.Config.DiskFailurePolicy; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; @@@ -71,8 -67,9 +72,9 @@@ public class CommitLogTes { private static final String KEYSPACE1 = "CommitLogTest"; private static final String KEYSPACE2 = "CommitLogTestNonDurable"; - private static final String CF1 = "Standard1"; - private static final String CF2 = "Standard2"; - private static final String CF3 = "Custom1"; + private static final String STANDARD1 = "Standard1"; + private static final String STANDARD2 = "Standard2"; ++ private static final String CUSTOM1 = "Custom1"; public CommitLogTest(ParameterizedClass commitLogCompression) { @@@ -98,20 -95,28 +100,30 @@@ @BeforeClass public static void defineSchema() throws ConfigurationException { + // Disable durable writes for system keyspaces to prevent system mutations, e.g. 
sstable_activity, + // to end up in CL segments and cause unexpected results in this test wrt counting CL segments, + // see CASSANDRA-12854 + KeyspaceParams.DEFAULT_LOCAL_DURABLE_WRITES = false; + SchemaLoader.prepareServer(); ++ + CFMetaData custom = CFMetaData.compile(String.format("CREATE TABLE \"%s\" (" + + "k int," + + "c1 frozen<map<text, text>>," + + "c2 frozen<set<text>>," + + "s int static," + + "PRIMARY KEY (k, c1, c2)" + - ");", CF3),KEYSPACE1); ++ ");", CUSTOM1), KEYSPACE1); ++ SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF1), - SchemaLoader.standardCFMD(KEYSPACE1, CF2), + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), - SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); ++ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance), + custom); SchemaLoader.createKeyspace(KEYSPACE2, - false, - true, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF1), - SchemaLoader.standardCFMD(KEYSPACE1, CF2)); - + KeyspaceParams.simpleTransient(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); CompactionManager.instance.disableAutoCompaction(); } @@@ -548,182 -460,29 +560,184 @@@ @Test public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException { - boolean prev = DatabaseDescriptor.isAutoSnapshot(); - DatabaseDescriptor.setAutoSnapshot(false); - ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); - ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"); + boolean originalState = DatabaseDescriptor.isAutoSnapshot(); + try + { + 
CommitLog.instance.resetUnsafe(true); + boolean prev = DatabaseDescriptor.isAutoSnapshot(); + DatabaseDescriptor.setAutoSnapshot(false); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2); + + new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe(); + cfs1.truncateBlocking(); + DatabaseDescriptor.setAutoSnapshot(prev); + Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) + .build(); + + for (int i = 0 ; i < 5 ; i++) + CommitLog.instance.add(m2); + + assertEquals(2, CommitLog.instance.activeSegments()); + ReplayPosition position = CommitLog.instance.getContext(); + for (Keyspace ks : Keyspace.system()) + for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) + CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, ReplayPosition.NONE, position); + CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, ReplayPosition.NONE, position); + assertEquals(1, CommitLog.instance.activeSegments()); + } + finally + { + DatabaseDescriptor.setAutoSnapshot(originalState); + } + } - final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k")); - rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); - rm1.apply(); - cfs1.truncateBlocking(); - DatabaseDescriptor.setAutoSnapshot(prev); - final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); - rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0); + @Test + public void testTruncateWithoutSnapshotNonDurable() throws IOException + { + boolean originalState = DatabaseDescriptor.getAutoSnapshot(); + try + { + DatabaseDescriptor.setAutoSnapshot(false); + Keyspace notDurableKs = Keyspace.open(KEYSPACE2); + 
Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites); + + ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); + new RowUpdateBuilder(cfs.metadata, 0, "key1") + .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd")) + .build() + .applyUnsafe(); + + assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build()) + .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd"))); + + cfs.truncateBlocking(); + + Util.assertEmpty(Util.cmd(cfs).columns("val").build()); + } + finally + { + DatabaseDescriptor.setAutoSnapshot(originalState); + } + } + + @Test + public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException + { + CommitLog.instance.resetUnsafe(true); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + try + { + DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore); + + for (int i = 0 ; i < 5 ; i++) + { + new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("c" + i).add("val", ByteBuffer.allocate(100)) + .build() + .apply(); + + if (i == 2) + { + try (Closeable c = Util.markDirectoriesUnwriteable(cfs)) + { + cfs.forceBlockingFlush(); + } + catch (Throwable t) + { + // expected. Cause (after some wrappings) should be a write error + while (!(t instanceof FSWriteError)) + t = t.getCause(); + } + } + else + cfs.forceBlockingFlush(); + } + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(oldPolicy); + } + + CommitLog.instance.sync(true, true); + System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); + // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog. + // If retries work subsequent flushes should clear up error and this should change to expect 0. 
+ Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); ++ System.clearProperty("cassandra.replayList"); + } + + public void testOutOfOrderFlushRecovery(BiConsumer<ColumnFamilyStore, Memtable> flushAction, boolean performCompaction) + throws ExecutionException, InterruptedException, IOException + { + CommitLog.instance.resetUnsafe(true); + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); for (int i = 0 ; i < 5 ; i++) - CommitLog.instance.add(rm2); + { + new RowUpdateBuilder(cfs.metadata, 0, "k") + .clustering("c" + i).add("val", ByteBuffer.allocate(100)) + .build() + .apply(); + + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); + if (i == 2) + current.makeUnflushable(); + + flushAction.accept(cfs, current); + } + if (performCompaction) + cfs.forceMajorCompaction(); + // Make sure metadata saves and reads fine + for (SSTableReader reader : cfs.getLiveSSTables()) + reader.reloadSSTableMetadata(); + + CommitLog.instance.sync(true, true); + System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1); + // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have + // persisted all data in the commit log. Because we know there was an error, there must be something left to + // replay. + Assert.assertEquals(1, CommitLog.instance.resetUnsafe(false)); ++ System.clearProperty("cassandra.replayList"); + } + + BiConsumer<ColumnFamilyStore, Memtable> flush = (cfs, current) -> + { + try + { + cfs.forceBlockingFlush(); + } + catch (Throwable t) + { + // expected after makeUnflushable. Cause (after some wrappings) should be a write error + while (!(t instanceof FSWriteError)) + t = t.getCause(); + // Wait for started flushes to complete. + cfs.switchMemtableIfCurrent(current); + } + }; + + BiConsumer<ColumnFamilyStore, Memtable> recycleSegments = (cfs, current) -> + { + // Move to new commit log segment and try to flush all data. 
Also delete segments that no longer contain + // flushed data. + // This does not stop on errors and should retain segments for which flushing failed. + CommitLog.instance.forceRecycleAllSegments(); + + // Wait for started flushes to complete. + cfs.switchMemtableIfCurrent(current); + }; - Assert.assertEquals(2, CommitLog.instance.activeSegments()); - ReplayPosition position = CommitLog.instance.getContext(); - for (Keyspace ks : Keyspace.system()) - for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) - CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); - CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); - Assert.assertEquals(1, CommitLog.instance.activeSegments()); + @Test + public void testOutOfOrderFlushRecovery() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(flush, false); } @Test @@@ -733,15 -513,33 +747,43 @@@ } @Test - public void testRecoveryWithCollectionClusteringKeysStatic() throws Exception + public void testOutOfOrderFlushRecoveryWithCompaction() throws ExecutionException, InterruptedException, IOException + { + testOutOfOrderFlushRecovery(flush, true); + } + + @Test + public void testOutOfOrderLogDiscardWithCompaction() throws ExecutionException, InterruptedException, IOException { - Mutation rm = new Mutation(KEYSPACE1, bytes(0)); + testOutOfOrderFlushRecovery(recycleSegments, true); + } + - CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1,CF3); ++ @Test ++ public void testRecoveryWithCollectionClusteringKeysStatic() throws Exception ++ { + - int clusterSize = cfm.comparator.clusteringPrefixSize(); - ByteBuffer[] elements = new ByteBuffer[clusterSize]; - for (int i = 0; i < clusterSize; i++) - elements[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER; ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CUSTOM1); ++ RowUpdateBuilder rb = new RowUpdateBuilder(cfs.metadata, 0, 1); + - rm.add(CF3, 
CellNames.compositeSparse(elements, new ColumnIdentifier("s", true), true), bytes(1), 0); ++ rb.add("s", 2); + ++ Mutation rm = rb.build(); + CommitLog.instance.add(rm); ++ + int replayed = 0; + + try + { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + replayed = CommitLog.instance.resetUnsafe(false); + } + finally + { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + + Assert.assertEquals(replayed, 1); + + } } + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org