Repository: cassandra Updated Branches: refs/heads/cassandra-3.5 4651ac735 -> ee40e3b45
Add backpressure to compressed or encrypted commit log patch by Ariel Weisberg; reviewed by Benjamin Lerer for CASSANDRA-10971 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee40e3b4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee40e3b4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee40e3b4 Branch: refs/heads/cassandra-3.5 Commit: ee40e3b4529aa77d4d83fc3e7073902402cb3753 Parents: 4651ac7 Author: Ariel Weisberg <[email protected]> Authored: Wed Mar 16 18:20:29 2016 +0100 Committer: Benjamin Lerer <[email protected]> Committed: Wed Mar 16 18:20:29 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/AbstractCommitLogService.java | 1 + .../db/commitlog/CommitLogSegment.java | 23 +++-- .../db/commitlog/CommitLogSegmentManager.java | 11 +- .../db/commitlog/CompressedSegment.java | 4 +- .../db/commitlog/EncryptedSegment.java | 4 +- .../db/commitlog/FileDirectSegment.java | 42 ++++++-- .../commitlog/CommitLogSegmentManagerTest.java | 100 ++++++++++++++++++ .../cassandra/db/commitlog/CommitLogTest.java | 101 ++++++++++--------- 9 files changed, 220 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1ff4e6d..ffd0b24 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 3.5 Merged from 3.0: + * Add backpressure to compressed or encrypted commit log (CASSANDRA-10971) * SSTableExport supports secondary index tables (CASSANDRA-11330) * Fix sstabledump to include missing info in debug output (CASSANDRA-11321) * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 557bf50..113d1ba 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -89,6 +89,7 @@ public abstract class AbstractCommitLogService // sync and signal long syncStarted = System.currentTimeMillis(); + //This is a target for Byteman in CommitLogSegmentManagerTest commitLog.sync(shutdown); lastSyncedAt = syncStarted; syncComplete.signalAll(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 5e99a07..f2d8f92 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -119,15 +119,26 @@ public abstract class CommitLogSegment final CommitLog commitLog; public final CommitLogDescriptor descriptor; - static CommitLogSegment createSegment(CommitLog commitLog) + static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose) { - 
CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) : - commitLog.compressor != null ? new CompressedSegment(commitLog) : - new MemoryMappedSegment(commitLog); + CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) : + commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : + new MemoryMappedSegment(commitLog); segment.writeLogHeader(); return segment; } + /** + * Checks if the segments use a buffer pool. + * + * @param commitLog the commit log + * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise. + */ + static boolean usesBufferPool(CommitLog commitLog) + { + return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null; + } + static long getNextId() { return idBase + nextId.getAndIncrement(); @@ -152,7 +163,7 @@ public abstract class CommitLogSegment { throw new FSWriteError(e, logFile); } - + buffer = createBuffer(commitLog); } @@ -276,7 +287,7 @@ public abstract class CommitLogSegment // Note: Even if the very first allocation of this sync section failed, we still want to enter this // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer, // this will always be entered when a mutation allocation has been attempted after the marker allocation - // succeeded in the previous sync. + // succeeded in the previous sync. assert buffer != null; // Only close once. int startMarker = lastSyncedOffset; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java index acc93c9..c4357bd 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java @@ -114,11 +114,11 @@ public class CommitLogSegmentManager if (task == null) { // if we have no more work to do, check if we should create a new segment - if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + if (!atSegmentLimit() && availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) { logger.trace("No segments in reserve; creating a fresh one"); // TODO : some error handling in case we fail to create a new segment - availableSegments.add(CommitLogSegment.createSegment(commitLog)); + availableSegments.add(CommitLogSegment.createSegment(commitLog, () -> wakeManager())); hasAvailableSegments.signalAll(); } @@ -163,6 +163,12 @@ public class CommitLogSegmentManager } } } + + private boolean atSegmentLimit() + { + return CommitLogSegment.usesBufferPool(commitLog) && CompressedSegment.hasReachedPoolLimit(); + } + }; run = true; @@ -553,5 +559,6 @@ public class CommitLogSegmentManager { return Collections.unmodifiableCollection(activeSegments); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index 6b25ab7..573428a 
100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -44,9 +44,9 @@ public class CompressedSegment extends FileDirectSegment /** * Constructs a new segment file. */ - CompressedSegment(CommitLog commitLog) + CompressedSegment(CommitLog commitLog, Runnable onClose) { - super(commitLog); + super(commitLog, onClose); this.compressor = commitLog.compressor; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index 46969ac..731dea4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment private final EncryptionContext encryptionContext; private final Cipher cipher; - public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext) + public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose) { - super(commitLog); + super(commitLog, onClose); this.encryptionContext = encryptionContext; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java index 75a7fc0..ec4aa91 100644 --- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.BufferType; -import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.FileUtils; /** @@ -51,11 +51,19 @@ public abstract class FileDirectSegment extends CommitLogSegment */ static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(); + /** + * The number of buffers in use + */ + private static AtomicInteger usedBuffers = new AtomicInteger(0); + volatile long lastWrittenPos = 0; - FileDirectSegment(CommitLog commitLog) + private final Runnable onClose; + + FileDirectSegment(CommitLog commitLog, Runnable onClose) { super(commitLog); + this.onClose = onClose; } void writeLogHeader() @@ -74,6 +82,7 @@ public abstract class FileDirectSegment extends CommitLogSegment ByteBuffer createBuffer(BufferType bufferType) { + usedBuffers.incrementAndGet(); ByteBuffer buf = bufferPool.poll(); if (buf != null) { @@ -87,16 +96,35 @@ public abstract class FileDirectSegment extends CommitLogSegment @Override protected void internalClose() { - if (bufferPool.size() < MAX_BUFFERPOOL_SIZE) - bufferPool.add(buffer); - else - FileUtils.clean(buffer); + usedBuffers.decrementAndGet(); - super.internalClose(); + try + { + if (bufferPool.size() < MAX_BUFFERPOOL_SIZE) + bufferPool.add(buffer); + else + 
FileUtils.clean(buffer); + super.internalClose(); + } + finally + { + onClose.run(); + } } static void shutdown() { bufferPool.clear(); } + + /** + * Checks if the number of buffers in use is greater or equals to the maximum number of buffers allowed in the pool. + * + * @return <code>true</code> if the number of buffers in use is greater or equals to the maximum number of buffers + * allowed in the pool, <code>false</code> otherwise. + */ + static boolean hasReachedPoolLimit() + { + return usedBuffers.get() >= MAX_BUFFERPOOL_SIZE; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java new file mode 100644 index 0000000..b5c2f41 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java @@ -0,0 +1,100 @@ +package org.apache.cassandra.db.commitlog; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import javax.naming.ConfigurationException; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.schema.KeyspaceParams; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.google.common.collect.ImmutableMap; + +@RunWith(BMUnitRunner.class) +public class CommitLogSegmentManagerTest +{ + //Block commit log service from syncing + private static final Semaphore allowSync = new Semaphore(0); + + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String STANDARD1 = "Standard1"; + private static final String STANDARD2 = "Standard2"; + + private final static byte[] entropy = new byte[1024 * 256]; + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + new Random().nextBytes(entropy); + DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of())); + DatabaseDescriptor.setCommitLogSegmentSize(1); + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + } + + @Test + @BMRule(name = "Block AbstractCommitLogSegment segment flushing", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AT INVOKE 
org.apache.cassandra.db.commitlog.CommitLog.sync", + action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()") + public void testCompressedCommitLogBackpressure() throws Throwable + { + CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + + final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + Thread dummyThread = new Thread( () -> + { + for (int i = 0; i < 20; i++) + CommitLog.instance.add(m); + }); + dummyThread.start(); + + CommitLogSegmentManager clsm = CommitLog.instance.allocator; + + //Protect against delay, but still break out as fast as possible + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 5000) + { + if (clsm.getActiveSegments().size() >= 3) + break; + } + Thread.sleep(1000); + + //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes + Assert.assertEquals(3, clsm.getActiveSegments().size()); + + clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment)); + + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee40e3b4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 91a25e1..b5cbf8b 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -18,20 +18,9 @@ */ package org.apache.cassandra.db.commitlog; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.RandomAccessFile; +import java.io.*; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; @@ -39,19 +28,17 @@ import java.util.zip.Checksum; import com.google.common.collect.Iterables; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RowUpdateBuilder; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.marshal.BytesType; +import org.junit.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.SerializationHelper; @@ -61,22 +48,13 @@ import org.apache.cassandra.io.compress.LZ4Compressor; import 
org.apache.cassandra.io.compress.SnappyCompressor; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.security.EncryptionContextGenerator; -import org.apache.cassandra.utils.Hex; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.vint.VIntCoding; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -106,7 +84,7 @@ public class CommitLogTest } @Before - public void setup() + public void setup() throws IOException { logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit"; new File(logDirectory).mkdirs(); @@ -575,11 +553,22 @@ public class CommitLogTest @Test public void replay_StandardMmapped() throws IOException { - DatabaseDescriptor.setCommitLogCompression(null); - DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); - CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); - replaySimple(commitLog); - replayWithDiscard(commitLog); + ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); + EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); + try + { + DatabaseDescriptor.setCommitLogCompression(null); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); + CommitLog.instance.resetUnsafe(true); + replaySimple(CommitLog.instance); + replayWithDiscard(CommitLog.instance); + } + finally + { + DatabaseDescriptor.setCommitLogCompression(originalCompression); + DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); + CommitLog.instance.resetUnsafe(true); + } } @Test @@ -602,29 +591,44 @@ public class CommitLogTest private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException { - DatabaseDescriptor.setCommitLogCompression(parameterizedClass); - DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); - CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); - replaySimple(commitLog); - replayWithDiscard(commitLog); + ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); + EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); + try + { + DatabaseDescriptor.setCommitLogCompression(parameterizedClass); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext()); + CommitLog.instance.resetUnsafe(true); + + replaySimple(CommitLog.instance); + replayWithDiscard(CommitLog.instance); + } + finally + { + DatabaseDescriptor.setCommitLogCompression(originalCompression); + DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); + CommitLog.instance.resetUnsafe(true); + } } @Test public void replay_Encrypted() throws IOException { - DatabaseDescriptor.setCommitLogCompression(null); - 
DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true)); - CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start(); - + ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression(); + EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext(); try { - replaySimple(commitLog); - replayWithDiscard(commitLog); + DatabaseDescriptor.setCommitLogCompression(null); + DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true)); + CommitLog.instance.resetUnsafe(true); + + replaySimple(CommitLog.instance); + replayWithDiscard(CommitLog.instance); } finally { - for (String file : commitLog.getActiveSegmentNames()) - FileUtils.delete(new File(commitLog.location, file)); + DatabaseDescriptor.setCommitLogCompression(originalCompression); + DatabaseDescriptor.setEncryptionContext(originalEncryptionContext); + CommitLog.instance.resetUnsafe(true); } } @@ -706,6 +710,7 @@ public class CommitLogTest this.filterPosition = filterPosition; } + @SuppressWarnings("resource") void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException { if (entryLocation <= filterPosition.position)
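
For readers following the patch, the standalone sketch below distills the backpressure mechanism it introduces: segments that draw from a bounded buffer pool count how many buffers are checked out, the segment manager stops creating reserve segments once that count reaches the pool limit, and a segment's close callback wakes the manager so allocation can resume. The names here (BufferPoolBackpressureSketch, takeBuffer, closeSegment, wakeManager) and the pool limit of 3 are hypothetical, chosen only for illustration; the real logic lives in FileDirectSegment, CommitLogSegment.usesBufferPool and CommitLogSegmentManager.atSegmentLimit as shown in the diff above.

    import java.nio.ByteBuffer;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical, self-contained sketch of the backpressure idea; not the Cassandra classes.
    final class BufferPoolBackpressureSketch
    {
        static final int MAX_POOL_SIZE = 3;   // stand-in for the configured max compression buffers
        static final Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
        static final AtomicInteger usedBuffers = new AtomicInteger(0);

        // Analogue of FileDirectSegment.hasReachedPoolLimit(): true once every pooled buffer is in use.
        static boolean hasReachedPoolLimit()
        {
            return usedBuffers.get() >= MAX_POOL_SIZE;
        }

        // Analogue of createBuffer(): count the buffer as in use, reuse a pooled one if available.
        static ByteBuffer takeBuffer()
        {
            usedBuffers.incrementAndGet();
            ByteBuffer buf = pool.poll();
            return buf != null ? buf : ByteBuffer.allocateDirect(1 << 20);
        }

        // Loose analogue of internalClose(): release the buffer, then always run the onClose
        // callback (the manager's wake-up), mirroring the try/finally added by the patch.
        static void closeSegment(ByteBuffer buffer, Runnable onClose)
        {
            usedBuffers.decrementAndGet();
            try
            {
                if (pool.size() < MAX_POOL_SIZE)
                    pool.add(buffer);
            }
            finally
            {
                onClose.run();
            }
        }

        public static void main(String[] args)
        {
            Runnable wakeManager = () -> System.out.println("manager woken, may allocate again");
            ByteBuffer[] held = new ByteBuffer[MAX_POOL_SIZE];

            for (int i = 0; i < MAX_POOL_SIZE; i++)
                held[i] = takeBuffer();

            // At this point the manager's allocation loop would skip creating a reserve
            // segment, which is what the added atSegmentLimit() check does.
            System.out.println("at limit: " + hasReachedPoolLimit());   // true

            closeSegment(held[0], wakeManager);                         // frees a slot, wakes the manager
            System.out.println("at limit: " + hasReachedPoolLimit());   // false
        }
    }

This is also what the new Byteman-driven CommitLogSegmentManagerTest exercises: it blocks CommitLog.sync so buffers are never returned, then asserts that segment creation stalls at the pool limit instead of growing without bound.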
