Introduce in-jvm distributed tests. Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Benedict Elliott Smith and Dinesh Joshi for CASSANDRA-14821.
Co-authored-by: Benedict Elliott Smith <bened...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f22fec92 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f22fec92 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f22fec92 Branch: refs/heads/trunk Commit: f22fec927de7ac291266660c2f34de5b8cc1c695 Parents: 7877035 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Fri Nov 16 19:41:58 2018 +0100 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Fri Nov 16 19:41:58 2018 +0100 ---------------------------------------------------------------------- .circleci/config.yml | 14 +- build.xml | 10 +- ide/idea-iml-file.xml | 1 + .../org/apache/cassandra/auth/AuthCache.java | 33 +- .../cassandra/batchlog/BatchlogManager.java | 48 ++- .../concurrent/InfiniteLoopExecutor.java | 83 ++++ .../JMXEnabledThreadPoolExecutor.java | 24 +- .../concurrent/ScheduledExecutors.java | 15 + .../concurrent/SharedExecutorPool.java | 15 +- .../cassandra/concurrent/StageManager.java | 10 + .../cassandra/config/DatabaseDescriptor.java | 8 +- .../apache/cassandra/cql3/QueryProcessor.java | 6 +- .../cassandra/db/BlacklistedDirectories.java | 17 +- .../apache/cassandra/db/ColumnFamilyStore.java | 48 ++- .../cassandra/db/HintedHandOffManager.java | 15 +- .../cassandra/db/commitlog/CommitLog.java | 15 +- .../db/compaction/CompactionManager.java | 18 +- .../cassandra/diag/DiagnosticEventService.java | 13 +- .../cassandra/diag/LastEventIdBroadcaster.java | 16 +- .../apache/cassandra/gms/FailureDetector.java | 14 +- src/java/org/apache/cassandra/gms/Gossiper.java | 14 +- .../apache/cassandra/hints/HintsService.java | 15 +- .../cassandra/index/SecondaryIndexManager.java | 10 + .../io/sstable/IndexSummaryManager.java | 30 +- .../cassandra/io/util/DataInputBuffer.java | 8 +- .../locator/DynamicEndpointSnitch.java | 24 +- .../cassandra/locator/EndpointSnitchInfo.java | 16 +- 
.../metrics/CassandraMetricsRegistry.java | 30 +- .../cassandra/net/ForwardToContainer.java | 3 +- .../org/apache/cassandra/net/MessageIn.java | 12 +- .../apache/cassandra/net/MessagingService.java | 28 +- .../net/async/ByteBufDataInputPlus.java | 8 + .../cassandra/net/async/MessageInHandler.java | 67 +++- .../net/async/MessageInHandlerPre40.java | 50 ++- .../cassandra/net/async/NettyFactory.java | 12 +- .../apache/cassandra/schema/SchemaEvent.java | 5 +- .../cassandra/service/ActiveRepairService.java | 15 +- .../apache/cassandra/service/CacheService.java | 16 +- .../cassandra/service/CassandraDaemon.java | 11 +- .../service/PendingRangeCalculatorService.java | 9 + .../apache/cassandra/service/StorageProxy.java | 15 +- .../cassandra/service/StorageService.java | 16 +- .../apache/cassandra/utils/ByteBufferUtil.java | 26 ++ .../apache/cassandra/utils/MBeanWrapper.java | 179 +++++++++ .../org/apache/cassandra/utils/Mx4jTool.java | 4 +- .../apache/cassandra/utils/concurrent/Ref.java | 35 +- .../cassandra/utils/memory/BufferPool.java | 43 +- .../utils/memory/MemtableCleanerThread.java | 77 ++-- .../cassandra/utils/memory/MemtablePool.java | 9 + test/conf/logback-dtest.xml | 79 ++++ .../cassandra/distributed/Coordinator.java | 80 ++++ .../DistributedReadWritePathTest.java | 348 ++++++++++++++++ .../distributed/DistributedTestBase.java | 86 ++++ .../apache/cassandra/distributed/Instance.java | 399 +++++++++++++++++++ .../distributed/InstanceClassLoader.java | 101 +++++ .../cassandra/distributed/InstanceConfig.java | 87 ++++ .../distributed/InstanceIDDefiner.java | 38 ++ .../distributed/InvokableInstance.java | 133 +++++++ .../apache/cassandra/distributed/Message.java | 41 ++ .../cassandra/distributed/MessageFilters.java | 175 ++++++++ .../apache/cassandra/distributed/RowUtil.java | 47 +++ .../cassandra/distributed/TestCluster.java | 308 ++++++++++++++ .../cassandra/net/async/NettyFactoryTest.java | 2 +- .../async/StreamCompressionSerializerTest.java | 4 - 64 files 
changed, 2661 insertions(+), 477 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/.circleci/config.yml ---------------------------------------------------------------------- diff --git a/.circleci/config.yml b/.circleci/config.yml index 430354a..3b2b978 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -164,12 +164,20 @@ jobs: # get all of our unit test filenames set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt + # append distributed tests + set -eo pipefail && circleci tests glob "$HOME/cassandra/test/distributed/**/*.java" > /tmp/all_java_distributed_tests.txt # split up the unit tests into groups based on the number of containers we have set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | cut -c 37-1000000 | grep "Test\.java$" > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt" cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt + + set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_distributed_tests.txt > /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt + set +eo pipefail && cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt | cut -c 44-1000000 | grep "Test\.java$" > /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt + echo "** /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" + cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt + - run: name: Run Unit Tests command: | @@ -181,7 +189,11 @@ jobs: time mv ~/cassandra /tmp cd /tmp/cassandra - ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt + 
ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=unit + + if [ -s "/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" ]; then + ant testclasslist -Dtest.classlistfile=/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=distributed + fi no_output_timeout: 15m - store_test_results: path: /tmp/cassandra/build/test/output/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 3d3014c..f24647e 100644 --- a/build.xml +++ b/build.xml @@ -56,12 +56,14 @@ <property name="test.data" value="${test.dir}/data"/> <property name="test.name" value="*Test"/> <property name="test.classlistfile" value="testlist.txt"/> + <property name="test.classlistprefix" value="unit"/> <property name="benchmark.name" value=""/> <property name="test.methods" value=""/> <property name="test.unit.src" value="${test.dir}/unit"/> <property name="test.long.src" value="${test.dir}/long"/> <property name="test.burn.src" value="${test.dir}/burn"/> <property name="test.microbench.src" value="${test.dir}/microbench"/> + <property name="test.distributed.src" value="${test.dir}/distributed"/> <property name="dist.dir" value="${build.dir}/dist"/> <property name="tmp.dir" value="${java.io.tmpdir}"/> @@ -103,6 +105,7 @@ <property name="test.timeout" value="240000" /> <property name="test.long.timeout" value="600000" /> <property name="test.burn.timeout" value="60000000" /> + <property name="test.distributed.timeout" value="600000" /> <!-- default for cql tests. 
Can be override by -Dcassandra.test.use_prepared=false --> <property name="cassandra.test.use_prepared" value="true" /> @@ -1253,6 +1256,7 @@ <src path="${test.long.src}"/> <src path="${test.burn.src}"/> <src path="${test.microbench.src}"/> + <src path="${test.distributed.src}"/> </javac> <!-- Non-java resources needed by the test suite --> @@ -1364,7 +1368,7 @@ <attribute name="test.file.list"/> <attribute name="testlist.offset"/> <sequential> - <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}"> + <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}"> <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/> <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/> <jvmarg value="-Dcassandra.ring_delay_ms=1000"/> @@ -1468,6 +1472,7 @@ </concat> <path id="all-test-classes-path"> <fileset dir="${test.unit.src}" includes="**/${test.name}.java" /> + <fileset dir="${test.distributed.src}" includes="**/${test.name}.java" /> </path> <property name="all-test-classes" refid="all-test-classes-path"/> <testparallel testdelegate="testlist-compression" /> @@ -1844,7 +1849,7 @@ e.g. org/apache/cassandra/hints/HintMessageTest.java --> <target name="testclasslist" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. 
org/apache/cassandra/db/SomeTest.java)"> <path id="all-test-classes-path"> - <fileset dir="${test.unit.src}" includesfile="${test.classlistfile}"/> + <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/> </path> <property name="all-test-classes" refid="all-test-classes-path"/> <testparallel testdelegate="testlist"/> @@ -1939,6 +1944,7 @@ <classpathentry kind="src" path="conf" including="hotspot_compiler"/> <classpathentry kind="src" output="build/test/classes" path="test/unit"/> <classpathentry kind="src" output="build/test/classes" path="test/long"/> + <classpathentry kind="src" output="build/test/classes" path="test/distributed"/> <classpathentry kind="src" output="build/test/classes" path="test/resources" /> <classpathentry kind="src" path="tools/stress/src"/> <classpathentry kind="src" path="tools/fqltool/src"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/ide/idea-iml-file.xml ---------------------------------------------------------------------- diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml index b83abfa..0045ae6 100644 --- a/ide/idea-iml-file.xml +++ b/ide/idea-iml-file.xml @@ -35,6 +35,7 @@ <sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" /> + <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" /> <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" /> <excludeFolder url="file://$MODULE_DIR$/.idea" /> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/auth/AuthCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java 
b/src/java/org/apache/cassandra/auth/AuthCache.java index 3adf914..4bf15c1 100644 --- a/src/java/org/apache/cassandra/auth/AuthCache.java +++ b/src/java/org/apache/cassandra/auth/AuthCache.java @@ -18,22 +18,19 @@ package org.apache.cassandra.auth; -import java.lang.management.ManagementFactory; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.IntSupplier; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.utils.MBeanWrapper; import static com.google.common.base.Preconditions.checkNotNull; @@ -97,33 +94,17 @@ public class AuthCache<K, V> implements AuthCacheMBean protected void init() { cache = initCache(null); - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, getObjectName()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, getObjectName()); } protected void unregisterMBean() { - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.unregisterMBean(getObjectName()); - } - catch (Exception e) - { - logger.warn("Error unregistering {} cache mbean", name, e); - } + MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG); } - protected ObjectName getObjectName() throws MalformedObjectNameException + protected String getObjectName() { - return new ObjectName(MBEAN_NAME_BASE + name); + return MBEAN_NAME_BASE + name; } /** 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/batchlog/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 91129ed..b2b851d 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -18,28 +18,37 @@ package org.apache.cassandra.batchlog; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; +import com.google.common.collect.Collections2; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.RateLimiter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; -import org.apache.cassandra.locator.Replicas; -import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.*; +import 
org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -51,15 +60,20 @@ import org.apache.cassandra.hints.Hint; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.UUIDGen; import static com.google.common.collect.Iterables.transform; @@ -93,15 +107,7 @@ public class BatchlogManager implements BatchlogManagerMBean public void start() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches, StorageService.RING_DELAY, 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java new file mode 100644 index 0000000..1b8173e --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.concurrent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class InfiniteLoopExecutor +{ + private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class); + + public interface InterruptibleRunnable + { + void run() throws InterruptedException; + } + + private final Thread thread; + private final InterruptibleRunnable runnable; + private volatile boolean isShutdown = false; + + public InfiniteLoopExecutor(String name, InterruptibleRunnable runnable) + { + this.runnable = runnable; + this.thread = new Thread(this::loop, name); + this.thread.setDaemon(true); + } + + private void loop() + { + while (!isShutdown) + { + try + { + runnable.run(); + } + catch (InterruptedException ie) + { + if (isShutdown) + return; + logger.error("Interrupted while executing {}, but not shutdown; continuing with loop", runnable, ie); + } + catch (Throwable t) + { + logger.error("Exception thrown by runnable, continuing with loop", t); + } + } + } + + public InfiniteLoopExecutor start() + { + thread.start(); + return this; + } + + public void shutdown() + { + isShutdown = true; + thread.interrupt(); + } + + public void awaitTermination(long time, TimeUnit unit) throws InterruptedException + { + thread.join(unit.toMillis(time)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java index 278b399..0e61de9 100644 --- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java @@ -17,16 +17,14 @@ */ package org.apache.cassandra.concurrent; -import 
java.lang.management.ManagementFactory; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; -import javax.management.ObjectName; import org.apache.cassandra.metrics.ThreadPoolMetrics; +import org.apache.cassandra.utils.MBeanWrapper; /** * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation @@ -81,17 +79,8 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i super.prestartAllCoreThreads(); metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id).register(); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id; - - try - { - mbs.registerMBean(this, new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, mbeanName); } public JMXEnabledThreadPoolExecutor(int corePoolSize, @@ -114,14 +103,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i private void unregisterMBean() { - try - { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.unregisterMBean(mbeanName); // release metrics metrics.release(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java index 22dc769..e51e4c2 100644 --- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java +++ 
b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java @@ -17,6 +17,11 @@ */ package org.apache.cassandra.concurrent; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; + /** * Centralized location for shared executors */ @@ -41,4 +46,14 @@ public class ScheduledExecutors * This executor is used for tasks that do not need to be waited for on shutdown/drain. */ public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks"); + + @VisibleForTesting + public static void shutdownAndWait() throws InterruptedException + { + ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks }; + for (ExecutorService executor : executors) + executor.shutdown(); + for (ExecutorService executor : executors) + executor.awaitTermination(60, TimeUnit.SECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java index 3b0600f..5352ad7 100644 --- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java +++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java @@ -21,9 +21,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; + import static org.apache.cassandra.concurrent.SEPWorker.Work; /** @@ -61,7 +64,7 @@ public class SharedExecutorPool final 
AtomicLong workerId = new AtomicLong(); // the collection of executors serviced by this pool; periodically ordered by traffic volume - final List<SEPExecutor> executors = new CopyOnWriteArrayList<>(); + public final List<SEPExecutor> executors = new CopyOnWriteArrayList<>(); // the number of workers currently in a spinning state final AtomicInteger spinningCount = new AtomicInteger(); @@ -109,4 +112,14 @@ public class SharedExecutorPool executors.add(executor); return executor; } + + @VisibleForTesting + public static void shutdownSharedPool() throws InterruptedException + { + for (SEPExecutor executor : SHARED.executors) + executor.shutdown(); + + for (SEPExecutor executor : SHARED.executors) + executor.awaitTermination(60, TimeUnit.SECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index c102042..608a005 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent; import java.util.EnumMap; import java.util.concurrent.*; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +113,15 @@ public class StageManager } } + @VisibleForTesting + public static void shutdownAndWait() throws InterruptedException + { + for (Stage stage : Stage.values()) + StageManager.stages.get(stage).shutdown(); + for (Stage stage : Stage.values()) + StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS); + } + /** * The executor used for tracing. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bc1e5a2..2f5f49f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -142,6 +143,11 @@ public class DatabaseDescriptor public static void daemonInitialization() throws ConfigurationException { + daemonInitialization(DatabaseDescriptor::loadConfig); + } + + public static void daemonInitialization(Supplier<Config> config) throws ConfigurationException + { if (toolInitialized) throw new AssertionError("toolInitialization() already called"); if (clientInitialized) @@ -152,7 +158,7 @@ public class DatabaseDescriptor return; daemonInitialized = true; - setConfig(loadConfig()); + setConfig(config.get()); applyAll(); AuthConfig.applyAuth(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 45db947..b8ec648 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -162,7 +162,8 @@ public class QueryProcessor implements QueryHandler SystemKeyspace.resetPreparedStatements(); } - private static QueryState internalQueryState() + @VisibleForTesting + 
public static QueryState internalQueryState() { return new QueryState(InternalStateInstance.INSTANCE.clientState); } @@ -265,7 +266,8 @@ public class QueryProcessor implements QueryHandler return null; } - private static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values) + @VisibleForTesting + public static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values) { return makeInternalOptions(prepared, values, ConsistencyLevel.ONE); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/BlacklistedDirectories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java index f090013..cff9a78 100644 --- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java +++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java @@ -21,18 +21,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.lang.management.ManagementFactory; import java.util.Collections; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MBeanWrapper; public class BlacklistedDirectories implements BlacklistedDirectoriesMBean { @@ -48,17 +45,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean private BlacklistedDirectories() { // Register this instance with JMX - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - JVMStabilityInspector.inspectThrowable(e); - logger.error("error registering MBean {}", 
MBEAN_NAME, e); - //Allow the server to start even if the bean can't be registered - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME, MBeanWrapper.OnException.LOG); } public Set<File> getUnreadableDirectories() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 877a3c5..c5149cf 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOException; import java.io.PrintStream; -import java.lang.management.ManagementFactory; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; @@ -41,7 +40,6 @@ import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Snapshot; import org.apache.cassandra.cache.*; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; @@ -72,7 +70,6 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.metrics.Sampler; import org.apache.cassandra.metrics.Sampler.Sample; import org.apache.cassandra.metrics.Sampler.SamplerType; @@ -220,12 +217,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private volatile boolean neverPurgeTombstones = false; + public static void shutdownFlushExecutor() throws InterruptedException + { + flushExecutor.shutdown(); + flushExecutor.awaitTermination(60, 
TimeUnit.SECONDS); + } + + public static void shutdownPostFlushExecutor() throws InterruptedException { postFlushExecutor.shutdown(); postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); } + public static void shutdownReclaimExecutor() throws InterruptedException + { + reclaimExecutor.shutdown(); + reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS); + } + + public static void shutdownPerDiskFlushExecutors() throws InterruptedException + { + for (ExecutorService executorService : perDiskflushExecutors) + executorService.shutdown(); + for (ExecutorService executorService : perDiskflushExecutors) + executorService.awaitTermination(60, TimeUnit.SECONDS); + } + public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. @@ -425,19 +443,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s", isIndex() ? "IndexColumnFamilies" : "ColumnFamilies", keyspace.getName(), name); - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)}; - for (ObjectName objectName : objectNames) - { - mbs.registerMBean(this, objectName); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } + + String[] objectNames = {mbeanName, oldMBeanName}; + for (String objectName : objectNames) + MBeanWrapper.instance.registerMBean(this, objectName); } else { @@ -548,14 +557,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.removeUnreadableSSTables(directory); } - void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException + void unregisterMBean() throws MalformedObjectNameException { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)}; for 
(ObjectName objectName : objectNames) { - if (mbs.isRegistered(objectName)) - mbs.unregisterMBean(objectName); + if (MBeanWrapper.instance.isRegistered(objectName)) + MBeanWrapper.instance.unregisterMBean(objectName); } // unregister metrics http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 3279acf..e26f658 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -17,13 +17,10 @@ */ package org.apache.cassandra.db; -import java.lang.management.ManagementFactory; import java.util.List; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.utils.MBeanWrapper; /** * A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface. 
@@ -44,15 +41,7 @@ public final class HintedHandOffManager implements HintedHandOffManagerMBean public void registerMBean() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } public void deleteHintsForEndpoint(String host) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 6537adc..9d2a369 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -18,19 +18,15 @@ package org.apache.cassandra.db.commitlog; import java.io.*; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.*; import java.util.zip.CRC32; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.*; @@ -49,6 +45,7 @@ import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.MBeanWrapper; import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator; @@ -82,15 
+79,7 @@ public class CommitLog implements CommitLogMBean { CommitLog log = new CommitLog(CommitLogArchiver.construct()); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog")); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog"); return log.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index e56ed60..bc5a883 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -19,14 +19,11 @@ package org.apache.cassandra.db.compaction; import java.io.File; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Collectors; -import javax.management.MBeanServer; -import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; @@ -36,7 +33,6 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.*; import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,15 +115,8 @@ public class CompactionManager implements CompactionManagerMBean static { instance = new CompactionManager(); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(instance, new 
ObjectName(MBEAN_OBJECT_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + + MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME); } private final CompactionExecutor executor = new CompactionExecutor(); @@ -232,6 +221,7 @@ public class CompactionManager implements CompactionManagerMBean executor.shutdown(); validationExecutor.shutdown(); viewBuildExecutor.shutdown(); + cacheCleanupExecutor.shutdown(); // interrupt compactions and validations for (Holder compactionHolder : CompactionMetrics.getCompactions()) @@ -242,7 +232,7 @@ public class CompactionManager implements CompactionManagerMBean // wait for tasks to terminate // compaction tasks are interrupted above, so it shuold be fairy quick // until not interrupted tasks to complete. - for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor)) + for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor)) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/DiagnosticEventService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java index 3f3de7c..5953a1d 100644 --- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java +++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.MBeanWrapper; /** * Service for publishing and consuming {@link DiagnosticEvent}s. 
@@ -62,17 +63,7 @@ public final class DiagnosticEventService implements DiagnosticEventServiceMBean private DiagnosticEventService() { - - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService"); - mbs.registerMBean(this, jmxObjectName); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this,"org.apache.cassandra.diag:type=DiagnosticEventService"); // register broadcasters for JMX events DiagnosticEventPersistence.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java index 9fe5c48..8e991e6 100644 --- a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java +++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java @@ -18,21 +18,19 @@ package org.apache.cassandra.diag; -import java.lang.management.ManagementFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import javax.management.MBeanServer; import javax.management.Notification; import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; -import javax.management.ObjectName; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; /** @@ -61,16 +59,8 @@ final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implem 
super(JMXBroadcastExecutor.executor); summary.put("last_updated_at", 0L); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster"); - mbs.registerMBean(this, jmxObjectName); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + + MBeanWrapper.instance.registerMBean(this, "org.apache.cassandra.diag:type=LastEventIdBroadcaster"); } public static LastEventIdBroadcaster instance() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index d7f73ab..4a16f2a 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -21,15 +21,12 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.nio.file.Path; import java.io.*; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; -import javax.management.MBeanServer; -import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.*; @@ -42,6 +39,7 @@ import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; /** * This FailureDetector is an implementation of the paper titled @@ -88,15 +86,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean public FailureDetector() 
{ // Register this instance with JMX - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } private static long getInitialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index aedcb04..b789fe7 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.gms; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.*; import java.util.Map.Entry; @@ -28,8 +27,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; @@ -41,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,15 +246,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // Register this instance with JMX if (registerJmx) { - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 73840d3..1a352c2 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -18,7 +18,6 @@ package org.apache.cassandra.hints; import java.io.File; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; @@ -29,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.cassandra.db.Keyspace; @@ -51,6 +47,7 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.MBeanWrapper; import static com.google.common.collect.Iterables.transform; @@ -138,15 +135,7 @@ public final class HintsService implements HintsServiceMBean public void registerMBean() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index c9a7cc6..ec54a65 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -1485,4 +1485,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum false); } } + + @VisibleForTesting + public static void shutdownExecutors() throws InterruptedException + { + ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor }; + for (ExecutorService executor : executors) + executor.shutdown(); + for (ExecutorService executor : executors) + executor.awaitTermination(60, TimeUnit.SECONDS); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index b8d236a..3630c2a 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -18,29 +18,32 @@ package org.apache.cassandra.io.sstable; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; @@ -65,16 +68,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean static { instance = new IndexSummaryManager(); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try - { - mbs.registerMBean(instance, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME); } private IndexSummaryManager() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/util/DataInputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java index a68dcc2..9df9861 100644 --- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.io.util; -import java.io.IOException; import java.nio.ByteBuffer; /** @@ -57,14 +56,17 @@ public class DataInputBuffer 
extends RebufferingInputStream } @Override - protected void reBuffer() throws IOException + protected void reBuffer() { //nope, we don't rebuffer, we are done! } @Override - public int available() throws IOException + public int available() { return buffer.remaining(); } + + @Override + public void close() {} } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index d35f1fb..ddc8fba 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -18,7 +18,6 @@ package org.apache.cassandra.locator; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; @@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.codahale.metrics.ExponentiallyDecayingReservoir; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.codahale.metrics.Snapshot; import org.apache.cassandra.concurrent.ScheduledExecutors; @@ -41,6 +38,7 @@ import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector @@ -141,15 +139,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private void registerMBean() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw 
new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, mbeanName); } public void close() @@ -157,15 +147,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa updateSchedular.cancel(false); resetSchedular.cancel(false); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.unregisterMBean(new ObjectName(mbeanName)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.unregisterMBean(mbeanName); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java index da90a79..d836cd1 100644 --- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java +++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java @@ -17,28 +17,16 @@ */ package org.apache.cassandra.locator; - -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; -import javax.management.MBeanServer; -import javax.management.ObjectName; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; public class EndpointSnitchInfo implements EndpointSnitchInfoMBean { public static void create() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(new EndpointSnitchInfo(), new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo")); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(new EndpointSnitchInfo(), "org.apache.cassandra.db:type=EndpointSnitchInfo"); } public String getDatacenter(String host) throws UnknownHostException 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java index 43d6609..74c3367 100644 --- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java +++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.metrics; -import java.lang.management.ManagementFactory; import java.lang.reflect.Method; import java.util.Collection; import java.util.Collections; @@ -26,14 +25,14 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - -import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import com.codahale.metrics.*; import com.google.common.annotations.VisibleForTesting; +import com.codahale.metrics.*; +import org.apache.cassandra.utils.MBeanWrapper; + /** * Makes integrating 3.0 metrics API with 2.0. 
* <p> @@ -45,7 +44,7 @@ public class CassandraMetricsRegistry extends MetricRegistry public static final CassandraMetricsRegistry Metrics = new CassandraMetricsRegistry(); private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>(); - private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + private final MBeanWrapper mBeanServer = MBeanWrapper.instance; private CassandraMetricsRegistry() { @@ -159,11 +158,7 @@ public class CassandraMetricsRegistry extends MetricRegistry { boolean removed = remove(name.getMetricName()); - try - { - mBeanServer.unregisterMBean(name.getMBeanName()); - } catch (Exception ignore) {} - + mBeanServer.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE); return removed; } @@ -194,13 +189,8 @@ public class CassandraMetricsRegistry extends MetricRegistry else throw new IllegalArgumentException("Unknown metric type: " + metric.getClass()); - try - { - mBeanServer.registerMBean(mbean, name); - } - catch (Exception ignored) - { - } + if (!mBeanServer.isRegistered(name)) + mBeanServer.registerMBean(mbean, name, MBeanWrapper.OnException.LOG); } private void registerAlias(MetricName existingName, MetricName aliasName) @@ -213,10 +203,8 @@ public class CassandraMetricsRegistry extends MetricRegistry private void removeAlias(MetricName name) { - try - { - mBeanServer.unregisterMBean(name.getMBeanName()); - } catch (Exception ignore) {} + if (mBeanServer.isRegistered(name.getMBeanName())) + MBeanWrapper.instance.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/ForwardToContainer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java index ac9e725..b22eed6 100644 --- 
a/src/java/org/apache/cassandra/net/ForwardToContainer.java +++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java @@ -18,6 +18,7 @@ package org.apache.cassandra.net; +import java.io.Serializable; import java.util.Collection; import com.google.common.base.Preconditions; @@ -28,7 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; * Contains forward to information until it can be serialized as part of a message using a version * specific serialization */ -public class ForwardToContainer +public class ForwardToContainer implements Serializable { public final Collection<InetAddressAndPort> targets; public final int[] messageIds; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index 1cd39f3..c8f4bfc 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -49,12 +49,12 @@ public class MessageIn<T> public final int version; public final long constructionTime; - private MessageIn(InetAddressAndPort from, - T payload, - Map<ParameterType, Object> parameters, - Verb verb, - int version, - long constructionTime) + public MessageIn(InetAddressAndPort from, + T payload, + Map<ParameterType, Object> parameters, + Verb verb, + int version, + long constructionTime) { this.from = from; this.payload = payload; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c6e8496..761e210 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ 
b/src/java/org/apache/cassandra/net/MessagingService.java @@ -19,7 +19,6 @@ package org.apache.cassandra.net; import java.io.IOError; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; @@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import javax.management.MBeanServer; -import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -112,6 +109,7 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.BooleanSerializer; import org.apache.cassandra.utils.ExpiringMap; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.StatusLogger; @@ -464,6 +462,20 @@ public final class MessagingService implements MessagingServiceMBean } } + public static IVersionedSerializer<?> getVerbSerializer(Verb verb, int id) + { + IVersionedSerializer serializer = verbSerializers.get(verb); + if (serializer instanceof MessagingService.CallbackDeterminedSerializer) + { + CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id); + if (callback == null) + return null; + + serializer = callback.serializer; + } + return serializer; + } + /* Lookup table for registering message handlers based on the verb. 
*/ private final Map<Verb, IVerbHandler> verbHandlers; @@ -618,15 +630,7 @@ public final class MessagingService implements MessagingServiceMBean if (!testOnly) { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java index 23e532c..e0be715 100644 --- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java @@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import org.apache.cassandra.io.util.DataInputPlus; +import java.io.IOException; + public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus { /** @@ -40,4 +42,10 @@ public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInpu { return buf; } + + @Override + public String readUTF() throws IOException + { + return DataInputStreamPlus.readUTF(this); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java index 0a194d4..dafa993 100644 --- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java +++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java @@ -18,15 +18,17 @@ package org.apache.cassandra.net.async; 
-import java.io.DataInputStream; import java.io.IOException; import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import com.google.common.primitives.Ints; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +104,7 @@ public class MessageInHandler extends BaseMessageInHandler { if (in.readableBytes() < messageHeader.parameterLength) return; - readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters); + readParameters(in, inputPlus, messagingVersion, messageHeader.parameterLength, messageHeader.parameters); } state = State.READ_PAYLOAD_SIZE; // fall-through @@ -134,17 +136,17 @@ public class MessageInHandler extends BaseMessageInHandler } } - private void readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException + private static void readParameters(ByteBuf buf, DataInputPlus in, int messagingVersion, int parameterLength, Map<ParameterType, Object> parameters) throws IOException { // makes the assumption we have all the bytes required to read the headers - final int endIndex = in.readerIndex() + parameterLength; - while (in.readerIndex() < endIndex) + final int endIndex = buf.readerIndex() + parameterLength; + while (buf.readerIndex() < endIndex) { - String key = DataInputStream.readUTF(inputPlus); + String key = in.readUTF(); ParameterType parameterType = ParameterType.byName.get(key); - long valueLength = VIntCoding.readUnsignedVInt(in); + long valueLength = in.readUnsignedVInt(); byte[] value = new byte[Ints.checkedCast(valueLength)]; - in.readBytes(value); + in.readFully(value); try (DataInputBuffer buffer = new DataInputBuffer(value)) { parameters.put(parameterType, 
parameterType.serializer.deserialize(buffer, messagingVersion)); @@ -152,6 +154,55 @@ public class MessageInHandler extends BaseMessageInHandler } } + private static void readParameters(BooleanSupplier isDone, DataInputPlus in, int messagingVersion, Map<ParameterType, Object> parameters) throws IOException + { + // makes the assumption we have all the bytes required to read the headers + while (!isDone.getAsBoolean()) + { + String key = in.readUTF(); + ParameterType parameterType = ParameterType.byName.get(key); + in.readUnsignedVInt(); + parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion)); + } + } + + public static MessageIn<?> deserialize(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException + { + if (version >= MessagingService.VERSION_40) + return deserialize40(in, id, version, from); + else + return MessageInHandlerPre40.deserializePre40(in, id, version, from); + } + + private static MessageIn<?> deserialize40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException + { + MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt()); + + Map<ParameterType, Object> parameters = Collections.emptyMap(); + int parameterLength = (int) in.readUnsignedVInt(); + if (parameterLength != 0) + { + parameters = new EnumMap<>(ParameterType.class); + byte[] bytes = new byte[parameterLength]; + in.readFully(bytes); + try (DataInputBuffer buffer = new DataInputBuffer(bytes)) + { + readParameters(() -> buffer.available() == 0, buffer, version, parameters); + } + } + + Object payload = null; + int payloadSize = (int) in.readUnsignedVInt(); + if (payloadSize > 0) + { + IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id); + if (serializer == null) in.skipBytesFully(payloadSize); + else payload = serializer.deserialize(in, version); + } + + return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime()); + } + @Override MessageHeader 
getMessageHeader() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java index f5b6fc4..6eeeea7 100644 --- a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java +++ b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java @@ -25,8 +25,11 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,10 +158,10 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler if (!canReadNextParam(in)) return false; - String key = DataInputStream.readUTF(inputPlus); + String key = inputPlus.readUTF(); ParameterType parameterType = ParameterType.byName.get(key); - byte[] value = new byte[in.readInt()]; - in.readBytes(value); + byte[] value = new byte[inputPlus.readInt()]; + inputPlus.readFully(value); try (DataInputBuffer buffer = new DataInputBuffer(value)) { parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion)); @@ -168,6 +171,47 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler return true; } + private static boolean readParameters(DataInputPlus in, int messagingVersion, int parameterCount, Map<ParameterType, Object> parameters) throws IOException + { + // makes the assumption that map.size() is a constant time function (HashMap.size() is) + while (parameters.size() < parameterCount) + { + String key = in.readUTF(); + ParameterType parameterType = 
ParameterType.byName.get(key); + in.readInt(); + parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion)); + } + + return true; + } + + static MessageIn<?> deserializePre40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException + { + assert from.equals(CompactEndpointSerializationHelper.instance.deserialize(in, version)); + MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt()); + + Map<ParameterType, Object> parameters = Collections.emptyMap(); + int parameterCount = in.readInt(); + if (parameterCount != 0) + { + parameters = new EnumMap<>(ParameterType.class); + readParameters(in, version, parameterCount, parameters); + } + + Object payload = null; + int payloadSize = in.readInt(); + if (payloadSize > 0) + { + IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id); + if (serializer == null) in.skipBytesFully(payloadSize); + else payload = serializer.deserialize(in, version); + } + + return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime()); + } + + + /** * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in} * readIndex back to where it was when this method was invoked. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java index 989e33c..2366722 100644 --- a/src/java/org/apache/cassandra/net/async/NettyFactory.java +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -2,6 +2,7 @@ package org.apache.cassandra.net.async; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import java.util.zip.Checksum; import javax.annotation.Nullable; @@ -384,12 +385,13 @@ public final class NettyFactory } } - public void close() + public void close() throws InterruptedException { - acceptGroup.shutdownGracefully(); - outboundGroup.shutdownGracefully(); - inboundGroup.shutdownGracefully(); - streamingGroup.shutdownGracefully(); + EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup }; + for (EventLoopGroup group : groups) + group.shutdownGracefully(); + for (EventLoopGroup group : groups) + group.awaitTermination(60, TimeUnit.SECONDS); } static Lz4FrameEncoder createLz4Encoder(int protocolVersion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/schema/SchemaEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java index e26cee5..00c8136 100644 --- a/src/java/org/apache/cassandra/schema/SchemaEvent.java +++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java @@ -29,13 +29,14 @@ import java.util.UUID; import java.util.stream.Collectors; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import 
com.google.common.collect.MapDifference; import org.apache.cassandra.diag.DiagnosticEvent; import org.apache.cassandra.utils.Pair; -final class SchemaEvent extends DiagnosticEvent +public final class SchemaEvent extends DiagnosticEvent { private final SchemaEventType type; @@ -62,7 +63,7 @@ final class SchemaEvent extends DiagnosticEvent @Nullable private final MapDifference<String,TableMetadata> indexesDiff; - enum SchemaEventType + public enum SchemaEventType { KS_METADATA_LOADED, KS_METADATA_RELOADED, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index b32f67e..1a54e75 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -18,15 +18,11 @@ package org.apache.cassandra.service; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -78,6 +74,7 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; @@ -169,15 +166,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai 
.maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000)) .build(); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); } public void start() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 479470c..5eeaf20 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; @@ -27,9 +26,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import javax.management.MBeanServer; -import javax.management.ObjectName; - import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; @@ -53,6 +49,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Pair; public class CacheService implements CacheServiceMBean @@ -88,16 +85,7 @@ public class CacheService implements CacheServiceMBean private CacheService() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try - { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - 
} + MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); keyCache = initKeyCache(); rowCache = initRowCache(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index f0b2dc1..592419a 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -569,16 +569,7 @@ public class CassandraDaemon { applyConfig(); - try - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - logger.error("error registering MBean {}", MBEAN_NAME, e); - //Allow the server to start even if the bean can't be registered - } + MBeanWrapper.instance.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), MBEAN_NAME, MBeanWrapper.OnException.LOG); if (FBUtilities.isWindows) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 7b6bd58..a3f6b52 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; + public class PendingRangeCalculatorService { public static final 
PendingRangeCalculatorService instance = new PendingRangeCalculatorService(); @@ -117,4 +119,11 @@ public class PendingRangeCalculatorService { StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName); } + + @VisibleForTesting + public void shutdownExecutor() throws InterruptedException + { + executor.shutdown(); + executor.awaitTermination(60, TimeUnit.SECONDS); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org