Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e Branch: refs/heads/cassandra-3.11 Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 Parents: 5f64ed7 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Tue Nov 29 22:58:36 2016 +0100 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Tue Dec 6 12:10:31 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 77 +++++++++++++---- src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++---------- .../miscellaneous/ColumnFamilyStoreTest.java | 90 ++++++++++++++++++++ 4 files changed, 186 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5cacdd0..5242adf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956) * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868) * Nodetool should use a more sane max heap size (CASSANDRA-12739) * LocalToken ensures token values are cloned on heap (CASSANDRA-12651) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d2a51a9..113e10d 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -81,6 +82,7 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; import static org.apache.cassandra.utils.Throwables.maybeFail; +import static org.apache.cassandra.utils.Throwables.merge; public class ColumnFamilyStore implements ColumnFamilyStoreMBean { @@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); - private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + @VisibleForTesting + public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), StageManager.KEEPALIVE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), @@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { final boolean flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; - final CountDownLatch latch = new CountDownLatch(1); - volatile FSWriteError flushFailure = null; + final CountDownLatch memtablesFlushLatch = new CountDownLatch(1); + final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1); + volatile Throwable flushFailure = null; final List<Memtable> memtables; private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, @@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly * with CL as we do with memtables/CFS-backed SecondaryIndexes. */ - - if (flushSecondaryIndexes) - indexManager.flushAllNonCFSBackedIndexesBlocking(); + try + { + if (flushSecondaryIndexes) + { + indexManager.flushAllNonCFSBackedIndexesBlocking(); + } + } + catch (Throwable e) + { + flushFailure = merge(flushFailure, e); + } + finally + { + secondaryIndexFlushLatch.countDown(); + } try { // we wait on the latch for the commitLogUpperBound to be set, and so that waiters // on this task can rely on all prior flushes being complete - latch.await(); + memtablesFlushLatch.await(); } catch (InterruptedException e) { @@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean metric.pendingFlushes.dec(); if (flushFailure != null) - throw flushFailure; + Throwables.propagate(flushFailure); return commitLogUpperBound; } @@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { for (Memtable memtable : memtables) - { - Collection<SSTableReader> readers = Collections.emptyList(); - if (!memtable.isClean() && !truncate) - readers = memtable.flush(); - memtable.cfs.replaceFlushed(memtable, readers); - reclaim(memtable); - } + flushMemtable(memtable); } - catch (FSWriteError e) + catch (Throwable e) { JVMStabilityInspector.inspectThrowable(e); // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. @@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // signal the post-flush we've done our work - postFlush.latch.countDown(); + postFlush.memtablesFlushLatch.countDown(); + } + + public Collection<SSTableReader> flushMemtable(Memtable memtable) + { + if (memtable.isClean() || truncate) + { + memtable.cfs.replaceFlushed(memtable, Collections.emptyList()); + reclaim(memtable); + return Collections.emptyList(); + } + + Collection<SSTableReader> readers = Collections.emptyList(); + try (SSTableTxnWriter writer = memtable.flush()) + { + try + { + postFlush.secondaryIndexFlushLatch.await(); + } + catch (InterruptedException e) + { + postFlush.flushFailure = merge(postFlush.flushFailure, e); + } + + if (postFlush.flushFailure == null && writer.getFilePointer() > 0) + // sstables should contain non-repaired data. + readers = writer.finish(true); + else + maybeFail(writer.abort(postFlush.flushFailure)); + } + + memtable.cfs.replaceFlushed(memtable, readers); + reclaim(memtable); + return readers; } private void reclaim(final Memtable memtable) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 1a7d6cb..6404b37 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DiskAwareRunnable; import org.apache.cassandra.service.ActiveRepairService; @@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable> return partitions.get(key); } - public Collection<SSTableReader> flush() + public SSTableTxnWriter flush() { long estimatedSize = estimatedSize(); Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize); @@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable> * 1.2); // bloom filter and row index overhead } - private Collection<SSTableReader> writeSortedContents(File sstableDirectory) + private SSTableTxnWriter writeSortedContents(File sstableDirectory) { boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); logger.debug("Writing {}", Memtable.this.toString()); - Collection<SSTableReader> ssTables; - try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) + SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()); + boolean trackContention = logger.isTraceEnabled(); + int heavilyContendedRowCount = 0; + // (we can't clear out the map as-we-go to free up memory, + // since the memtable is being used for queries in the "pending flush" category) + for (AtomicBTreePartition partition : partitions.values()) { - boolean trackContention = logger.isTraceEnabled(); - int heavilyContendedRowCount = 0; - // (we can't clear out the map as-we-go to free up memory, - // since the memtable is being used for queries in the "pending flush" category) - for (AtomicBTreePartition partition : partitions.values()) + // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2 + // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local, + // we don't need to preserve tombstones for repair. So if both operation are in this + // memtable (which will almost always be the case if there is no ongoing failure), we can + // just skip the entry (CASSANDRA-4667). + if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows()) + continue; + + if (trackContention && partition.usePessimisticLocking()) + heavilyContendedRowCount++; + + if (!partition.isEmpty()) { - // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2 - // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local, - // we don't need to preserve tombstones for repair. So if both operation are in this - // memtable (which will almost always be the case if there is no ongoing failure), we can - // just skip the entry (CASSANDRA-4667). - if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows()) - continue; - - if (trackContention && partition.usePessimisticLocking()) - heavilyContendedRowCount++; - - if (!partition.isEmpty()) + try (UnfilteredRowIterator iter = partition.unfilteredIterator()) { - try (UnfilteredRowIterator iter = partition.unfilteredIterator()) - { - writer.append(iter); - } + writer.append(iter); } } + } - if (writer.getFilePointer() > 0) - { - logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", - writer.getFilename(), - FBUtilities.prettyPrintMemory(writer.getFilePointer()), - commitLogUpperBound)); - - // sstables should contain non-repaired data. - ssTables = writer.finish(true); - } - else - { - logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", - writer.getFilename(), commitLogUpperBound); - writer.abort(); - ssTables = Collections.emptyList(); - } + if (writer.getFilePointer() > 0) + logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s", + writer.getFilename(), + FBUtilities.prettyPrintMemory(writer.getFilePointer()), + commitLogUpperBound)); + else + logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}", + writer.getFilename(), commitLogUpperBound); - if (heavilyContendedRowCount > 0) - logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); + if (heavilyContendedRowCount > 0) + logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString())); - return ssTables; - } + return writer; } @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java new file mode 100644 index 0000000..1285392 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3.validation.miscellaneous; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.Test; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.index.StubIndex; +import org.apache.cassandra.schema.IndexMetadata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ColumnFamilyStoreTest extends CQLTester +{ + @Test + public void testFailing2iFlush() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)"); + createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i); + + try + { + getCurrentColumnFamilyStore().forceBlockingFlush(); + } + catch (Throwable t) + { + // ignore + } + + // Make sure there's no flush running + waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0, + TimeUnit.SECONDS.toMillis(5)); + + // SSTables remain uncommitted. + assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length); + } + + public void waitFor(Supplier<Boolean> condition, long timeout) + { + long start = System.currentTimeMillis(); + while(true) + { + if (condition.get()) + return; + + assertTrue("Timeout ocurred while waiting for condition", + System.currentTimeMillis() - start < timeout); + } + } + + // Used for index creation above + public static class BrokenCustom2I extends StubIndex + { + public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata) + { + super(baseCfs, metadata); + } + + public Callable<?> getBlockingFlushTask() + { + throw new RuntimeException(); + } + } +}