This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 965fe0decf4a354cafeffc33cddadc6d103de45d Merge: 67f913a beb4563 Author: Jon Meredith <[email protected]> AuthorDate: Thu Feb 3 14:26:11 2022 -0700 Merge branch 'cassandra-3.11' into cassandra-4.0 .../apache/cassandra/service/StorageService.java | 14 ++++-- .../cassandra/distributed/impl/Instance.java | 9 ++-- .../distributed/test/ring/AutoBootstrapTest.java | 56 ++++++++++++++++++++++ .../distributed/test/ring/BootstrapTest.java | 20 -------- .../cassandra/streaming/StreamingTransferTest.java | 2 +- 5 files changed, 73 insertions(+), 28 deletions(-) diff --cc src/java/org/apache/cassandra/service/StorageService.java index d78e0e8,49f3835..45fa119 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -4977,10 -4686,7 +4977,8 @@@ public class StorageService extends Not if (!isFinalShutdown) setMode(Mode.DRAINING, "flushing column families", false); - // disable autocompaction - we don't want to start any new compactions while we are draining - for (Keyspace keyspace : Keyspace.all()) - for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) - cfs.disableAutoCompaction(); ++ // we don't want to start any new compactions while we are draining + disableAutoCompaction(); // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty totalCFs = 0; @@@ -5059,6 -4765,6 +5057,14 @@@ } } ++ @VisibleForTesting ++ public void disableAutoCompaction() ++ { ++ for (Keyspace keyspace : Keyspace.all()) ++ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) ++ cfs.disableAutoCompaction(); ++ } ++ /** * Add a runnable which will be called before shut down or drain. This is useful for other * applications running in the same JVM which may want to shut down first rather than time diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 39ba770,1859742..9ea6d01 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -721,25 -739,19 +723,26 @@@ public class Instance extends IsolatedE () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES), () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES), () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES), - () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES), - () -> BufferPool.shutdownLocalCleaner(1L, MINUTES), + () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES), + () -> BufferPools.shutdownLocalCleaner(1L, MINUTES), () -> Ref.shutdownReferenceReaper(1L, MINUTES), () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES), + () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES), + () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), () -> SSTableReader.shutdownBlocking(1L, MINUTES), - () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())), - () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES) - () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES) ++ () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())) ); + ++ error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)); ++ error = parallelRun(error, executor, - () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES), - (IgnoreThrowingRunnable) MessagingService.instance()::shutdown + CommitLog.instance::shutdownBlocking, + // can only shutdown message once, so if the test shutsdown an instance, then ignore the failure + (IgnoreThrowingRunnable) () -> MessagingService.instance().shutdown(1L, MINUTES, false, true) ); error = parallelRun(error, executor, - () -> StageManager.shutdownAndWait(1L, MINUTES), + () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES), + () -> Stage.shutdownAndWait(1L, MINUTES), () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES) ); error = parallelRun(error, executor, diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/AutoBootstrapTest.java index 0000000,0000000..dc754ca new file mode 100644 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/AutoBootstrapTest.java @@@ -1,0 -1,0 +1,56 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.distributed.test.ring; ++ ++import java.util.Map; ++ ++import org.junit.Assert; ++import org.junit.Test; ++ ++import org.apache.cassandra.distributed.Cluster; ++import org.apache.cassandra.distributed.api.TokenSupplier; ++import org.apache.cassandra.distributed.shared.NetworkTopology; ++ ++import static org.apache.cassandra.distributed.api.Feature.GOSSIP; ++import static org.apache.cassandra.distributed.api.Feature.NETWORK; ++ ++public class AutoBootstrapTest extends BootstrapTest ++{ ++ // Originally part of BootstrapTest. Broken out into separate test as the in-JVM dtests fail ++ // if too many instances are created in the same JVM. Bug in the JVM is suspected. ++ @Test ++ public void autoBootstrapTest() throws Throwable ++ { ++ int originalNodeCount = 2; ++ int expandedNodeCount = originalNodeCount + 1; ++ ++ try (Cluster cluster = builder().withNodes(originalNodeCount) ++ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) ++ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) ++ .withConfig(config -> config.with(NETWORK, GOSSIP)) ++ .start()) ++ { ++ populate(cluster,0, 100); ++ bootstrapAndJoinNode(cluster); ++ ++ for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) ++ Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L); ++ } ++ } ++} diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java index 52d0f16,0000000..d6e715a mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java @@@ -1,149 -1,0 +1,129 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static java.util.Arrays.asList; +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom; +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class BootstrapTest extends TestBaseImpl +{ + @Test + public void bootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + populate(cluster,0, 100); + + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance newInstance = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> newInstance.startup(cluster)); + + cluster.forEach(statusToBootstrap(newInstance)); + + cluster.run(asList(pullSchemaFrom(cluster.get(1)), + bootstrap()), + newInstance.config().num()); + + for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) + Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", + 100L, + e.getValue().longValue()); + } + } + + @Test + public void readWriteDuringBootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance newInstance = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> newInstance.startup(cluster)); + + cluster.forEach(statusToBootstrap(newInstance)); + + populate(cluster,0, 100); + + Assert.assertEquals(100, newInstance.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length); + } + } + - @Test - public void autoBootstrapTest() throws Throwable - { - int originalNodeCount = 2; - int expandedNodeCount = originalNodeCount + 1; - - try (Cluster cluster = builder().withNodes(originalNodeCount) - .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) - .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) - .withConfig(config -> config.with(NETWORK, GOSSIP)) - .start()) - { - populate(cluster,0, 100); - bootstrapAndJoinNode(cluster); - - for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) - Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L); - } - } - + public static void populate(ICluster cluster, int from, int to) + { + populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM); + } + + public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl) + { + cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = from; i < to; i++) + { + cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + cl, + i, i, i); + } + } + + public static Map<Integer, Long> count(ICluster cluster) + { + return IntStream.rangeClosed(1, cluster.size()) + .boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); + } + +} diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 7b00f93,8f3061a..a4aa77f --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@@ -294,7 -304,7 +294,7 @@@ public class StreamingTransferTes { final Keyspace keyspace = Keyspace.open(KEYSPACE1); final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_INDEX); -- ++ cfs.disableAutoCompaction(); List<String> keys = createAndTransfer(cfs, new Mutator() { public void mutate(String key, String col, long timestamp) throws Exception --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
