This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster- by this push:
     new e0ac0c0  add threadpool implementation
e0ac0c0 is described below

commit e0ac0c01710ee6f6f68dac29eb739eb1f38dd397
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Aug 11 01:25:07 2021 +0800

    add threadpool implementation
---
 .../db/concurrent/IoTDBThreadPoolFactory.java      |  93 +++++++---
 .../db/concurrent/threadpool/IThreadPoolMBean.java |  44 +++++
 .../WrappedScheduledExecutorService.java           | 192 +++++++++++++++++++++
 .../WrappedScheduledExecutorServiceMBean.java      |  21 +++
 .../WrappedSingleThreadExecutorService.java        | 118 +++++++++++++
 .../WrappedSingleThreadExecutorServiceMBean.java   |  21 +++
 .../WrappedSingleThreadScheduledExecutor.java      | 123 +++++++++++++
 .../WrappedSingleThreadScheduledExecutorMBean.java |  21 +++
 .../threadpool/WrappedThreadPoolExecutor.java      |  88 ++++++++++
 .../threadpool/WrappedThreadPoolExecutorMBean.java |  21 +++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   1 +
 11 files changed, 721 insertions(+), 22 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 7c75e89..8e4eed9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -18,18 +18,27 @@
  */
 package org.apache.iotdb.db.concurrent;
 
+import 
org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService;
+import 
org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService;
+import 
org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor;
+import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor;
+
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-/** This class is used to create thread pool which must contain the pool name. 
*/
+/**
+ * This class is used to create thread pools, each of which must be given a pool name. Note that
+ * the IoTDB project does not allow obtaining a thread pool by calling Executors.newXXX()
+ * directly, because such pools are hard to trace.
+ */
 public class IoTDBThreadPoolFactory {
 
   private static final Logger logger = 
LoggerFactory.getLogger(IoTDBThreadPoolFactory.class);
@@ -42,15 +51,30 @@ public class IoTDBThreadPoolFactory {
    * @param poolName - the name of thread pool
    * @return fixed size thread pool
    */
-  public static ExecutorService newFixedThreadPool(int nthreads, String 
poolName) {
-    logger.info("new fixed thread pool: {}, thread number: {}", poolName, 
nthreads);
-    return Executors.newFixedThreadPool(nthreads, new 
IoTThreadFactory(poolName));
+  public static ExecutorService newFixedThreadPool(int nThreads, String 
poolName) {
+    logger.info("new fixed thread pool: {}, thread number: {}", poolName, 
nThreads);
+
+    return new WrappedThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(),
+        new IoTThreadFactory(poolName),
+        poolName);
   }
 
   public static ExecutorService newFixedThreadPool(
-      int nthreads, String poolName, Thread.UncaughtExceptionHandler handler) {
-    logger.info("new fixed thread pool: {}, thread number: {}", poolName, 
nthreads);
-    return Executors.newFixedThreadPool(nthreads, new 
IoTThreadFactory(poolName, handler));
+      int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) {
+    logger.info("new fixed thread pool: {}, thread number: {}", poolName, 
nThreads);
+    return new WrappedThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new IoTThreadFactory(poolName, handler),
+        poolName);
   }
 
   /**
@@ -61,13 +85,15 @@ public class IoTDBThreadPoolFactory {
    */
   public static ExecutorService newSingleThreadExecutor(String poolName) {
     logger.info("new single thread pool: {}", poolName);
-    return Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName));
+    return new WrappedSingleThreadExecutorService(
+        Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), 
poolName);
   }
 
   public static ExecutorService newSingleThreadExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new single thread pool: {}", poolName);
-    return Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName, 
handler));
+    return new WrappedSingleThreadExecutorService(
+        Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName, 
handler)), poolName);
   }
 
   /**
@@ -78,13 +104,27 @@ public class IoTDBThreadPoolFactory {
    */
   public static ExecutorService newCachedThreadPool(String poolName) {
     logger.info("new cached thread pool: {}", poolName);
-    return Executors.newCachedThreadPool(new IoTThreadFactory(poolName));
+    return new WrappedThreadPoolExecutor(
+        0,
+        Integer.MAX_VALUE,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new IoTThreadFactory(poolName),
+        poolName);
   }
 
   public static ExecutorService newCachedThreadPool(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new cached thread pool: {}", poolName);
-    return Executors.newCachedThreadPool(new IoTThreadFactory(poolName, 
handler));
+    return new WrappedThreadPoolExecutor(
+        0,
+        Integer.MAX_VALUE,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new IoTThreadFactory(poolName, handler),
+        poolName);
   }
 
   /**
@@ -94,12 +134,15 @@ public class IoTDBThreadPoolFactory {
    * @return scheduled thread pool.
    */
   public static ScheduledExecutorService 
newSingleThreadScheduledExecutor(String poolName) {
-    return Executors.newSingleThreadScheduledExecutor(new 
IoTThreadFactory(poolName));
+    return new WrappedSingleThreadScheduledExecutor(
+        Executors.newSingleThreadScheduledExecutor(new 
IoTThreadFactory(poolName)), poolName);
   }
 
   public static ScheduledExecutorService newSingleThreadScheduledExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
-    return Executors.newSingleThreadScheduledExecutor(new 
IoTThreadFactory(poolName, handler));
+    return new WrappedSingleThreadScheduledExecutor(
+        Executors.newSingleThreadScheduledExecutor(new 
IoTThreadFactory(poolName, handler)),
+        poolName);
   }
 
   /**
@@ -110,25 +153,29 @@ public class IoTDBThreadPoolFactory {
    * @return thread pool.
    */
   public static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize, String poolName) {
-    return Executors.newScheduledThreadPool(corePoolSize, new 
IoTThreadFactory(poolName));
+    return new WrappedScheduledExecutorService(
+        Executors.newScheduledThreadPool(corePoolSize, new 
IoTThreadFactory(poolName)), poolName);
   }
 
   public static ScheduledExecutorService newScheduledThreadPool(
       int corePoolSize, String poolName, Thread.UncaughtExceptionHandler 
handler) {
-    return Executors.newScheduledThreadPool(corePoolSize, new 
IoTThreadFactory(poolName, handler));
+    return new WrappedScheduledExecutorService(
+        Executors.newScheduledThreadPool(corePoolSize, new 
IoTThreadFactory(poolName, handler)),
+        poolName);
   }
 
   /** function for creating thrift rpc client thread pool. */
   public static ExecutorService createThriftRpcClientThreadPool(
       TThreadPoolServer.Args args, String poolName) {
     SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
-    return new ThreadPoolExecutor(
+    return new WrappedThreadPoolExecutor(
         args.minWorkerThreads,
         args.maxWorkerThreads,
         args.stopTimeoutVal,
         args.stopTimeoutUnit,
         executorQueue,
-        new IoTThreadFactory(poolName));
+        new IoTThreadFactory(poolName),
+        poolName);
   }
 
   /** function for creating thrift rpc client thread pool. */
@@ -139,25 +186,27 @@ public class IoTDBThreadPoolFactory {
       TimeUnit stopTimeoutUnit,
       String poolName) {
     SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
-    return new ThreadPoolExecutor(
+    return new WrappedThreadPoolExecutor(
         minWorkerThreads,
         maxWorkerThreads,
         stopTimeoutVal,
         stopTimeoutUnit,
         executorQueue,
-        new IoTThreadFactory(poolName));
+        new IoTThreadFactory(poolName),
+        poolName);
   }
 
   /** function for creating thrift rpc client thread pool. */
   public static ExecutorService createThriftRpcClientThreadPool(
       TThreadPoolServer.Args args, String poolName, 
Thread.UncaughtExceptionHandler handler) {
     SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
-    return new ThreadPoolExecutor(
+    return new WrappedThreadPoolExecutor(
         args.minWorkerThreads,
         args.maxWorkerThreads,
         args.stopTimeoutVal,
         args.stopTimeoutUnit,
         executorQueue,
-        new IoTThreadFactory(poolName, handler));
+        new IoTThreadFactory(poolName, handler),
+        poolName);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
new file mode 100644
index 0000000..743b8fc
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/IThreadPoolMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import java.util.Queue;
+
+public interface IThreadPoolMBean {
+
+  int getCorePoolSize();
+
+  boolean prestartCoreThread();
+
+  int getMaximumPoolSize();
+
+  Queue<Runnable> getQueue();
+
+  int getQueueLength();
+
+  int getPoolSize();
+
+  int getActiveCount();
+
+  int getLargestPoolSize();
+
+  long getTaskCount();
+
+  long getCompletedTaskCount();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
new file mode 100644
index 0000000..9653609
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorService.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedScheduledExecutorService
+    implements ScheduledExecutorService, WrappedScheduledExecutorServiceMBean {
+  private final String mbeanName;
+  ScheduledExecutorService service;
+
+  public WrappedScheduledExecutorService(ScheduledExecutorService service, 
String mbeanName) {
+    this.mbeanName =
+        String.format(
+            "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, 
IoTDBConstant.JMX_TYPE, mbeanName);
+    this.service = service;
+    JMXService.registerMBean(this, this.mbeanName);
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+    return service.schedule(command, delay, unit);
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+    return service.schedule(callable, delay, unit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    return service.scheduleAtFixedRate(command, initialDelay, period, unit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+  }
+
+  @Override
+  public void shutdown() {
+    service.shutdown();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    JMXService.deregisterMBean(mbeanName);
+    return service.shutdownNow();
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return service.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return service.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return service.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return service.submit(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return service.submit(task, result);
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return service.submit(task);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return service.invokeAll(tasks);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return service.invokeAll(tasks, timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return service.invokeAny(tasks);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return service.invokeAny(tasks, timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    service.execute(command);
+  }
+
+  @Override
+  public int getCorePoolSize() {
+    return ((ThreadPoolExecutor) service).getCorePoolSize();
+  }
+
+  @Override
+  public boolean prestartCoreThread() {
+    return ((ThreadPoolExecutor) service).prestartCoreThread();
+  }
+
+  @Override
+  public int getMaximumPoolSize() {
+    return ((ThreadPoolExecutor) service).getMaximumPoolSize();
+  }
+
+  @Override
+  public Queue<Runnable> getQueue() {
+    return ((ThreadPoolExecutor) service).getQueue();
+  }
+
+  @Override
+  public int getPoolSize() {
+    return ((ThreadPoolExecutor) service).getPoolSize();
+  }
+
+  @Override
+  public int getActiveCount() {
+    return ((ThreadPoolExecutor) service).getActiveCount();
+  }
+
+  @Override
+  public int getLargestPoolSize() {
+    return ((ThreadPoolExecutor) service).getLargestPoolSize();
+  }
+
+  @Override
+  public long getTaskCount() {
+    return ((ThreadPoolExecutor) service).getTaskCount();
+  }
+
+  @Override
+  public long getCompletedTaskCount() {
+    return ((ThreadPoolExecutor) service).getCompletedTaskCount();
+  }
+
+  @Override
+  public int getQueueLength() {
+    return getQueue().size();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
new file mode 100644
index 0000000..e3cc1ba
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedScheduledExecutorServiceMBean extends IThreadPoolMBean 
{}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
new file mode 100644
index 0000000..47f3a91
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedSingleThreadExecutorService
+    implements ExecutorService, WrappedSingleThreadExecutorServiceMBean {
+  private final String mbeanName;
+
+  ExecutorService service;
+
+  public WrappedSingleThreadExecutorService(ExecutorService service, String 
mbeanName) {
+    this.service = service;
+    this.mbeanName =
+        String.format(
+            "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, 
IoTDBConstant.JMX_TYPE, mbeanName);
+    JMXService.registerMBean(this, this.mbeanName);
+  }
+
+  @Override
+  public void shutdown() {
+    service.shutdown();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    JMXService.deregisterMBean(mbeanName);
+    return service.shutdownNow();
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return service.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return service.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return service.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return service.submit(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return service.submit(task, result);
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return service.submit(task);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return service.invokeAll(tasks);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return service.invokeAll(tasks, timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return service.invokeAny(tasks);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return service.invokeAny(tasks, timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    service.execute(command);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
new file mode 100644
index 0000000..6bb3463
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedSingleThreadExecutorServiceMBean {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
new file mode 100644
index 0000000..407c634
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WrappedSingleThreadScheduledExecutor
+    implements ScheduledExecutorService, 
WrappedSingleThreadScheduledExecutorMBean {
+  private final String mbeanName;
+  ScheduledExecutorService service;
+
+  public WrappedSingleThreadScheduledExecutor(ScheduledExecutorService 
service, String mbeanName) {
+    this.service = service;
+    this.mbeanName =
+        String.format(
+            "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, 
IoTDBConstant.JMX_TYPE, mbeanName);
+    JMXService.registerMBean(this, this.mbeanName);
+  }
+
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+    return service.schedule(command, delay, unit);
+  }
+
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+    return service.schedule(callable, delay, unit);
+  }
+
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    return service.scheduleAtFixedRate(command, initialDelay, period, unit);
+  }
+
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+  }
+
+  public void shutdown() {
+    service.shutdown();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  public List<Runnable> shutdownNow() {
+    JMXService.deregisterMBean(mbeanName);
+    return service.shutdownNow();
+  }
+
+  public boolean isShutdown() {
+    return service.isShutdown();
+  }
+
+  public boolean isTerminated() {
+    return service.isTerminated();
+  }
+
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return service.awaitTermination(timeout, unit);
+  }
+
+  public <T> Future<T> submit(Callable<T> task) {
+    return service.submit(task);
+  }
+
+  public <T> Future<T> submit(Runnable task, T result) {
+    return service.submit(task, result);
+  }
+
+  public Future<?> submit(Runnable task) {
+    return service.submit(task);
+  }
+
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return service.invokeAll(tasks);
+  }
+
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return service.invokeAll(tasks, timeout, unit);
+  }
+
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return service.invokeAny(tasks);
+  }
+
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return service.invokeAny(tasks, timeout, unit);
+  }
+
+  public void execute(Runnable command) {
+    service.execute(command);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
new file mode 100644
index 0000000..a510974
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+public interface WrappedSingleThreadScheduledExecutorMBean {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
new file mode 100644
index 0000000..2b71ab3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.JMXService;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link ThreadPoolExecutor} that registers itself through {@link JMXService} when it is
+ * constructed and deregisters itself when it is shut down.
+ *
+ * <p>The name handed to {@link JMXService} has the form {@code <domain>:<key>=<mbeanName>}, where
+ * the domain is {@link IoTDBConstant#IOTDB_THREADPOOL_PACKAGE} and the key is {@link
+ * IoTDBConstant#JMX_TYPE}.
+ */
+public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
+    implements WrappedThreadPoolExecutorMBean {
+  /** Full name under which this pool was registered; reused for deregistration. */
+  private final String mbeanName;
+
+  /**
+   * Creates the pool and registers it with {@link JMXService}.
+   *
+   * <p>Every argument except {@code mbeanName} is forwarded unchanged to the {@link
+   * ThreadPoolExecutor} constructor.
+   *
+   * @param corePoolSize forwarded to the superclass
+   * @param maximumPoolSize forwarded to the superclass
+   * @param keepAliveTime forwarded to the superclass
+   * @param unit forwarded to the superclass
+   * @param workQueue forwarded to the superclass
+   * @param threadFactory forwarded to the superclass
+   * @param mbeanName the value part of the registered name (the text after {@code '='})
+   */
+  public WrappedThreadPoolExecutor(
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      String mbeanName) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    this.mbeanName = fullMBeanName(mbeanName);
+    JMXService.registerMBean(this, this.mbeanName);
+  }
+
+  /**
+   * Variant of the constructor above taking an {@code int} timeout, a {@link SynchronousQueue}
+   * and an {@link IoTThreadFactory}. It performs the same registration.
+   *
+   * @param minWorkerThreads forwarded to the superclass as the core pool size
+   * @param maxWorkerThreads forwarded to the superclass as the maximum pool size
+   * @param stopTimeoutVal forwarded to the superclass as the keep-alive time
+   * @param stopTimeoutUnit forwarded to the superclass as the keep-alive unit
+   * @param executorQueue forwarded to the superclass as the work queue
+   * @param ioTThreadFactory forwarded to the superclass
+   * @param mbeanName the value part of the registered name (the text after {@code '='})
+   */
+  public WrappedThreadPoolExecutor(
+      int minWorkerThreads,
+      int maxWorkerThreads,
+      int stopTimeoutVal,
+      TimeUnit stopTimeoutUnit,
+      SynchronousQueue<Runnable> executorQueue,
+      IoTThreadFactory ioTThreadFactory,
+      String mbeanName) {
+    super(
+        minWorkerThreads,
+        maxWorkerThreads,
+        stopTimeoutVal,
+        stopTimeoutUnit,
+        executorQueue,
+        ioTThreadFactory);
+    this.mbeanName = fullMBeanName(mbeanName);
+    JMXService.registerMBean(this, this.mbeanName);
+  }
+
+  /** Builds the full registration name for a pool; shared by both constructors. */
+  private static String fullMBeanName(String poolName) {
+    return String.format(
+        "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, IoTDBConstant.JMX_TYPE, poolName);
+  }
+
+  /** Initiates an orderly shutdown, then deregisters this pool from {@link JMXService}. */
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    JMXService.deregisterMBean(mbeanName);
+  }
+
+  /**
+   * Shuts the pool down immediately, then deregisters it from {@link JMXService}.
+   *
+   * @return the list returned by {@link ThreadPoolExecutor#shutdownNow()}
+   */
+  @Override
+  public List<Runnable> shutdownNow() {
+    // Same order as shutdown(): call the superclass first, so that if it throws (it may raise
+    // SecurityException) the still-running pool keeps its MBean.
+    List<Runnable> neverStarted = super.shutdownNow();
+    JMXService.deregisterMBean(mbeanName);
+    return neverStarted;
+  }
+
+  /**
+   * Returns the current size of the work queue.
+   *
+   * @return {@code getQueue().size()}
+   */
+  @Override
+  public int getQueueLength() {
+    return getQueue().size();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
new file mode 100644
index 0000000..3427146
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.concurrent.threadpool;
+
+/**
+ * Management interface implemented by {@link WrappedThreadPoolExecutor}.
+ *
+ * <p>It adds no members of its own; everything it exposes is inherited from {@link
+ * IThreadPoolMBean}.
+ */
+public interface WrappedThreadPoolExecutorMBean extends IThreadPoolMBean {}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index ef68816..9e723b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -37,6 +37,7 @@ public class IoTDBConstant {
   public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
 
   public static final String IOTDB_PACKAGE = "org.apache.iotdb.service";
+  /** Prefix, placed before the ':' separator, of the names given to thread-pool MBeans. */
+  public static final String IOTDB_THREADPOOL_PACKAGE = 
"org.apache.iotdb.threadpool";
   public static final String JMX_TYPE = "type";
 
   public static final long GB = 1024 * 1024 * 1024L;

Reply via email to