Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af2c7845
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af2c7845
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af2c7845

Branch: refs/heads/trunk
Commit: af2c78459ad69b3263620fbb5da484f1186133d1
Parents: 2b507c0 7d26815
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Thu Oct 26 13:37:50 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Oct 26 13:38:18 2017 -0700

----------------------------------------------------------------------
 .../CommitLogSegmentBackpressureTest.java       | 146 ++++++++++++++++
 .../commitlog/CommitLogSegmentManagerTest.java  | 173 -------------------
 .../db/commitlog/CommitlogShutdownTest.java     |  99 +++++++++++
 3 files changed, 245 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/af2c7845/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc 
test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 0000000,3956de5..f4a63a9
mode 000000,100644..100644
--- 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -1,0 -1,145 +1,146 @@@
+ /*
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  */
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Random;
+ import java.util.concurrent.Semaphore;
+ 
+ import com.google.common.collect.ImmutableMap;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.Config.CommitLogSync;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.schema.TableId;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMRules;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Since this test depends on Byteman rules being set up during initialization, you shouldn't add tests to this class
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogSegmentBackpressureTest
+ {
+     //Block commit log service from syncing
+     private static final Semaphore allowSync = new Semaphore(1);
+ 
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "Standard1";
+     private static final String STANDARD2 = "Standard2";
+ 
+     private final static byte[] entropy = new byte[1024 * 256];
+ 
+     @Test
+     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
+                               targetClass = "AbstractCommitLogService$1",
+                               targetMethod = "run",
+                               targetLocation = "AT INVOKE 
org.apache.cassandra.db.commitlog.CommitLog.sync",
+                               action = 
"org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
+                       @BMRule(name = "Release Semaphore after sync",
+                               targetClass = "AbstractCommitLogService$1",
+                               targetMethod = "run",
+                               targetLocation = "AFTER INVOKE 
org.apache.cassandra.db.commitlog.CommitLog.sync",
+                               action = 
"org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
+     public void testCompressedCommitLogBackpressure() throws Throwable
+     {
+         // Perform all initialization before making CommitLog.Sync blocking
+         // Doing the initialization within the method guarantees that Byteman has performed its injections when we start
+         new Random().nextBytes(entropy);
+         DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+         DatabaseDescriptor.setCommitLogSegmentSize(1);
+         DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+         DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD2, 0, AsciiType.instance, BytesType.instance));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
 -        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, 
"k").clustering("bytes")
++        final Mutation m = new RowUpdateBuilder(cfs1.metadata(), 0, 
"k").clustering("bytes")
+                                                                       
.add("val", ByteBuffer.wrap(entropy))
+                                                                       
.build();
+ 
+         Thread dummyThread = new Thread(() -> {
+             for (int i = 0; i < 20; i++)
+                 CommitLog.instance.add(m);
+         });
+ 
+         try
+         {
+             // Makes sure any call to CommitLog.sync is blocking
+             allowSync.acquire();
+ 
+             dummyThread.start();
+ 
+             AbstractCommitLogSegmentManager clsm = 
CommitLog.instance.segmentManager;
+ 
+             Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 
5);
+ 
+             Thread.sleep(1000);
+ 
+             // Should only be able to create 3 segments, not 7, because it blocks waiting for truncation that never comes
+             Assert.assertEquals(3, clsm.getActiveSegments().size());
+ 
+             // Discard the currently active segments so allocation can 
continue.
+             // Take snapshot of the list, otherwise this will also discard 
newly allocated segments.
+             new ArrayList<>(clsm.getActiveSegments()).forEach( 
clsm::archiveAndDiscard );
+ 
+             // The allocated count should reach the limit again.
+             Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 
5);
+         }
+         finally
+         {
+             // Allow the CommitLog.sync to perform normally.
+             allowSync.release();
+         }
+         try
+         {
+             // Wait for the dummy thread to die
+             dummyThread.join();
+         }
+         catch (InterruptedException e)
+         {
+             Thread.currentThread().interrupt();
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af2c7845/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java
index 0000000,91a3f02..711cf65
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java
@@@ -1,0 -1,99 +1,99 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.Random;
+ import java.util.UUID;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.schema.TableId;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Since this test depends on Byteman rules being set up during initialization, you shouldn't add tests to this class
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitlogShutdownTest
+ {
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "Standard1";
+ 
+     private final static byte[] entropy = new byte[1024 * 256];
+ 
+     @Test
+     @BMRule(name = "Make removing commitlog segments slow",
+     targetClass = "CommitLogSegment",
+     targetMethod = "discard",
+     action = "Thread.sleep(50)")
+     public void testShutdownWithPendingTasks() throws Exception
+     {
+         new Random().nextBytes(entropy);
+         DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor", ImmutableMap.of()));
+         DatabaseDescriptor.setCommitLogSegmentSize(1);
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD1, 0, AsciiType.instance, BytesType.instance));
+ 
+                                     
CompactionManager.instance.disableAutoCompaction();
+ 
 -        CommitLog.instance.resetUnsafe(true);
+         ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
 -        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
++        final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.wrap(entropy))
+                            .build();
+ 
+         // force creating several commitlog files
+         for (int i = 0; i < 10; i++)
+         {
+             CommitLog.instance.add(m);
+         }
+ 
+         // schedule discarding completed segments and immediately issue a 
shutdown
 -        UUID cfid = m.getColumnFamilyIds().iterator().next();
 -        CommitLog.instance.discardCompletedSegments(cfid, 
CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
++        TableId tableId = m.getTableIds().iterator().next();
++        CommitLog.instance.discardCompletedSegments(tableId, 
CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
+         CommitLog.instance.shutdownBlocking();
+ 
+         // the shutdown should block until all logs except the currently 
active one and perhaps a new, empty one are gone
+         Assert.assertTrue(new 
File(DatabaseDescriptor.getCommitLogLocation()).listFiles().length <= 2);
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to