This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_add_flush_queue_jmx_interface
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 21bd10cd537a37b7099ea7f9d89d39e53b4e644d
Author: xiangdong huang <[email protected]>
AuthorDate: Sat Jul 27 15:15:40 2019 +0800

    add jmx interface for getting the active and pending tasks in FlushManager
---
 .../apache/iotdb/db/engine/flush/FlushManager.java | 32 +++++++++++++++++++++-
 .../iotdb/db/engine/flush/FlushManagerMBean.java   | 13 +++++++++
 .../db/engine/flush/pool/AbstractPoolManager.java  |  8 ++++++
 .../engine/flush/pool/FlushSubTaskPoolManager.java |  1 -
 .../db/engine/flush/pool/FlushTaskPoolManager.java |  1 +
 .../org/apache/iotdb/db/service/ServiceType.java   |  9 +++++-
 6 files changed, 61 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index ff5136f..46e86d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,16 +19,18 @@
 package org.apache.iotdb.db.engine.flush;
 
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FlushManager implements IService {
+public class FlushManager implements FlushManagerMBean, IService {
 
   private static final Logger logger = 
LoggerFactory.getLogger(FlushManager.class);
 
@@ -40,6 +42,14 @@ public class FlushManager implements IService {
   public void start() throws StartupException {
     FlushSubTaskPoolManager.getInstance().start();
     FlushTaskPoolManager.getInstance().start();
+    try {
+      JMXService.registerMBean(this, ServiceType.FLUSH_SERVICE.getJmxName());
+    } catch (Exception e) {
+      String errorMessage = String
+          .format("Failed to start %s because of %s", this.getID().getName(),
+              e.getMessage());
+      throw new StartupException(errorMessage, e);
+    }
   }
 
   @Override
@@ -53,6 +63,26 @@ public class FlushManager implements IService {
     return ServiceType.FLUSH_SERVICE;
   }
 
+  @Override
+  public int getNumberOfWorkingTasks() {
+    return flushPool.getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingTasks() {
+    return flushPool.getNumberOfPendingTasks();
+  }
+
+  @Override
+  public int getNumberOfWorkingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfWorkingTasks();
+  }
+
+  @Override
+  public int getNumberOfPendingSubTasks() {
+    return FlushSubTaskPoolManager.getInstance().getNumberOfPendingTasks();
+  }
+
   class FlushThread implements Runnable {
 
     @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
new file mode 100644
index 0000000..7da3cc6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManagerMBean.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.engine.flush;
+
+public interface FlushManagerMBean {
+
+  public int getNumberOfWorkingTasks();
+
+  public int getNumberOfPendingTasks();
+
+  public int getNumberOfWorkingSubTasks();
+
+  public int getNumberOfPendingSubTasks();
+
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index d1a3dcc..8800bf9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -84,4 +84,12 @@ public abstract class AbstractPoolManager {
   public abstract void stop();
 
   public abstract String getName();
+
+  public int getNumberOfWorkingTasks() {
+    return ((ThreadPoolExecutor)pool).getActiveCount();
+  }
+
+  public int getNumberOfPendingTasks() {
+    return ((ThreadPoolExecutor)pool).getQueue().size();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 94eff1b..2264d21 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.flush.pool;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
-import org.apache.iotdb.db.service.IService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
index aec372c..0d02b2d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -54,6 +54,7 @@ public class FlushTaskPoolManager extends AbstractPoolManager 
{
       pool = IoTDBThreadPoolFactory
           .newFixedThreadPool(threadCnt, ThreadName.FLUSH_SERVICE.getName());
     }
+
     LOGGER.info("Flush task manager started.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java 
b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index bff3e49..b2f245b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -33,7 +33,9 @@ public enum ServiceType {
   SYNC_SERVICE("SYNC ServerService", ""),
   
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
-  FLUSH_SERVICE("Flush ServerService", "");
+
+  FLUSH_SERVICE("Flush ServerService",
+      generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager"));
 
   private String name;
   private String jmxName;
@@ -50,4 +52,9 @@ public enum ServiceType {
   public String getJmxName() {
     return jmxName;
   }
+
+  private static String generateJmxName(String packageName,  String jmxName) {
+    return String
+        .format("%s:type=%s", packageName, jmxName);
+  }
 }

Reply via email to