Repository: cassandra Updated Branches: refs/heads/cassandra-3.X a345a810f -> 699fa9a9d
Fix CommitLogSegmentManagerTest patch by Benjamin Lerer; reviewed by Joshua McKenzie for CASSANDRA-12283 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77eee181 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77eee181 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77eee181 Branch: refs/heads/cassandra-3.X Commit: 77eee181c5627446a86c10b9ee996806f0dceaa1 Parents: 83639d4 Author: Benjamin Lerer <[email protected]> Authored: Thu Nov 10 21:29:06 2016 +0100 Committer: Benjamin Lerer <[email protected]> Committed: Thu Nov 10 21:29:06 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../commitlog/CommitLogSegmentManagerTest.java | 97 ++++++++++++-------- 2 files changed, 58 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/77eee181/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9598546..7fd5101 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283) * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889) * Fix partition count log during compaction (CASSANDRA-12184) http://git-wip-us.apache.org/repos/asf/cassandra/blob/77eee181/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java index 6a4aace..a9b0669 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java @@ -24,7 
+24,11 @@ import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.Semaphore; -import javax.naming.ConfigurationException; +import com.google.common.collect.ImmutableMap; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -40,28 +44,36 @@ import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.schema.KeyspaceParams; import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; - -import com.google.common.collect.ImmutableMap; @RunWith(BMUnitRunner.class) public class CommitLogSegmentManagerTest { //Block commit log service from syncing - private static final Semaphore allowSync = new Semaphore(0); + private static final Semaphore allowSync = new Semaphore(1); private static final String KEYSPACE1 = "CommitLogTest"; private static final String STANDARD1 = "Standard1"; private static final String STANDARD2 = "Standard2"; private final static byte[] entropy = new byte[1024 * 256]; - @BeforeClass - public static void defineSchema() throws ConfigurationException + + @Test + @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()"), + @BMRule(name = "Release Semaphore after sync", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + action = 
"org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.release()")}) + public void testCompressedCommitLogBackpressure() throws Throwable { + // Perform all initialization before making CommitLog.sync blocking + // Doing the initialization within the method guarantees that Byteman has performed its injections when we start new Random().nextBytes(entropy); DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of())); DatabaseDescriptor.setCommitLogSegmentSize(1); @@ -74,47 +86,52 @@ public class CommitLogSegmentManagerTest SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); CompactionManager.instance.disableAutoCompaction(); - } - @Test - @BMRule(name = "Block AbstractCommitLogSegment segment flushing", - targetClass = "AbstractCommitLogService$1", - targetMethod = "run", - targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", - action = "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerTest.allowSync.acquire()") - public void testCompressedCommitLogBackpressure() throws Throwable - { - CommitLog.instance.resetUnsafe(true); ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); - final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") - .clustering("bytes") - .add("val", ByteBuffer.wrap(entropy)) - .build(); + final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); - Thread dummyThread = new Thread( () -> - { + Thread dummyThread = new Thread(() -> { for (int i = 0; i < 20; i++) CommitLog.instance.add(m); }); - dummyThread.start(); - CommitLogSegmentManager clsm = CommitLog.instance.allocator; - - //Protect against delay, but still break out as fast as possible - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < 5000) + try { - if (clsm.getActiveSegments().size() >= 3) - break; - } - 
Thread.sleep(1000); + // Makes sure any call to CommitLog.sync is blocking + allowSync.acquire(); + + dummyThread.start(); + + CommitLogSegmentManager clsm = CommitLog.instance.allocator; + + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + + Thread.sleep(1000); - //Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes - Assert.assertEquals(3, clsm.getActiveSegments().size()); + // Should only be able to create 3 segments (not 7) because it blocks waiting for truncation that never + // comes. + Assert.assertEquals(3, clsm.getActiveSegments().size()); - clsm.getActiveSegments().forEach( segment -> clsm.recycleSegment(segment)); + clsm.getActiveSegments().forEach(segment -> clsm.recycleSegment(segment)); - Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + } + finally + { + // Allow the CommitLog.sync to perform normally. + allowSync.release(); + } + try + { + // Wait for the dummy thread to die + dummyThread.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } } }
