This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_add_flush_queue_jmx_interface
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 21bd10cd537a37b7099ea7f9d89d39e53b4e644d
Author: xiangdong huang <[email protected]>
AuthorDate: Sat Jul 27 15:15:40 2019 +0800

    add jmx interface for getting the active and pending tasks in FlushManager
---
 .../apache/iotdb/db/engine/flush/FlushManager.java | 32 +++++++++++++++++++++-
 .../iotdb/db/engine/flush/FlushManagerMBean.java   | 13 +++++++++
 .../db/engine/flush/pool/AbstractPoolManager.java  |  8 ++++++
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  1 -
 .../db/engine/flush/pool/FlushTaskPoolManager.java |  1 +
 .../org/apache/iotdb/db/service/ServiceType.java   |  9 +++++-
 6 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index ff5136f..46e86d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,16 +19,18 @@
 package org.apache.iotdb.db.engine.flush;

 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class FlushManager implements IService {
+public class FlushManager implements FlushManagerMBean, IService {

   private static final Logger logger = LoggerFactory.getLogger(FlushManager.class);

@@ -40,6 +42,14 @@ public class FlushManager implements IService {
   public void start() throws StartupException {
     FlushSubTaskPoolManager.getInstance().start();
     FlushTaskPoolManager.getInstance().start();
+    try {
+      JMXService.registerMBean(this, ServiceType.FLUSH_SERVICE.getJmxName());
+    } catch (Exception e) {
+      String errorMessage = String
+          .format("Failed to start %s because of %s", this.getID().getName(),
+              e.getMessage());
+      throw new StartupException(errorMessage, e);
+    }
   }

   @Override
@@ -53,6 +63,26 @@ public class FlushManager implements IService {
     return ServiceType.FLUSH_SERVICE;
   }

+  @Override
+  public int getNumberOfWorkingTasks() {
+    return flushPool.getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingTasks() {
+    return flushPool.getNumberOfPendingTasks();
+  }
+
+  @Override
+  public int getNumberOfWorkingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfPendingTasks();
+  }
+
   class FlushThread implements Runnable {

     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
new file mode 100644
index 0000000..7da3cc6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.engine.flush;
+
+public interface FlushManagerMBean {
+
+  public int getNumberOfWorkingTasks();
+
+  public int getNumberOfPendingTasks();
+
+  public int getNumberOfWorkingSubTasks();
+
+  public int getNumberOfPendingSubTasks();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index d1a3dcc..8800bf9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -84,4 +84,12 @@ public abstract class AbstractPoolManager {
   public abstract void stop();

   public abstract String getName();
+
+  public int getNumberOfWorkingTasks() {
+    return ((ThreadPoolExecutor)pool).getActiveCount();
+  }
+
+  public int getNumberOfPendingTasks() {
+    return ((ThreadPoolExecutor)pool).getQueue().size();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 94eff1b..2264d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.flush.pool;

 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.service.IService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
index aec372c..0d02b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -54,6 +54,7 @@ public class FlushTaskPoolManager extends AbstractPoolManager {
       pool = IoTDBThreadPoolFactory
           .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
     }
+
     LOGGER.info("Flush task manager started.");
   }

diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index bff3e49..b2f245b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -33,7 +33,9 @@ public enum ServiceType {
   SYNC_SERVICE("SYNC ServerService", ""),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
-  FLUSH_SERVICE("Flush ServerService", "");
+
+  FLUSH_SERVICE("Flush ServerService",
+      generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager"));

   private String name;
   private String jmxName;
@@ -50,4 +52,9 @@ public enum ServiceType {
   public String getJmxName() {
     return jmxName;
   }
+
+  private static String generateJmxName(String packageName, String jmxName) {
+    return String
+        .format("%s:type=%s", packageName, jmxName);
+  }
 }
