This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 965fe0decf4a354cafeffc33cddadc6d103de45d
Merge: 67f913a beb4563
Author: Jon Meredith <[email protected]>
AuthorDate: Thu Feb 3 14:26:11 2022 -0700

    Merge branch 'cassandra-3.11' into cassandra-4.0

 .../apache/cassandra/service/StorageService.java   | 14 ++++--
 .../cassandra/distributed/impl/Instance.java       |  9 ++--
 .../distributed/test/ring/AutoBootstrapTest.java   | 56 ++++++++++++++++++++++
 .../distributed/test/ring/BootstrapTest.java       | 20 --------
 .../cassandra/streaming/StreamingTransferTest.java |  2 +-
 5 files changed, 73 insertions(+), 28 deletions(-)

diff --cc src/java/org/apache/cassandra/service/StorageService.java
index d78e0e8,49f3835..45fa119
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -4977,10 -4686,7 +4977,8 @@@ public class StorageService extends Not
              if (!isFinalShutdown)
                  setMode(Mode.DRAINING, "flushing column families", false);
  
-             // disable autocompaction - we don't want to start any new compactions while we are draining
-             for (Keyspace keyspace : Keyspace.all())
-                 for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-                     cfs.disableAutoCompaction();
++            // we don't want to start any new compactions while we are draining
+             disableAutoCompaction();
  
              // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
              totalCFs = 0;
@@@ -5059,6 -4765,6 +5057,14 @@@
          }
      }
  
++    @VisibleForTesting
++    public void disableAutoCompaction()
++    {
++        for (Keyspace keyspace : Keyspace.all())
++            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
++                cfs.disableAutoCompaction();
++    }
++
      /**
       * Add a runnable which will be called before shut down or drain. This is useful for other
       * applications running in the same JVM which may want to shut down first rather than time
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 39ba770,1859742..9ea6d01
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -721,25 -739,19 +723,26 @@@ public class Instance extends IsolatedE
                                  () -> 
SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
                                  () -> 
IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
                                  () -> 
ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
 -                                () -> 
PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
 -                                () -> BufferPool.shutdownLocalCleaner(1L, 
MINUTES),
 +                                () -> 
PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
 +                                () -> BufferPools.shutdownLocalCleaner(1L, 
MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, 
MINUTES),
                                  () -> 
Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 +                                () -> 
DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES),
                                  () -> SSTableReader.shutdownBlocking(1L, 
MINUTES),
-                                 () -> 
shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
-                                 () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES)
 -                                () -> 
DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
++                                () -> 
shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor()))
              );
 +
++            error = parallelRun(error, executor, () -> 
ScheduledExecutors.shutdownAndWait(1L, MINUTES));
++
              error = parallelRun(error, executor,
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, 
MINUTES),
 -                                (IgnoreThrowingRunnable) 
MessagingService.instance()::shutdown
 +                                CommitLog.instance::shutdownBlocking,
 +                                // can only shutdown message once, so if the test shutsdown an instance, then ignore the failure
 +                                (IgnoreThrowingRunnable) () -> 
MessagingService.instance().shutdown(1L, MINUTES, false, true)
              );
              error = parallelRun(error, executor,
 -                                () -> StageManager.shutdownAndWait(1L, 
MINUTES),
 +                                () -> 
GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
 +                                () -> Stage.shutdownAndWait(1L, MINUTES),
                                  () -> 
SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
              error = parallelRun(error, executor,
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/ring/AutoBootstrapTest.java
index 0000000,0000000..dc754ca
new file mode 100644
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/AutoBootstrapTest.java
@@@ -1,0 -1,0 +1,56 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.distributed.test.ring;
++
++import java.util.Map;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.TokenSupplier;
++import org.apache.cassandra.distributed.shared.NetworkTopology;
++
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++
++public class AutoBootstrapTest extends BootstrapTest
++{
++    // Originally part of BootstrapTest. Broken out into separate test as the in-JVM dtests fail
++    // if too many instances are created in the same JVM. Bug in the JVM is suspected.
++    @Test
++    public void autoBootstrapTest() throws Throwable
++    {
++        int originalNodeCount = 2;
++        int expandedNodeCount = originalNodeCount + 1;
++
++        try (Cluster cluster = builder().withNodes(originalNodeCount)
++                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
++                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
++                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
++                                        .start())
++        {
++            populate(cluster,0, 100);
++            bootstrapAndJoinNode(cluster);
++
++            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
++                Assert.assertEquals("Node " + e.getKey() + " has incorrect 
row state", e.getValue().longValue(), 100L);
++        }
++    }
++}
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index 52d0f16,0000000..d6e715a
mode 100644,000000..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -1,149 -1,0 +1,129 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.ring;
 +
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.TokenSupplier;
 +import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +
 +import static java.util.Arrays.asList;
 +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
 +import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class BootstrapTest extends TestBaseImpl
 +{
 +    @Test
 +    public void bootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
 +                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
 +                               bootstrap()),
 +                        newInstance.config().num());
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect 
row state",
 +                                    100L,
 +                                    e.getValue().longValue());
 +        }
 +    }
 +
 +    @Test
 +    public void readWriteDuringBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
 +                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            populate(cluster,0, 100);
 +
 +            Assert.assertEquals(100, newInstance.executeInternal("SELECT 
*FROM " + KEYSPACE + ".tbl").length);
 +        }
 +    }
 +
-     @Test
-     public void autoBootstrapTest() throws Throwable
-     {
-         int originalNodeCount = 2;
-         int expandedNodeCount = originalNodeCount + 1;
- 
-         try (Cluster cluster = builder().withNodes(originalNodeCount)
-                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
-                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
-                                         .withConfig(config -> 
config.with(NETWORK, GOSSIP))
-                                         .start())
-         {
-             populate(cluster,0, 100);
-             bootstrapAndJoinNode(cluster);
- 
-             for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
-                 Assert.assertEquals("Node " + e.getKey() + " has incorrect 
row state", e.getValue().longValue(), 100L);
-         }
-     }
- 
 +    public static void populate(ICluster cluster, int from, int to)
 +    {
 +        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to, int 
coord, int rf, ConsistencyLevel cl)
 +    {
 +        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + 
"};");
 +        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        for (int i = from; i < to; i++)
 +        {
 +            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (?, ?, ?)",
 +                                               cl,
 +                                               i, i, i);
 +        }
 +    }
 +
 +    public static Map<Integer, Long> count(ICluster cluster)
 +    {
 +        return IntStream.rangeClosed(1, cluster.size())
 +                        .boxed()
 +                        .collect(Collectors.toMap(nodeId -> nodeId,
 +                                                  nodeId -> (Long) 
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + 
".tbl")[0][0]));
 +    }
 +
 +}
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7b00f93,8f3061a..a4aa77f
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -294,7 -304,7 +294,7 @@@ public class StreamingTransferTes
      {
          final Keyspace keyspace = Keyspace.open(KEYSPACE1);
          final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_INDEX);
--
++        cfs.disableAutoCompaction();
          List<String> keys = createAndTransfer(cfs, new Mutator()
          {
              public void mutate(String key, String col, long timestamp) throws 
Exception

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to