Repository: cassandra Updated Branches: refs/heads/trunk 1bd5c64ba -> 8badc2856
Centralize shared executors patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-8055 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4397c344 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4397c344 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4397c344 Branch: refs/heads/trunk Commit: 4397c34476070ea15ee0d2b9c625887a8b08b622 Parents: e3862bc Author: Sam Tunnicliffe <[email protected]> Authored: Thu Nov 20 01:42:03 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Thu Nov 20 01:57:01 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/auth/Auth.java | 17 ++++---- .../cassandra/auth/PasswordAuthenticator.java | 18 ++++---- .../apache/cassandra/cache/AutoSavingCache.java | 10 ++--- .../concurrent/ScheduledExecutors.java | 43 ++++++++++++++++++++ .../apache/cassandra/cql3/QueryProcessor.java | 5 +-- .../apache/cassandra/db/BatchlogManager.java | 8 +++- .../apache/cassandra/db/ColumnFamilyStore.java | 42 +++++++++++-------- .../cassandra/db/HintedHandOffManager.java | 7 ++-- .../db/commitlog/CommitLogArchiver.java | 2 +- .../io/sstable/SSTableDeletingTask.java | 6 +-- .../cassandra/io/sstable/SSTableReader.java | 3 +- .../org/apache/cassandra/io/util/FileUtils.java | 4 +- .../locator/DynamicEndpointSnitch.java | 5 ++- .../apache/cassandra/net/MessagingService.java | 4 +- .../cassandra/service/CassandraDaemon.java | 3 +- .../cassandra/service/LoadBroadcaster.java | 3 +- .../cassandra/service/MigrationManager.java | 3 +- .../cassandra/service/StorageService.java | 38 ++++------------- .../apache/cassandra/utils/ResourceWatcher.java | 4 +- .../org/apache/cassandra/cql3/CQLTester.java | 6 +-- .../org/apache/cassandra/db/KeyCacheTest.java | 5 +-- 22 files changed, 139 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c00e671..41a5aaf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Centralize shared executors (CASSANDRA-8055) * Fix filtering for CONTAINS (KEY) relations on frozen collection clustering columns when the query is restricted to a single partition (CASSANDRA-8203) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/Auth.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java index 4f18111..ed7aa87 100644 --- a/src/java/org/apache/cassandra/auth/Auth.java +++ b/src/java/org/apache/cassandra/auth/Auth.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; @@ -189,15 +190,13 @@ public class Auth implements AuthMBean // the delay is here to give the node some time to see its peers - to reduce // "Skipped default superuser setup: some nodes were not ready" log spam. // It's the only reason for the delay. - StorageService.tasks.schedule(new Runnable() - { - public void run() - { - setupDefaultSuperuser(); - } - }, - SUPERUSER_SETUP_DELAY, - TimeUnit.MILLISECONDS); + ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable() + { + public void run() + { + setupDefaultSuperuser(); + } + }, SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java index 1218ee2..9570770 100644 --- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java +++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.QueryOptions; @@ -37,7 +38,6 @@ import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.mindrot.jbcrypt.BCrypt; @@ -169,15 +169,13 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator // the delay is here to give the node some time to see its peers - to reduce // "skipped default user setup: some nodes are were not ready" log spam. // It's the only reason for the delay. - StorageService.tasks.schedule(new Runnable() - { - public void run() - { - setupDefaultUser(); - } - }, - Auth.SUPERUSER_SETUP_DELAY, - TimeUnit.MILLISECONDS); + ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable() + { + public void run() + { + setupDefaultUser(); + } + }, Auth.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index fca939a..2117eb8 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -27,6 +27,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; @@ -39,7 +40,6 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.*; import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; @@ -121,10 +121,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K submitWrite(keysToSave); } }; - saveTask = StorageService.optionalTasks.scheduleWithFixedDelay(runnable, - savePeriodInSeconds, - savePeriodInSeconds, - TimeUnit.SECONDS); + saveTask = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable, + savePeriodInSeconds, + savePeriodInSeconds, + TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java new file mode 100644 index 0000000..5935669 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.concurrent; + +/** + * Centralized location for shared executors + */ +public class ScheduledExecutors +{ + /** + * This pool is used for periodic short (sub-second) tasks. + */ + public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks"); + + /** + * This executor is used for tasks that can have longer execution times, and usually are non periodic. + */ + public static final DebuggableScheduledThreadPoolExecutor nonPeriodicTasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks"); + static + { + nonPeriodicTasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + } + + /** + * This executor is used for tasks that do not need to be waited for on shutdown/drain. + */ + public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks"); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 9f71d71..45ef39c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -22,7 +22,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; @@ -33,6 +32,7 @@ import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.cql3.statements.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.*; @@ -89,7 +89,6 @@ public class QueryProcessor implements QueryHandler public static final CQLMetrics metrics = new CQLMetrics(); private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0); - private static final ScheduledExecutorService evictionCheckTimer = Executors.newScheduledThreadPool(1); static { @@ -118,7 +117,7 @@ public class QueryProcessor implements QueryHandler }) .build(); - evictionCheckTimer.scheduleAtFixedRate(new Runnable() + ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(new Runnable() { public void run() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 279f876..20f134d 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -70,7 +70,7 @@ public class BatchlogManager implements BatchlogManagerMBean private final AtomicLong totalBatchesReplayed = new AtomicLong(); // Single-thread executor service for scheduling and serializing log replay. - public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); + private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks"); public void start() { @@ -95,6 +95,12 @@ public class BatchlogManager implements BatchlogManagerMBean batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS); } + public static void shutdown() throws InterruptedException + { + batchlogTasks.shutdown(); + batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); + } + public int countAllBatches() { String query = String.format("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0fa50bb..7e1dd18 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -33,15 +33,13 @@ import com.google.common.collect.*; import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.io.FSWriteError; + import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.*; -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; import org.apache.cassandra.db.commitlog.CommitLog; @@ -61,6 +59,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.Descriptor; @@ -86,18 +85,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("MemtableFlushWriter"), "internal"); + // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtablePostFlush"), - "internal"); - public static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtableReclaimMemory"), - "internal"); + private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("MemtablePostFlush"), + "internal"); + + private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("MemtableReclaimMemory"), + "internal"); public final Keyspace keyspace; public final String name; @@ -134,6 +136,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final ColumnFamilyMetrics metric; public volatile long sampleLatencyNanos; + public static void shutdownPostFlushExecutor() throws InterruptedException + { + postFlushExecutor.shutdown(); + postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); + } + public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. @@ -188,7 +196,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } }; - StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS); } } @@ -310,7 +318,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean throw new RuntimeException(e); } logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry()); - StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable() + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() { public void run() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 0e68a71..ad8546e 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; @@ -176,7 +177,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean metrics.log(); } }; - StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); + ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp) @@ -228,7 +229,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } }; - StorageService.optionalTasks.submit(runnable); + ScheduledExecutors.optionalTasks.submit(runnable); } //foobar @@ -249,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } }; - StorageService.optionalTasks.submit(runnable).get(); + ScheduledExecutors.optionalTasks.submit(runnable).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 1b1a1e0..6cba603 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -52,7 +52,7 @@ public class CommitLogArchiver } public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>(); - public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver"); + private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver"); private final String archiveCommand; private final String restoreCommand; private final String restoreDirectories; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java index d95dff7..fb1cbb3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java @@ -27,9 +27,9 @@ import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public class SSTableDeletingTask implements Runnable @@ -69,7 +69,7 @@ public class SSTableDeletingTask implements Runnable public void schedule() { - StorageService.tasks.submit(this); + ScheduledExecutors.nonPeriodicTasks.submit(this); } public void run() @@ -119,7 +119,7 @@ public class SSTableDeletingTask implements Runnable } }; - FBUtilities.waitOnFuture(StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS)); + FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 8f302f3..a3e3cf5 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -59,6 +59,7 @@ import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Config; @@ -635,7 +636,7 @@ public class SSTableReader extends SSTable else barrier = null; - StorageService.tasks.execute(new Runnable() + ScheduledExecutors.nonPeriodicTasks.execute(new Runnable() { public void run() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 295679e..7d187ac 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -38,7 +38,7 @@ import sun.nio.ch.DirectBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.Config; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.BlacklistedDirectories; import org.apache.cassandra.db.Keyspace; @@ -326,7 +326,7 @@ public class FileUtils deleteWithConfirm(new File(file)); } }; - StorageService.tasks.execute(runnable); + ScheduledExecutors.nonPeriodicTasks.execute(runnable); } public static String stringifyFileSize(double value) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 49442c8..e4b714c 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -84,8 +85,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa reset(); } }; - StorageService.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); - StorageService.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); registerMBean(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 05b449c..73bc9ff 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -37,6 +37,8 @@ import com.google.common.collect.Lists; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.concurrent.TracingAwareExecutorService; @@ -329,7 +331,7 @@ public final class MessagingService implements MessagingServiceMBean logDroppedMessages(); } }; - StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 5897a22..1c99348 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import com.addthis.metrics.reporter.config.ReporterConfig; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -329,7 +330,7 @@ public class CassandraDaemon } } }; - StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); + ScheduledExecutors.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS); SystemKeyspace.finishStartup(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/LoadBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java index 4996e52..d12ffba 100644 --- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java +++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.gms.*; public class LoadBroadcaster implements IEndpointStateChangeSubscriber @@ -91,7 +92,7 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber StorageService.instance.valueFactory.load(StorageService.instance.getLoad())); } }; - StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index bdae208..ce4dca4 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -32,6 +32,7 @@ import java.lang.management.RuntimeMXBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -126,7 +127,7 @@ public class MigrationManager submitMigrationTask(endpoint); } }; - StorageService.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 29054f4..ae8c798 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -54,7 +54,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Auth; -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -121,24 +121,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return 30 * 1000; } - /** - * This pool is used for periodic short (sub-second) tasks. - */ - public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks"); - - /** - * This pool is used by tasks that can have longer execution times, and usually are non periodic. - */ - public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks"); - /** - * tasks that do not need to be waited for on shutdown/drain - */ - public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks"); - static - { - tasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - } - /* This abstraction maintains the token/endpoint metadata information */ private TokenMetadata tokenMetadata = new TokenMetadata(); @@ -597,7 +579,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (daemon != null) shutdownClientServers(); - optionalTasks.shutdown(); + ScheduledExecutors.optionalTasks.shutdown(); Gossiper.instance.stop(); // In-progress writes originating here could generate hints to be written, so shut down MessagingService @@ -633,8 +615,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE CommitLog.instance.shutdownBlocking(); // wait for miscellaneous tasks like sstable and commitlog segment deletion - tasks.shutdown(); - if (!tasks.awaitTermination(1, TimeUnit.MINUTES)) + ScheduledExecutors.nonPeriodicTasks.shutdown(); + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); } }, "StorageServiceShutdownHook"); @@ -3602,7 +3584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } setMode(Mode.DRAINING, "starting drain process", true); shutdownClientServers(); - optionalTasks.shutdown(); + ScheduledExecutors.optionalTasks.shutdown(); Gossiper.instance.stop(); setMode(Mode.DRAINING, "shutting down MessageService", false); @@ -3647,21 +3629,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } FBUtilities.waitOnFutures(flushes); - BatchlogManager.batchlogTasks.shutdown(); - BatchlogManager.batchlogTasks.awaitTermination(60, TimeUnit.SECONDS); + BatchlogManager.shutdown(); // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure // there are no segments to replay, so we force the recycling of any remaining (should be at most one) CommitLog.instance.forceRecycleAllSegments(); - ColumnFamilyStore.postFlushExecutor.shutdown(); - ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); + ColumnFamilyStore.shutdownPostFlushExecutor(); CommitLog.instance.shutdownBlocking(); // wait for miscellaneous tasks like sstable and commitlog segment deletion - tasks.shutdown(); - if (!tasks.awaitTermination(1, TimeUnit.MINUTES)) + ScheduledExecutors.nonPeriodicTasks.shutdown(); + if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES)) logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown"); setMode(Mode.DRAINED, true); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/utils/ResourceWatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ResourceWatcher.java b/src/java/org/apache/cassandra/utils/ResourceWatcher.java index 2dfab95..5e7cbdd 100644 --- a/src/java/org/apache/cassandra/utils/ResourceWatcher.java +++ b/src/java/org/apache/cassandra/utils/ResourceWatcher.java @@ -23,13 +23,13 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.concurrent.ScheduledExecutors; public class ResourceWatcher { public static void watch(String resource, Runnable callback, int period) { - StorageService.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS); + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS); } public static class WatchedResource implements Runnable http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 470b701..dd22896 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Directories; @@ -43,7 +44,6 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.service.StorageService; /** * Base class for CQL tests. @@ -88,7 +88,7 @@ public abstract class CQLTester currentTypes.clear(); // We want to clean up after the test, but dropping a table is rather long so just do that asynchronously - StorageService.optionalTasks.execute(new Runnable() + ScheduledExecutors.optionalTasks.execute(new Runnable() { public void run() { @@ -105,7 +105,7 @@ public abstract class CQLTester // mono-threaded, just push a task on the queue to find when it's empty. No perfect but good enough. final CountDownLatch latch = new CountDownLatch(1); - StorageService.tasks.execute(new Runnable() + ScheduledExecutors.nonPeriodicTasks.execute(new Runnable() { public void run() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index c0560ab..1bc7caf 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.nio.file.Files; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -31,12 +30,12 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; @@ -165,7 +164,7 @@ public class KeyCacheTest extends SchemaLoader reader.releaseReference(); Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);; - while (StorageService.tasks.getActiveCount() + StorageService.tasks.getQueue().size() > 0); + while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0); // after releasing the reference this should drop to 2 assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
