This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster- by this push:
new e0ac0c0 add threadpool implementation
e0ac0c0 is described below
commit e0ac0c01710ee6f6f68dac29eb739eb1f38dd397
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Aug 11 01:25:07 2021 +0800
add threadpool implementation
---
.../db/concurrent/IoTDBThreadPoolFactory.java | 93 +++++++---
.../db/concurrent/threadpool/IThreadPoolMBean.java | 44 +++++
.../WrappedScheduledExecutorService.java | 192 +++++++++++++++++++++
.../WrappedScheduledExecutorServiceMBean.java | 21 +++
.../WrappedSingleThreadExecutorService.java | 118 +++++++++++++
.../WrappedSingleThreadExecutorServiceMBean.java | 21 +++
.../WrappedSingleThreadScheduledExecutor.java | 123 +++++++++++++
.../WrappedSingleThreadScheduledExecutorMBean.java | 21 +++
.../threadpool/WrappedThreadPoolExecutor.java | 88 ++++++++++
.../threadpool/WrappedThreadPoolExecutorMBean.java | 21 +++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
11 files changed, 721 insertions(+), 22 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 7c75e89..8e4eed9 100644
---
a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -18,18 +18,27 @@
*/
package org.apache.iotdb.db.concurrent;
+import
org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService;
+import
org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService;
+import
org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor;
+import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor;
+
import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-/** This class is used to create thread pool which must contain the pool name.
*/
+/**
+ * This class is used to create thread pools, each of which must be given a pool name. Note that
+ * the IoTDB project does not allow creating thread pools directly via Executors.newXXX(),
+ * because thread pools created that way are hard to trace.
+ */
public class IoTDBThreadPoolFactory {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBThreadPoolFactory.class);
@@ -42,15 +51,30 @@ public class IoTDBThreadPoolFactory {
* @param poolName - the name of thread pool
* @return fixed size thread pool
*/
- public static ExecutorService newFixedThreadPool(int nthreads, String
poolName) {
- logger.info("new fixed thread pool: {}, thread number: {}", poolName,
nthreads);
- return Executors.newFixedThreadPool(nthreads, new
IoTThreadFactory(poolName));
+ public static ExecutorService newFixedThreadPool(int nThreads, String
poolName) {
+ logger.info("new fixed thread pool: {}, thread number: {}", poolName,
nThreads);
+
+ return new WrappedThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new IoTThreadFactory(poolName),
+ poolName);
}
public static ExecutorService newFixedThreadPool(
- int nthreads, String poolName, Thread.UncaughtExceptionHandler handler) {
- logger.info("new fixed thread pool: {}, thread number: {}", poolName,
nthreads);
- return Executors.newFixedThreadPool(nthreads, new
IoTThreadFactory(poolName, handler));
+ int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) {
+ logger.info("new fixed thread pool: {}, thread number: {}", poolName,
nThreads);
+ return new WrappedThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new IoTThreadFactory(poolName, handler),
+ poolName);
}
/**
@@ -61,13 +85,15 @@ public class IoTDBThreadPoolFactory {
*/
public static ExecutorService newSingleThreadExecutor(String poolName) {
logger.info("new single thread pool: {}", poolName);
- return Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName));
+ return new WrappedSingleThreadExecutorService(
+ Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)),
poolName);
}
public static ExecutorService newSingleThreadExecutor(
String poolName, Thread.UncaughtExceptionHandler handler) {
logger.info("new single thread pool: {}", poolName);
- return Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName,
handler));
+ return new WrappedSingleThreadExecutorService(
+ Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName,
handler)), poolName);
}
/**
@@ -78,13 +104,27 @@ public class IoTDBThreadPoolFactory {
*/
public static ExecutorService newCachedThreadPool(String poolName) {
logger.info("new cached thread pool: {}", poolName);
- return Executors.newCachedThreadPool(new IoTThreadFactory(poolName));
+ return new WrappedThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory(poolName),
+ poolName);
}
public static ExecutorService newCachedThreadPool(
String poolName, Thread.UncaughtExceptionHandler handler) {
logger.info("new cached thread pool: {}", poolName);
- return Executors.newCachedThreadPool(new IoTThreadFactory(poolName,
handler));
+ return new WrappedThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory(poolName, handler),
+ poolName);
}
/**
@@ -94,12 +134,15 @@ public class IoTDBThreadPoolFactory {
* @return scheduled thread pool.
*/
public static ScheduledExecutorService
newSingleThreadScheduledExecutor(String poolName) {
- return Executors.newSingleThreadScheduledExecutor(new
IoTThreadFactory(poolName));
+ return new WrappedSingleThreadScheduledExecutor(
+ Executors.newSingleThreadScheduledExecutor(new
IoTThreadFactory(poolName)), poolName);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(
String poolName, Thread.UncaughtExceptionHandler handler) {
- return Executors.newSingleThreadScheduledExecutor(new
IoTThreadFactory(poolName, handler));
+ return new WrappedSingleThreadScheduledExecutor(
+ Executors.newSingleThreadScheduledExecutor(new
IoTThreadFactory(poolName, handler)),
+ poolName);
}
/**
@@ -110,25 +153,29 @@ public class IoTDBThreadPoolFactory {
* @return thread pool.
*/
public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize, String poolName) {
- return Executors.newScheduledThreadPool(corePoolSize, new
IoTThreadFactory(poolName));
+ return new WrappedScheduledExecutorService(
+ Executors.newScheduledThreadPool(corePoolSize, new
IoTThreadFactory(poolName)), poolName);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, String poolName, Thread.UncaughtExceptionHandler
handler) {
- return Executors.newScheduledThreadPool(corePoolSize, new
IoTThreadFactory(poolName, handler));
+ return new WrappedScheduledExecutorService(
+ Executors.newScheduledThreadPool(corePoolSize, new
IoTThreadFactory(poolName, handler)),
+ poolName);
}
/** function for creating thrift rpc client thread pool. */
public static ExecutorService createThriftRpcClientThreadPool(
TThreadPoolServer.Args args, String poolName) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
- return new ThreadPoolExecutor(
+ return new WrappedThreadPoolExecutor(
args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
args.stopTimeoutUnit,
executorQueue,
- new IoTThreadFactory(poolName));
+ new IoTThreadFactory(poolName),
+ poolName);
}
/** function for creating thrift rpc client thread pool. */
@@ -139,25 +186,27 @@ public class IoTDBThreadPoolFactory {
TimeUnit stopTimeoutUnit,
String poolName) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
- return new ThreadPoolExecutor(
+ return new WrappedThreadPoolExecutor(
minWorkerThreads,
maxWorkerThreads,
stopTimeoutVal,
stopTimeoutUnit,
executorQueue,
- new IoTThreadFactory(poolName));
+ new IoTThreadFactory(poolName),
+ poolName);
}
/** function for creating thrift rpc client thread pool. */
public static ExecutorService createThriftRpcClientThreadPool(
TThreadPoolServer.Args args, String poolName,
Thread.UncaughtExceptionHandler handler) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
- return new ThreadPoolExecutor(
+ return new WrappedThreadPoolExecutor(
args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
args.stopTimeoutUnit,
executorQueue,
- new IoTThreadFactory(poolName, handler));
+ new IoTThreadFactory(poolName, handler),
+ poolName);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
new file mode 100644
index 0000000..743b8fc
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import java.util.Queue;
+
+public interface IThreadPoolMBean {
+
+ int getCorePoolSize();
+
+ boolean prestartCoreThread();
+
+ int getMaximumPoolSize();
+
+ Queue<Runnable> getQueue();
+
+ int getQueueLength();
+
+ int getPoolSize();
+
+ int getActiveCount();
+
+ int getLargestPoolSize();
+
+ long getTaskCount();
+
+ long getCompletedTaskCount();
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
new file mode 100644
index 0000000..9653609
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedScheduledExecutorService
+ implements ScheduledExecutorService, WrappedScheduledExecutorServiceMBean {
+ private final String mbeanName;
+ ScheduledExecutorService service;
+
+ public WrappedScheduledExecutorService(ScheduledExecutorService service,
String mbeanName) {
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE,
IoTDBConstant.JMX_TYPE, mbeanName);
+ this.service = service;
+ JMXService.registerMBean(this, this.mbeanName);
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
+ return service.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+ return service.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return service.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ service.shutdown();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ JMXService.deregisterMBean(mbeanName);
+ return service.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return service.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return service.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return service.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return service.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return service.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return service.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return service.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return service.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return service.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return service.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ service.execute(command);
+ }
+
+ @Override
+ public int getCorePoolSize() {
+ return ((ThreadPoolExecutor) service).getCorePoolSize();
+ }
+
+ @Override
+ public boolean prestartCoreThread() {
+ return ((ThreadPoolExecutor) service).prestartCoreThread();
+ }
+
+ @Override
+ public int getMaximumPoolSize() {
+ return ((ThreadPoolExecutor) service).getMaximumPoolSize();
+ }
+
+ @Override
+ public Queue<Runnable> getQueue() {
+ return ((ThreadPoolExecutor) service).getQueue();
+ }
+
+ @Override
+ public int getPoolSize() {
+ return ((ThreadPoolExecutor) service).getPoolSize();
+ }
+
+ @Override
+ public int getActiveCount() {
+ return ((ThreadPoolExecutor) service).getActiveCount();
+ }
+
+ @Override
+ public int getLargestPoolSize() {
+ return ((ThreadPoolExecutor) service).getLargestPoolSize();
+ }
+
+ @Override
+ public long getTaskCount() {
+ return ((ThreadPoolExecutor) service).getTaskCount();
+ }
+
+ @Override
+ public long getCompletedTaskCount() {
+ return ((ThreadPoolExecutor) service).getCompletedTaskCount();
+ }
+
+ @Override
+ public int getQueueLength() {
+ return getQueue().size();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
new file mode 100644
index 0000000..e3cc1ba
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedScheduledExecutorServiceMBean extends IThreadPoolMBean
{}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
new file mode 100644
index 0000000..47f3a91
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedSingleThreadExecutorService
+ implements ExecutorService, WrappedSingleThreadExecutorServiceMBean {
+ private final String mbeanName;
+
+ ExecutorService service;
+
+ public WrappedSingleThreadExecutorService(ExecutorService service, String
mbeanName) {
+ this.service = service;
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE,
IoTDBConstant.JMX_TYPE, mbeanName);
+ JMXService.registerMBean(this, this.mbeanName);
+ }
+
+ @Override
+ public void shutdown() {
+ service.shutdown();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ JMXService.deregisterMBean(mbeanName);
+ return service.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return service.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return service.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return service.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return service.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return service.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return service.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return service.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return service.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return service.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return service.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ service.execute(command);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
new file mode 100644
index 0000000..6bb3463
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedSingleThreadExecutorServiceMBean {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
new file mode 100644
index 0000000..407c634
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedSingleThreadScheduledExecutor
+ implements ScheduledExecutorService,
WrappedSingleThreadScheduledExecutorMBean {
+ private final String mbeanName;
+ ScheduledExecutorService service;
+
+ public WrappedSingleThreadScheduledExecutor(ScheduledExecutorService
service, String mbeanName) {
+ this.service = service;
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE,
IoTDBConstant.JMX_TYPE, mbeanName);
+ JMXService.registerMBean(this, this.mbeanName);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
+ return service.schedule(command, delay, unit);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+ return service.schedule(callable, delay, unit);
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return service.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ public void shutdown() {
+ service.shutdown();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ public List<Runnable> shutdownNow() {
+ JMXService.deregisterMBean(mbeanName);
+ return service.shutdownNow();
+ }
+
+ public boolean isShutdown() {
+ return service.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return service.isTerminated();
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return service.awaitTermination(timeout, unit);
+ }
+
+ public <T> Future<T> submit(Callable<T> task) {
+ return service.submit(task);
+ }
+
+ public <T> Future<T> submit(Runnable task, T result) {
+ return service.submit(task, result);
+ }
+
+ public Future<?> submit(Runnable task) {
+ return service.submit(task);
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return service.invokeAll(tasks);
+ }
+
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return service.invokeAll(tasks, timeout, unit);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return service.invokeAny(tasks);
+ }
+
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return service.invokeAny(tasks, timeout, unit);
+ }
+
+ public void execute(Runnable command) {
+ service.execute(command);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
new file mode 100644
index 0000000..a510974
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedSingleThreadScheduledExecutorMBean {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
new file mode 100644
index 0000000..2b71ab3
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
+ implements WrappedThreadPoolExecutorMBean {
+ private final String mbeanName;
+
+ public WrappedThreadPoolExecutor(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ String mbeanName) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE,
IoTDBConstant.JMX_TYPE, mbeanName);
+ JMXService.registerMBean(this, this.mbeanName);
+ }
+
+ public WrappedThreadPoolExecutor(
+ int minWorkerThreads,
+ int maxWorkerThreads,
+ int stopTimeoutVal,
+ TimeUnit stopTimeoutUnit,
+ SynchronousQueue<Runnable> executorQueue,
+ IoTThreadFactory ioTThreadFactory,
+ String mbeanName) {
+ super(
+ minWorkerThreads,
+ maxWorkerThreads,
+ stopTimeoutVal,
+ stopTimeoutUnit,
+ executorQueue,
+ ioTThreadFactory);
+ this.mbeanName =
+ String.format(
+ "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE,
IoTDBConstant.JMX_TYPE, mbeanName);
+ JMXService.registerMBean(this, this.mbeanName);
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ JMXService.deregisterMBean(mbeanName);
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ JMXService.deregisterMBean(mbeanName);
+ return super.shutdownNow();
+ }
+
+ @Override
+ public int getQueueLength() {
+ return getQueue().size();
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
new file mode 100644
index 0000000..3427146
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedThreadPoolExecutorMBean extends IThreadPoolMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index ef68816..9e723b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -37,6 +37,7 @@ public class IoTDBConstant {
public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
public static final String IOTDB_PACKAGE = "org.apache.iotdb.service";
+ public static final String IOTDB_THREADPOOL_PACKAGE =
"org.apache.iotdb.threadpool";
public static final String JMX_TYPE = "type";
public static final long GB = 1024 * 1024 * 1024L;