Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af2c7845 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af2c7845 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af2c7845 Branch: refs/heads/trunk Commit: af2c78459ad69b3263620fbb5da484f1186133d1 Parents: 2b507c0 7d26815 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Thu Oct 26 13:37:50 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Oct 26 13:38:18 2017 -0700 ---------------------------------------------------------------------- .../CommitLogSegmentBackpressureTest.java | 146 ++++++++++++++++ .../commitlog/CommitLogSegmentManagerTest.java | 173 ------------------- .../db/commitlog/CommitlogShutdownTest.java | 99 +++++++++++ 3 files changed, 245 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/af2c7845/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java index 0000000,3956de5..f4a63a9 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java @@@ -1,0 -1,145 +1,146 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.cassandra.db.commitlog; + + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Random; + import java.util.concurrent.Semaphore; + + import com.google.common.collect.ImmutableMap; + + import org.junit.Assert; + import org.junit.Test; + import org.junit.runner.RunWith; + + import org.apache.cassandra.SchemaLoader; + import org.apache.cassandra.Util; + import org.apache.cassandra.config.Config.CommitLogSync; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.ParameterizedClass; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.Mutation; + import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.marshal.AsciiType; + import org.apache.cassandra.db.marshal.BytesType; + import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.schema.TableId; + import org.jboss.byteman.contrib.bmunit.BMRule; + import org.jboss.byteman.contrib.bmunit.BMRules; + import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + + /** + * Since this test depends on byteman rules being setup during initialization, you shouldn't add tests to this class + */ + @RunWith(BMUnitRunner.class) + public class CommitLogSegmentBackpressureTest + { + //Block commit log service from syncing + private static final Semaphore allowSync = new Semaphore(1); + + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String STANDARD1 = "Standard1"; + private static final String STANDARD2 = "Standard2"; + + private final static byte[] entropy = new byte[1024 * 256]; + + @Test + @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"), + @BMRule(name = "Release Semaphore after sync", + targetClass = "AbstractCommitLogService$1", + targetMethod = "run", + targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync", + action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")}) + public void testCompressedCommitLogBackpressure() throws Throwable + { + // Perform all initialization before making CommitLog.Sync blocking + // Doing the initialization within the method guarantee that Byteman has performed its injections when we start + new Random().nextBytes(entropy); + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of())); + DatabaseDescriptor.setCommitLogSegmentSize(1); + DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + - final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes") ++ final Mutation m = new RowUpdateBuilder(cfs1.metadata(), 0, "k").clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + Thread dummyThread = new Thread(() -> { + for (int i = 0; i < 20; i++) + CommitLog.instance.add(m); + }); + + try + { + // Makes sure any call to CommitLog.sync is blocking + allowSync.acquire(); + + dummyThread.start(); + + AbstractCommitLogSegmentManager clsm = CommitLog.instance.segmentManager; + + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + + Thread.sleep(1000); + + // Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes + Assert.assertEquals(3, clsm.getActiveSegments().size()); + + // Discard the currently active segments so allocation can continue. + // Take snapshot of the list, otherwise this will also discard newly allocated segments. + new ArrayList<>(clsm.getActiveSegments()).forEach( clsm::archiveAndDiscard ); + + // The allocated count should reach the limit again. + Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5); + } + finally + { + // Allow the CommitLog.sync to perform normally. + allowSync.release(); + } + try + { + // Wait for the dummy thread to die + dummyThread.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af2c7845/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java index 0000000,91a3f02..711cf65 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java @@@ -1,0 -1,99 +1,99 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db.commitlog; + + import java.io.File; + import java.nio.ByteBuffer; + import java.util.Random; + import java.util.UUID; + + import com.google.common.collect.ImmutableMap; + import org.junit.Assert; + import org.junit.Test; + import org.junit.runner.RunWith; + + import org.apache.cassandra.SchemaLoader; + import org.apache.cassandra.config.Config; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.config.ParameterizedClass; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.Mutation; + import org.apache.cassandra.db.RowUpdateBuilder; + import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.db.marshal.AsciiType; + import org.apache.cassandra.db.marshal.BytesType; + import org.apache.cassandra.schema.KeyspaceParams; ++import org.apache.cassandra.schema.TableId; + import org.jboss.byteman.contrib.bmunit.BMRule; + import org.jboss.byteman.contrib.bmunit.BMUnitRunner; + + /** + * Since this test depends on byteman rules being setup during initialization, you shouldn't add tests to this class + */ + @RunWith(BMUnitRunner.class) + public class CommitlogShutdownTest + { + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String STANDARD1 = "Standard1"; + + private final static byte[] entropy = new byte[1024 * 256]; + + @Test + @BMRule(name = "Make removing commitlog segments slow", + targetClass = "CommitLogSegment", + targetMethod = "discard", + action = "Thread.sleep(50)") + public void testShutdownWithPendingTasks() throws Exception + { + new Random().nextBytes(entropy); + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", ImmutableMap.of())); + DatabaseDescriptor.setCommitLogSegmentSize(1); + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000); + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance)); + + CompactionManager.instance.disableAutoCompaction(); + - CommitLog.instance.resetUnsafe(true); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1); + - final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k") ++ final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k") + .clustering("bytes") + .add("val", ByteBuffer.wrap(entropy)) + .build(); + + // force creating several commitlog files + for (int i = 0; i < 10; i++) + { + CommitLog.instance.add(m); + } + + // schedule discarding completed segments and immediately issue a shutdown - UUID cfid = m.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); ++ TableId tableId = m.getTableIds().iterator().next(); ++ CommitLog.instance.discardCompletedSegments(tableId, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition()); + CommitLog.instance.shutdownBlocking(); + + // the shutdown should block until all logs except the currently active one and perhaps a new, empty one are gone + Assert.assertTrue(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles().length <= 2); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org