This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch enhance_merge_management
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit efa77dd52f0f073dfc47c8af8d4709e828d1a12c
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 9 19:24:23 2020 +0800

    enhance merge task management
---
 docs/UserGuide/Operation Manual/SQL Reference.md   |   8 +
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |   5 +
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   6 +
 .../iotdb/db/engine/merge/manage/MergeFuture.java  | 153 +++++++++++++++
 .../iotdb/db/engine/merge/manage/MergeManager.java | 193 +++++++++++++++++-
 .../db/engine/merge/manage/MergeManagerMBean.java  |  26 +++
 .../db/engine/merge/manage/MergeThreadPool.java    |  48 +++++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 107 +++++++---
 .../db/engine/merge/task/MergeMultiChunkTask.java  | 216 ++++++++++++++-------
 .../iotdb/db/engine/merge/task/MergeTask.java      |  77 +++++++-
 .../db/engine/merge/task/RecoverMergeTask.java     |  13 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   2 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   4 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  49 +++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../db/qp/logical/sys/ShowMergeStatusOperator.java |  30 +++
 .../db/qp/physical/sys/ShowMergeStatusPlan.java    |  27 +++
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   2 +-
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |   8 +
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   3 +
 .../org/apache/iotdb/db/service/RPCService.java    |   4 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   2 +-
 .../org/apache/iotdb/db/service/StaticResps.java   |  12 ++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   2 +
 .../iotdb/db/engine/merge/MergeManagerTest.java    | 168 ++++++++++++++++
 .../iotdb/db/integration/IoTDBMergeTest.java       |  47 ++++-
 26 files changed, 1092 insertions(+), 123 deletions(-)

diff --git a/docs/UserGuide/Operation Manual/SQL Reference.md 
b/docs/UserGuide/Operation Manual/SQL Reference.md
index 3e15816..d23b36b 100644
--- a/docs/UserGuide/Operation Manual/SQL Reference.md  
+++ b/docs/UserGuide/Operation Manual/SQL Reference.md  
@@ -208,6 +208,14 @@ Eg: IoTDB > SHOW STORAGE GROUP
 Note: This statement can be used in IoTDB Client and JDBC.
 ```
 
+* Show Merge Status Statement
+
+```
+SHOW MERGE STATUS
+Eg: IoTDB > SHOW MERGE STATUS
+Note: This statement can be used in IoTDB Client and JDBC.
+```
+
 * Count Timeseries Statement
 
 ```
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 
b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index 7d5b9f2..c464446 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -73,6 +73,7 @@ statement
     | SHOW STORAGE GROUP #showStorageGroup
     | SHOW CHILD PATHS prefixPath? #showChildPaths
     | SHOW DEVICES prefixPath? #showDevices
+    | SHOW MERGE STATUS #showMergeStatus
     | COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? 
#countTimeseries
     | COUNT NODES prefixPath LEVEL OPERATOR_EQ INT #countNodes
     | LOAD CONFIGURATION (MINUS GLOBAL)? #loadConfigurationStatement
@@ -876,6 +877,10 @@ TRUE
 FALSE
     : F A L S E
     ;
+
+STATUS
+    : S T A T U S
+    ;
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 58e3154..836afbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -73,6 +73,12 @@ public class IoTDBConstant {
   public static final String COLUMN_STORAGE_GROUP = "storage group";
   public static final String COLUMN_TTL = "ttl";
 
+  public static final String COLUMN_TASK_NAME = "task name";
+  public static final String COLUMN_CREATED_TIME = "created time";
+  public static final String COLUMN_PROGRESS = "progress";
+  public static final String COLUMN_CANCELLED = "cancelled";
+  public static final String COLUMN_DONE = "done";
+
   public static final String PATH_WILDCARD = "*";
 
   // data folder name
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java
new file mode 100644
index 0000000..8d42003
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeFuture.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import 
org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+
+public abstract class MergeFuture extends FutureTask<Void> implements 
Comparable<MergeFuture> {
+
+  private final DateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"
+      + ".SSS'Z'");
+  private Date createdTime;
+
+  public MergeFuture(Callable callable) {
+    super(callable);
+    createdTime = new Date(System.currentTimeMillis());
+  }
+
+  public String getCreatedTime() {
+    return dateFormat.format(createdTime);
+  }
+
+  public abstract String getTaskName();
+
+  public abstract String getProgress();
+
+  @Override
+  public int compareTo(MergeFuture future) {
+    return this.createdTime.compareTo(future.createdTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MergeFuture future = (MergeFuture) o;
+    return Objects.equals(createdTime, future.createdTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(createdTime);
+  }
+
+  public static class MainMergeFuture extends MergeFuture {
+
+    private MergeTask bindingTask;
+
+    public MainMergeFuture(MergeTask task) {
+      super(task);
+      bindingTask = task;
+    }
+
+    @Override
+    public String getTaskName() {
+      return bindingTask.getTaskName();
+    }
+
+    @Override
+    public String getProgress() {
+      return bindingTask.getProgress();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      MainMergeFuture that = (MainMergeFuture) o;
+      return Objects.equals(bindingTask, that.bindingTask);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), bindingTask);
+    }
+  }
+
+  public static class SubMergeFuture extends MergeFuture {
+
+    private MergeChunkHeapTask bindingTask;
+
+    public SubMergeFuture(MergeChunkHeapTask task) {
+      super(task);
+      bindingTask = task;
+    }
+
+    @Override
+    public String getTaskName() {
+      return bindingTask.getTaskName();
+    }
+
+    @Override
+    public String getProgress() {
+      return bindingTask.getProgress();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      SubMergeFuture that = (SubMergeFuture) o;
+      return Objects.equals(bindingTask, that.bindingTask);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), bindingTask);
+    }
+  }
+}
+
+
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 06de716..1c87a40 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -19,7 +19,16 @@
 
 package org.apache.iotdb.db.engine.merge.manage;
 
-import java.util.concurrent.Callable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -27,11 +36,14 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import 
org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
 import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,15 +52,23 @@ import org.slf4j.LoggerFactory;
  * MergeManager provides a ThreadPool to queue and run all merge tasks to 
restrain the total
  * resources occupied by merge and manages a Timer to periodically issue a 
global merge.
  */
-public class MergeManager implements IService {
+public class MergeManager implements IService, MergeManagerMBean {
 
   private static final Logger logger = 
LoggerFactory.getLogger(MergeManager.class);
   private static final MergeManager INSTANCE = new MergeManager();
+  private final String mbeanName = String
+      .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
+          getID().getJmxName());
 
   private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
   private ThreadPoolExecutor mergeChunkSubTaskPool;
   private ScheduledExecutorService timedMergeThreadPool;
+  private ScheduledExecutorService taskCleanerThreadPool;
+
+  // TODO: add to JMX
+  private Map<String, Set<MergeFuture>> storageGroupMainTasks = new 
ConcurrentHashMap<>();
+  private Map<String, Set<MergeFuture>> storageGroupSubTasks = new 
ConcurrentHashMap<>();
 
   private MergeManager() {
   }
@@ -58,15 +78,20 @@ public class MergeManager implements IService {
   }
 
   public void submitMainTask(MergeTask mergeTask) {
-    mergeTaskPool.submit(mergeTask);
+    MergeFuture future = (MergeFuture) mergeTaskPool.submit(mergeTask);
+    storageGroupMainTasks.computeIfAbsent(mergeTask.getStorageGroupName(),
+        k -> new ConcurrentSkipListSet<>()).add(future);
   }
 
-  public Future submitChunkSubTask(Callable callable) {
-    return mergeChunkSubTaskPool.submit(callable);
+  public Future<Void> submitChunkSubTask(MergeChunkHeapTask task) {
+    MergeFuture future = (MergeFuture) mergeChunkSubTaskPool.submit(task);
+    storageGroupSubTasks.computeIfAbsent(task.getStorageGroupName(), k -> new 
ConcurrentSkipListSet<>()).add(future);
+    return future;
   }
 
   @Override
   public void start() {
+    JMXService.registerMBean(this, mbeanName);
     if (mergeTaskPool == null) {
       int threadNum = 
IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum();
       if (threadNum <= 0) {
@@ -78,11 +103,9 @@ public class MergeManager implements IService {
         chunkSubThreadNum = 1;
       }
 
-      mergeTaskPool =
-          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
+      mergeTaskPool = new MergeThreadPool(threadNum,
               r -> new Thread(r, "MergeThread-" + 
threadCnt.getAndIncrement()));
-      mergeChunkSubTaskPool =
-          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum * 
chunkSubThreadNum,
+      mergeChunkSubTaskPool = new MergeThreadPool(threadNum * 
chunkSubThreadNum,
               r -> new Thread(r, "MergeChunkSubThread-" + 
threadCnt.getAndIncrement()));
       long mergeInterval = 
IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
       if (mergeInterval > 0) {
@@ -91,6 +114,10 @@ public class MergeManager implements IService {
         timedMergeThreadPool.scheduleAtFixedRate(this::mergeAll, mergeInterval,
             mergeInterval, TimeUnit.SECONDS);
       }
+
+      taskCleanerThreadPool = Executors.newSingleThreadScheduledExecutor( r -> 
new Thread(r,
+          "MergeTaskCleaner"));
+      taskCleanerThreadPool.scheduleAtFixedRate(this::cleanFinishedTask, 30, 
30, TimeUnit.MINUTES);
       logger.info("MergeManager started");
     }
   }
@@ -102,6 +129,8 @@ public class MergeManager implements IService {
         timedMergeThreadPool.shutdownNow();
         timedMergeThreadPool = null;
       }
+      taskCleanerThreadPool.shutdownNow();
+      taskCleanerThreadPool = null;
       mergeTaskPool.shutdownNow();
       mergeChunkSubTaskPool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
@@ -116,6 +145,7 @@ public class MergeManager implements IService {
       mergeTaskPool = null;
       logger.info("MergeManager stopped");
     }
+    JMXService.deregisterMBean(mbeanName);
   }
 
   @Override
@@ -125,6 +155,9 @@ public class MergeManager implements IService {
         awaitTermination(timedMergeThreadPool, millseconds);
         timedMergeThreadPool = null;
       }
+      awaitTermination(taskCleanerThreadPool, millseconds);
+      taskCleanerThreadPool = null;
+
       awaitTermination(mergeTaskPool, millseconds);
       awaitTermination(mergeChunkSubTaskPool, millseconds);
       logger.info("Waiting for task pool to shut down");
@@ -164,4 +197,146 @@ public class MergeManager implements IService {
       logger.error("Cannot perform a global merge because", e);
     }
   }
+
+  /**
+   * Abort all merges of a storage group: its tracked sub-tasks are cancelled first, then its
+   * tracked main tasks, and every one of them is removed from tracking. The caller must acquire
+   * the write lock of the corresponding storage group.
+   *
+   * @param storageGroup the name of the storage group whose merges should be aborted
+   */
+  @Override
+  public void abortMerge(String storageGroup) {
+    // abort sub-tasks first
+    cancelAndForget(storageGroupSubTasks.getOrDefault(storageGroup, Collections.emptySet()));
+    // abort main tasks
+    cancelAndForget(storageGroupMainTasks.getOrDefault(storageGroup, Collections.emptySet()));
+  }
+
+  /**
+   * Cancel (with interruption) each future of the set that is neither done nor cancelled, and
+   * remove every future from the set.
+   */
+  private void cancelAndForget(Set<MergeFuture> tasks) {
+    for (Iterator<MergeFuture> taskIterator = tasks.iterator(); taskIterator.hasNext(); ) {
+      Future<Void> task = taskIterator.next();
+      if (!(task.isDone() || task.isCancelled())) {
+        task.cancel(true);
+      }
+      taskIterator.remove();
+    }
+  }
+
+  /**
+   * Stop tracking the sub-tasks and then the main tasks that are already done or cancelled.
+   */
+  private void cleanFinishedTask() {
+    storageGroupSubTasks.values()
+        .forEach(tasks -> tasks.removeIf(task -> task.isDone() || task.isCancelled()));
+    storageGroupMainTasks.values()
+        .forEach(tasks -> tasks.removeIf(task -> task.isDone() || task.isCancelled()));
+  }
+
+  /**
+   * Take a snapshot of the status of every tracked merge task.
+   *
+   * @return 2 maps, the first map contains the status of main merge tasks and the second map
+   * contains the status of merge chunk heap tasks; both maps use storage groups as keys and
+   * lists of task status as values. A storage group without tracked tasks has no entry.
+   */
+  public Map<String, List<TaskStatus>>[] collectTaskStatus() {
+    Map<String, List<TaskStatus>>[] result = new Map[2];
+    result[0] = snapshotStatus(storageGroupMainTasks);
+    result[1] = snapshotStatus(storageGroupSubTasks);
+    return result;
+  }
+
+  /**
+   * Convert every tracked future of the given map into a TaskStatus, grouped by storage group.
+   */
+  private Map<String, List<TaskStatus>> snapshotStatus(Map<String, Set<MergeFuture>> tracked) {
+    Map<String, List<TaskStatus>> snapshot = new HashMap<>();
+    for (Entry<String, Set<MergeFuture>> groupAndTasks : tracked.entrySet()) {
+      List<TaskStatus> statusList = new ArrayList<>();
+      for (MergeFuture task : groupAndTasks.getValue()) {
+        statusList.add(new TaskStatus(task));
+      }
+      // a group only gets an entry once it has contributed at least one status
+      if (!statusList.isEmpty()) {
+        snapshot.put(groupAndTasks.getKey(), statusList);
+      }
+    }
+    return snapshot;
+  }
+
+  /**
+   * Build a multi-line text report of all tracked merge tasks: a "Main tasks:" section followed
+   * by a "Sub tasks:" section, each listing the task status per storage group.
+   *
+   * @return the report, every line terminated by the system line separator
+   */
+  public String genMergeTaskReport() {
+    Map<String, List<TaskStatus>>[] statusMaps = collectTaskStatus();
+    StringBuilder report = new StringBuilder();
+    report.append("Main tasks:").append(System.lineSeparator());
+    appendStatusSection(report, statusMaps[0]);
+    report.append("Sub tasks:").append(System.lineSeparator());
+    appendStatusSection(report, statusMaps[1]);
+    return report.toString();
+  }
+
+  /**
+   * Append one indented line per storage group and one doubly indented line per task status.
+   */
+  private void appendStatusSection(StringBuilder report,
+      Map<String, List<TaskStatus>> statusMap) {
+    String lineEnd = System.lineSeparator();
+    for (Entry<String, List<TaskStatus>> groupAndStatus : statusMap.entrySet()) {
+      report.append("\t").append("Storage group: ").append(groupAndStatus.getKey())
+          .append(lineEnd);
+      for (TaskStatus status : groupAndStatus.getValue()) {
+        report.append("\t\t").append(status.toString()).append(lineEnd);
+      }
+    }
+  }
+
+  /**
+   * Log the merge task report at INFO level; the report is not generated if INFO is disabled.
+   */
+  @Override
+  public void printMergeStatus() {
+    if (!logger.isInfoEnabled()) {
+      return;
+    }
+    logger.info("Running tasks:\n {}", genMergeTaskReport());
+  }
+
+  /**
+   * An immutable snapshot of a MergeFuture: its task name, creation time, progress and whether
+   * it was done or cancelled at the moment the snapshot was taken.
+   */
+  public static class TaskStatus {
+
+    private final String taskName;
+    private final String createdTime;
+    private final String progress;
+    private final boolean isDone;
+    private final boolean isCancelled;
+
+    public TaskStatus(MergeFuture future) {
+      taskName = future.getTaskName();
+      createdTime = future.getCreatedTime();
+      progress = future.getProgress();
+      isCancelled = future.isCancelled();
+      isDone = future.isDone();
+    }
+
+    public String getTaskName() {
+      return taskName;
+    }
+
+    public String getCreatedTime() {
+      return createdTime;
+    }
+
+    public String getProgress() {
+      return progress;
+    }
+
+    public boolean isDone() {
+      return isDone;
+    }
+
+    public boolean isCancelled() {
+      return isCancelled;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s, %s, %s, done:%s, cancelled:%s",
+          taskName, createdTime, progress, isDone, isCancelled);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java
new file mode 100644
index 0000000..645f51f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManagerMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+public interface MergeManagerMBean { // operations MergeManager exposes when registered via JMXService
+  void printMergeStatus(); // log a report of the tracked merge tasks
+
+  void abortMerge(String storageGroup); // cancel the tracked merge tasks of the given storage group
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java
new file mode 100644
index 0000000..0ac292f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeThreadPool.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.engine.merge.manage.MergeFuture.MainMergeFuture;
+import org.apache.iotdb.db.engine.merge.manage.MergeFuture.SubMergeFuture;
+import 
org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+
+/**
+ * A fixed-size thread pool with an unbounded queue that wraps submitted merge tasks into
+ * MergeFutures: a MergeTask becomes a MainMergeFuture and a MergeChunkHeapTask becomes a
+ * SubMergeFuture. Other kinds of callables are rejected.
+ */
+public class MergeThreadPool extends ThreadPoolExecutor {
+
+  public MergeThreadPool(int corePoolSize, ThreadFactory threadFactory) {
+    super(corePoolSize, corePoolSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
+        threadFactory);
+  }
+
+  /**
+   * @param callable a MergeTask or a MergeChunkHeapTask
+   * @return the MergeFuture wrapping the task
+   * @throws NullPointerException if callable is null
+   * @throws IllegalArgumentException if callable is of any other type
+   */
+  @Override
+  @SuppressWarnings("unchecked") // MergeFuture is a RunnableFuture<Void>; T is erased at runtime
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    if (callable == null) {
+      throw new NullPointerException("callable");
+    }
+    if (callable instanceof MergeTask) {
+      return (RunnableFuture<T>) new MainMergeFuture((MergeTask) callable);
+    }
+    if (callable instanceof MergeChunkHeapTask) {
+      return (RunnableFuture<T>) new SubMergeFuture((MergeChunkHeapTask) callable);
+    }
+    // fail with a clear message instead of a ClassCastException from a blind cast
+    throw new IllegalArgumentException(
+        "MergeThreadPool only accepts MergeTask or MergeChunkHeapTask, but got "
+            + callable.getClass().getName());
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 15cfa6e..4a2efbf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -65,6 +65,9 @@ class MergeFileTask {
 
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
+  private int currentMergeIndex;
+  private String currMergeFile;
+
   MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
       MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
     this.taskName = taskName;
@@ -81,8 +84,11 @@ class MergeFileTask {
       logger.info("{} starts to merge {} files", taskName, 
unmergedFiles.size());
     }
     long startTime = System.currentTimeMillis();
-    int cnt = 0;
-    for (TsFileResource seqFile : unmergedFiles) {
+    for (int i = 0; i < unmergedFiles.size(); i++) {
+      TsFileResource seqFile = unmergedFiles.get(i);
+      currentMergeIndex = i;
+      currMergeFile = seqFile.getPath();
+
       int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 
0);
       int unmergedChunkNum = 
context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
       if (mergedChunkNum >= unmergedChunkNum) {
@@ -102,10 +108,13 @@ class MergeFileTask {
         }
         moveMergedToOld(seqFile);
       }
-      cnt++;
-      if (logger.isInfoEnabled()) {
-        logger.debug("{} has merged {}/{} files", taskName, cnt, 
unmergedFiles.size());
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
       }
+
+      logProgress();
     }
     if (logger.isInfoEnabled()) {
       logger.info("{} has merged all files after {}ms", taskName,
@@ -114,6 +123,18 @@ class MergeFileTask {
     mergeLogger.logMergeEnd();
   }
 
+  /**
+   * Log, at DEBUG level, the file that has just been processed and how many of the unmerged
+   * files have been processed so far.
+   */
+  private void logProgress() {
+    // the guard must test the level that is actually used by the statement below
+    if (logger.isDebugEnabled()) {
+      logger.debug("{} has merged {}, processed {}/{} files", taskName, currMergeFile,
+          currentMergeIndex + 1, unmergedFiles.size());
+    }
+  }
+
+  public String getProgress() {
+    return String.format("Merging %s, processed %d/%d files", currMergeFile,
+        currentMergeIndex + 1, unmergedFiles.size());
+  }
+
   private void moveMergedToOld(TsFileResource seqFile) throws IOException {
     int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
     if (mergedChunkNum == 0) {
@@ -127,17 +148,8 @@ class MergeFileTask {
       
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getPath());
 
       resource.removeFileReader(seqFile);
-      TsFileIOWriter oldFileWriter;
-      try {
-        oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
-        mergeLogger.logFileMergeStart(seqFile.getFile(),
-            ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
-        logger.debug("{} moving merged chunks of {} to the old file", 
taskName, seqFile);
-        ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
-      } catch (TsFileNotCompleteException e) {
-        // this file may already be truncated if this merge is a system reboot 
merge
-        oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
-      }
+      TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
+
       // filter the chunks that have been merged
       
oldFileWriter.filterChunks(context.getUnmergedChunkStartTimes().get(seqFile));
 
@@ -156,6 +168,13 @@ class MergeFileTask {
           String deviceId = entry.getKey();
           List<ChunkMetadata> chunkMetadataList = entry.getValue();
           writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, 
oldFileWriter);
+
+          if (Thread.interrupted()) {
+            Thread.currentThread().interrupt();
+            oldFileWriter.close();
+            restoreOldFile(seqFile);
+            return;
+          }
         }
       }
       oldFileWriter.endFile();
@@ -175,19 +194,50 @@ class MergeFileTask {
               .getFile(nextMergeVersionFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX));
       seqFile.setFile(nextMergeVersionFile);
     } catch (Exception e) {
-      RestorableTsFileIOWriter oldFileRecoverWriter = new 
RestorableTsFileIOWriter(
-          seqFile.getFile());
-      if (oldFileRecoverWriter.hasCrashed() && 
oldFileRecoverWriter.canWrite()) {
-        oldFileRecoverWriter.endFile();
-      } else {
-        oldFileRecoverWriter.close();
-      }
+      restoreOldFile(seqFile);
       throw e;
     } finally {
       seqFile.writeUnlock();
     }
   }
 
+  /**
+   * Re-open an old seq file with a RestorableTsFileIOWriter after writing new chunks into it was
+   * interrupted by an exception or by an abort of the task. The file is ended if the writer
+   * reports that it has crashed and can be written; otherwise the writer is only closed.
+   *
+   * @param seqFile the resource of the old seq file to restore
+   * @throws IOException if the file cannot be opened, ended or closed
+   */
+  private void restoreOldFile(TsFileResource seqFile) throws IOException {
+    RestorableTsFileIOWriter recoverWriter = new RestorableTsFileIOWriter(seqFile.getFile());
+    if (!recoverWriter.hasCrashed() || !recoverWriter.canWrite()) {
+      recoverWriter.close();
+      return;
+    }
+    recoverWriter.endFile();
+  }
+
+  /**
+   * Open an appending writer for an old seq file so we can add new chunks to it. The start of
+   * the file merge is logged together with the truncate position before doTruncate() is called.
+   *
+   * @param seqFile the resource of the old seq file
+   * @return a ForceAppendTsFileWriter, or a RestorableTsFileIOWriter if a
+   * TsFileNotCompleteException was thrown while preparing the appending writer
+   * @throws IOException if the writer cannot be created or the merge log cannot be written
+   */
+  private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
+    try {
+      ForceAppendTsFileWriter appendWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+      mergeLogger.logFileMergeStart(seqFile.getFile(), appendWriter.getTruncatePosition());
+      logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
+      appendWriter.doTruncate();
+      return appendWriter;
+    } catch (TsFileNotCompleteException e) {
+      // this file may already be truncated if this merge is a system reboot merge
+      return new RestorableTsFileIOWriter(seqFile.getFile());
+    }
+  }
+
   private void updateHistoricalVersions(TsFileResource seqFile) {
     // as the new file contains data of other files, track their versions in 
the new file
     // so that we will be able to compare data across different IoTDBs that 
share the same file
@@ -245,6 +295,12 @@ class MergeFileTask {
         fileWriter.startChunkGroup(path.getDevice());
         long maxVersion = writeUnmergedChunks(chunkStartTimes, 
chunkMetadataList,
             resource.getFileReader(seqFile), fileWriter);
+
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+
         fileWriter.writeVersion(maxVersion + 1);
         fileWriter.endChunkGroup();
       }
@@ -303,6 +359,11 @@ class MergeFileTask {
           break;
         }
       }
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return maxVersion;
+      }
     }
     return maxVersion;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 67caf2c..464ce44 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,7 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class MergeMultiChunkTask {
+public class MergeMultiChunkTask {
 
   private static final Logger logger = 
LoggerFactory.getLogger(MergeMultiChunkTask.class);
   private static int minChunkPointNum = 
IoTDBDescriptor.getInstance().getConfig()
@@ -81,9 +82,11 @@ class MergeMultiChunkTask {
   private int concurrentMergeSeriesNum;
   private List<Path> currMergingPaths = new ArrayList<>();
 
-  MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger 
mergeLogger,
+  private String storageGroupName;
+
+  public MergeMultiChunkTask(MergeContext context, String taskName, 
MergeLogger mergeLogger,
       MergeResource mergeResource, boolean fullMerge, List<Path> 
unmergedSeries,
-      int concurrentMergeSeriesNum) {
+      int concurrentMergeSeriesNum, String storageGroupName) {
     this.mergeContext = context;
     this.taskName = taskName;
     this.mergeLogger = mergeLogger;
@@ -91,6 +94,7 @@ class MergeMultiChunkTask {
     this.fullMerge = fullMerge;
     this.unmergedSeries = unmergedSeries;
     this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+    this.storageGroupName = storageGroupName;
   }
 
   void mergeSeries() throws IOException {
@@ -110,6 +114,11 @@ class MergeMultiChunkTask {
         currMergingPaths = pathSelector.next();
         mergePaths();
         resource.clearChunkWriterCache();
+        if (Thread.interrupted()) {
+          logger.info("MergeMultiChunkTask {} aborted", taskName);
+          Thread.currentThread().interrupt();
+          return;
+        }
         mergedSeriesCnt += currMergingPaths.size();
         logMergeProgress();
       }
@@ -131,6 +140,10 @@ class MergeMultiChunkTask {
     }
   }
 
+  /**
+   * Describes the progress of this task as the value of mergedSeriesCnt against the size of
+   * the unmergedSeries list.
+   *
+   * @return a string of the form "Processed x/y series"
+   */
+  public String getProgress() {
+    return String.format("Processed %d/%d series", mergedSeriesCnt, 
unmergedSeries.size());
+  }
+
   private void mergePaths() throws IOException {
     mergeLogger.logTSStart(currMergingPaths);
     IPointReader[] unseqReaders;
@@ -144,6 +157,11 @@ class MergeMultiChunkTask {
 
     for (int i = 0; i < resource.getSeqFiles().size(); i++) {
       pathsMergeOneFile(i, unseqReaders);
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
     mergeLogger.logTSEnd();
   }
@@ -153,7 +171,7 @@ class MergeMultiChunkTask {
     TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
     String deviceId = currMergingPaths.get(0).getDevice();
     long currDeviceMinTime = currTsFile.getStartTime(deviceId);
-    //COMMENTS: is this correct? how about if there are other devices (in the 
currMergingPaths) that have unseq data?
+    // all paths in one call are from the same device
     if (currDeviceMinTime == Long.MAX_VALUE) {
       return;
     }
@@ -178,6 +196,11 @@ class MergeMultiChunkTask {
       modifications[i] = resource.getModifications(currTsFile, 
currMergingPaths.get(i));
       seqChunkMeta[i] = resource.queryChunkMetadata(currMergingPaths.get(i), 
currTsFile);
       modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
 
     List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, 
seqFileIdx);
@@ -245,22 +268,28 @@ class MergeMultiChunkTask {
     mergedChunkNum.set(0);
     unmergedChunkNum.set(0);
 
-    List<Future> futures = new ArrayList<>();
+    List<Future<Void>> futures = new ArrayList<>();
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      int finalI = i;
-      futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> {
-        mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens,
-            reader,
-            mergeFileWriter, unseqReaders,
-            currFile,
-            isLastFile);
-        return null;
-      }));
+      futures.add(MergeManager.getINSTANCE()
+          .submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i],
+              metaListEntries, ptWrittens,
+              reader,
+              mergeFileWriter, unseqReaders,
+              currFile,
+              isLastFile, i)));
+
+      if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
     }
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
       try {
         futures.get(i).get();
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (ExecutionException e) {
         throw new IOException(e);
       }
     }
@@ -274,56 +303,7 @@ class MergeMultiChunkTask {
     return mergedChunkNum.get() > 0;
   }
 
-  private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, 
MetaListEntry[] metaListEntries,
-      int[] ptWrittens,
-      TsFileSequenceReader reader,
-      RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders,
-      TsFileResource currFile,
-      boolean isLastFile) throws IOException {
-    while (!chunkIdxHeap.isEmpty()) {
-      int pathIdx = chunkIdxHeap.poll();
-      Path path = currMergingPaths.get(pathIdx);
-      MeasurementSchema measurementSchema = resource.getSchema(path);
-      IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
-      if (metaListEntries[pathIdx] != null) {
-        MetaListEntry metaListEntry = metaListEntries[pathIdx];
-        ChunkMetadata currMeta = metaListEntry.current();
-        boolean isLastChunk = !metaListEntry.hasNext();
-        boolean chunkOverflowed = MergeUtils
-            .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-        boolean chunkTooSmall = MergeUtils
-            .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
-
-        Chunk chunk;
-        synchronized (reader) {
-          chunk = reader.readMemChunk(currMeta);
-        }
-        ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, 
chunkTooSmall, chunk,
-            ptWrittens[pathIdx], pathIdx, mergeFileWriter, 
unseqReaders[pathIdx], chunkWriter,
-            currFile);
-
-        if (!isLastChunk) {
-          metaListEntry.next();
-          chunkIdxHeap.add(pathIdx);
-          continue;
-        }
-      }
-      // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
-      // data will be merged with the last chunk in the seqFiles
-      if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-        ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, 
unseqReaders[pathIdx],
-            Long.MAX_VALUE,
-            pathIdx);
-        mergedChunkNum.incrementAndGet();
-      }
-      // the last merged chunk may still be smaller than the threshold, flush 
it anyway
-      if (ptWrittens[pathIdx] > 0) {
-        synchronized (mergeFileWriter) {
-          chunkWriter.writeToFileWriter(mergeFileWriter);
-        }
-      }
-    }
-  }
+
 
   /**
    * merge a sequence chunk SK
@@ -338,6 +318,7 @@ class MergeMultiChunkTask {
    * 3. other cases: need to unCompress the chunk and write 3.1 SK isn't 
overflowed 3.2 SK is
    * overflowed
    */
+  @SuppressWarnings("java:S2445") // avoid writing the same writer concurrently
   private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed,
       boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int 
pathIdx,
       TsFileIOWriter mergeFileWriter, IPointReader unseqReader,
@@ -379,7 +360,7 @@ class MergeMultiChunkTask {
     }
 
     // update points written statistics
-    mergeContext.incTotalPointWritten(unclosedChunkPoint - 
lastUnclosedChunkPoint);
+    mergeContext.incTotalPointWritten((long) unclosedChunkPoint - 
lastUnclosedChunkPoint);
     if (minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum
         || unclosedChunkPoint > 0 && minChunkPointNum < 0) {
       // the new chunk's size is large enough and it should be flushed
@@ -445,4 +426,107 @@ class MergeMultiChunkTask {
     }
     return cnt;
   }
+
+  /**
+   * A sub-task that merges the chunks of the series whose indices (into currMergingPaths) are
+   * held in one heap. Instances are submitted through MergeManager.submitChunkSubTask(); the
+   * getters expose the owning storage group, a per-sub-task name and the progress.
+   */
+  public class MergeChunkHeapTask implements Callable<Void> {
+
+    private final PriorityQueue<Integer> chunkIdxHeap;
+    private final MetaListEntry[] metaListEntries;
+    private final int[] ptWrittens;
+    private final TsFileSequenceReader reader;
+    private final RestorableTsFileIOWriter mergeFileWriter;
+    private final IPointReader[] unseqReaders;
+    private final TsFileResource currFile;
+    private final boolean isLastFile;
+    private final int taskNum;
+
+    // number of series in the heap when the task was created
+    private final int totalSeriesNum;
+    // number of series this task has finished. It is kept separately from the heap because the
+    // heap is a plain PriorityQueue that is polled and re-filled while merging, so reading its
+    // size from another thread in getProgress() is neither safe nor stable.
+    private final AtomicInteger processedSeriesNum = new AtomicInteger();
+
+    /**
+     * @param chunkIdxHeap indices (into currMergingPaths) of the series this task merges
+     * @param metaListEntries per-series chunk metadata cursors, a null entry means there is no
+     *     chunk to merge for that series
+     * @param ptWrittens per-series counters, updated in place with the results of mergeChunkV2
+     *     and writeRemainingUnseq
+     * @param reader reader the chunks are loaded from, accesses are synchronized on it
+     * @param mergeFileWriter writer the merged chunks are flushed to, flushes are synchronized
+     *     on it
+     * @param unseqReaders per-series readers passed on to mergeChunkV2 and writeRemainingUnseq
+     * @param currFile passed on to mergeChunkV2
+     * @param isLastFile whether the remaining unseq data should be written by this task
+     * @param taskNum number appended to the task name to tell sub-tasks apart
+     */
+    public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap,
+        MetaListEntry[] metaListEntries, int[] ptWrittens,
+        TsFileSequenceReader reader,
+        RestorableTsFileIOWriter mergeFileWriter,
+        IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) {
+      this.chunkIdxHeap = chunkIdxHeap;
+      this.metaListEntries = metaListEntries;
+      this.ptWrittens = ptWrittens;
+      this.reader = reader;
+      this.mergeFileWriter = mergeFileWriter;
+      this.unseqReaders = unseqReaders;
+      this.currFile = currFile;
+      this.isLastFile = isLastFile;
+      this.taskNum = taskNum;
+      this.totalSeriesNum = chunkIdxHeap.size();
+    }
+
+    @Override
+    public Void call() throws Exception {
+      mergeChunkHeap();
+      return null;
+    }
+
+    /**
+     * Processes the heap until it is empty or the current thread is interrupted. When
+     * interrupted, the interrupt flag is restored and the method returns early.
+     */
+    @SuppressWarnings("java:S2445") // avoid reading the same reader concurrently
+    private void mergeChunkHeap() throws IOException {
+      while (!chunkIdxHeap.isEmpty()) {
+        int pathIdx = chunkIdxHeap.poll();
+        Path path = currMergingPaths.get(pathIdx);
+        MeasurementSchema measurementSchema = resource.getSchema(path);
+        IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
+        if (Thread.interrupted()) {
+          // restore the flag so that the caller can also notice the interruption
+          Thread.currentThread().interrupt();
+          return;
+        }
+
+        if (metaListEntries[pathIdx] != null) {
+          MetaListEntry metaListEntry = metaListEntries[pathIdx];
+          ChunkMetadata currMeta = metaListEntry.current();
+          boolean isLastChunk = !metaListEntry.hasNext();
+          boolean chunkOverflowed = MergeUtils
+              .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+          boolean chunkTooSmall = MergeUtils
+              .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+
+          Chunk chunk;
+          synchronized (reader) {
+            chunk = reader.readMemChunk(currMeta);
+          }
+          ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
+              ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
+              currFile);
+
+          if (!isLastChunk) {
+            // the series still has chunks left, queue it again
+            metaListEntry.next();
+            chunkIdxHeap.add(pathIdx);
+            continue;
+          }
+        }
+        // this only happens when the seqFiles do not contain this series, otherwise the remaining
+        // data will be merged with the last chunk in the seqFiles
+        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
+              Long.MAX_VALUE,
+              pathIdx);
+          mergedChunkNum.incrementAndGet();
+        }
+        // the last merged chunk may still be smaller than the threshold, flush it anyway
+        if (ptWrittens[pathIdx] > 0) {
+          synchronized (mergeFileWriter) {
+            chunkWriter.writeToFileWriter(mergeFileWriter);
+          }
+        }
+        // the series was not queued again, so it is finished
+        processedSeriesNum.incrementAndGet();
+      }
+    }
+
+    /**
+     * @return the storage group name of the enclosing MergeMultiChunkTask
+     */
+    public String getStorageGroupName() {
+      return storageGroupName;
+    }
+
+    /**
+     * @return the name of the enclosing task followed by "_" and the number of this sub-task
+     */
+    public String getTaskName() {
+      return taskName + "_" + taskNum;
+    }
+
+    /**
+     * @return a string of the form "Processed x/y series", where x is the number of series
+     *     this sub-task has finished and y the number it was created with
+     */
+    public String getProgress() {
+      return String.format("Processed %d/%d series", processedSeriesNum.get(),
+          totalSeriesNum);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 6d1a6fc..68476aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -67,6 +67,11 @@ public class MergeTask implements Callable<Void> {
   String taskName;
   boolean fullMerge;
 
+  // The three fields below are written by doMerge()/abort() and read by the public
+  // getProgress(), which may be called from a thread other than the one running the merge,
+  // so they are volatile to make the latest phase and sub-task visible to such callers.
+  volatile States states = States.START;
+
+  volatile MergeMultiChunkTask chunkTask;
+  volatile MergeFileTask fileTask;
+
   MergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupSysDir, 
MergeCallback callback,
       String taskName, boolean fullMerge, String storageGroupName) {
@@ -96,15 +101,19 @@ public class MergeTask implements Callable<Void> {
       doMerge();
     } catch (Exception e) {
       logger.error("Runtime exception in merge {}", taskName, e);
-      cleanUp(false);
-      // call the callback to make sure the StorageGroup exit merging status, 
but passing 2
-      // empty file lists to avoid files being deleted.
-      callback.call(Collections.emptyList(), Collections.emptyList(), new 
File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
-      throw e;
+      abort();
     }
     return null;
   }
 
+  /**
+   * Marks this task as aborted, runs cleanUp(false) and notifies the callback.
+   *
+   * @throws IOException if the clean-up or the callback fails
+   */
+  private void abort() throws IOException {
+    states = States.ABORTED;
+    try {
+      cleanUp(false);
+    } finally {
+      // call the callback even if the clean-up fails, to make sure the StorageGroup exits the
+      // merging status, but pass 2 empty file lists to avoid files being deleted.
+      callback.call(Collections.emptyList(), Collections.emptyList(),
+          new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
+    }
+  }
+
   private void doMerge() throws IOException, MetadataException {
     if (logger.isInfoEnabled()) {
       logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
@@ -132,14 +141,30 @@ public class MergeTask implements Callable<Void> {
 
     mergeLogger.logMergeStart();
 
-    MergeMultiChunkTask mergeChunkTask = new MergeMultiChunkTask(mergeContext, 
taskName, mergeLogger, resource,
-        fullMerge, unmergedSeries, concurrentMergeSeriesNum);
-    mergeChunkTask.mergeSeries();
+    chunkTask = new MergeMultiChunkTask(mergeContext, taskName, mergeLogger, 
resource,
+        fullMerge, unmergedSeries, concurrentMergeSeriesNum, storageGroupName);
+    states = States.MERGE_CHUNKS;
+    chunkTask.mergeSeries();
+    if (Thread.interrupted()) {
+      logger.info("Merge task {} aborted", taskName);
+      abort();
+      return;
+    }
+
 
-    MergeFileTask mergeFileTask = new MergeFileTask(taskName, mergeContext, 
mergeLogger, resource,
+    fileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource,
         resource.getSeqFiles());
-    mergeFileTask.mergeFiles();
+    states = States.MERGE_FILES;
+    chunkTask = null;
+    fileTask.mergeFiles();
+    if (Thread.interrupted()) {
+      logger.info("Merge task {} aborted", taskName);
+      abort();
+      return;
+    }
 
+    states = States.CLEAN_UP;
+    fileTask = null;
     cleanUp(true);
     if (logger.isInfoEnabled()) {
       double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 
1000.0;
@@ -183,4 +208,36 @@ public class MergeTask implements Callable<Void> {
       logFile.delete();
     }
   }
+
+  /**
+   * @return the value of the storageGroupName field of this task
+   */
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  /** The phases of a merge task, translated into text by getProgress(). */
+  enum States {
+    // initial value of the states field
+    START,
+    // set in doMerge() right before chunkTask.mergeSeries() is called
+    MERGE_CHUNKS,
+    // set in doMerge() right before fileTask.mergeFiles() is called
+    MERGE_FILES,
+    // set in doMerge() right before cleanUp(true) is called
+    CLEAN_UP,
+    // set in abort()
+    ABORTED
+  }
+
+  /**
+   * Describes the current phase of this task and, while series or files are being merged, the
+   * progress reported by the corresponding sub-task.
+   *
+   * @return a human-readable progress description, never null
+   */
+  public String getProgress() {
+    switch (states) {
+      case ABORTED:
+        return "Aborted";
+      case CLEAN_UP:
+        return "Cleaning up";
+      case MERGE_FILES: {
+        // doMerge() sets fileTask to null right after leaving this phase, so read the field
+        // once and tolerate null instead of dereferencing it directly
+        MergeFileTask currFileTask = fileTask;
+        return currFileTask != null ? "Merging files: " + currFileTask.getProgress()
+            : "Merging files";
+      }
+      case MERGE_CHUNKS: {
+        // same as above: chunkTask is set to null once the file phase has started
+        MergeMultiChunkTask currChunkTask = chunkTask;
+        return currChunkTask != null ? "Merging series: " + currChunkTask.getProgress()
+            : "Merging series";
+      }
+      case START:
+      default:
+        return "Just started";
+    }
+  }
+
+  /**
+   * @return the value of the taskName field of this task
+   */
+  public String getTaskName() {
+    return taskName;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index c0beb3d..9857d40 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -104,7 +104,7 @@ public class RecoverMergeTask extends MergeTask {
       }
 
       MergeMultiChunkTask mergeChunkTask = new 
MergeMultiChunkTask(mergeContext, taskName, mergeLogger, resource,
-          fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum);
+          fullMerge, analyzer.getUnmergedPaths(), concurrentMergeSeriesNum, 
storageGroupName);
       analyzer.setUnmergedPaths(null);
       mergeChunkTask.mergeSeries();
 
@@ -158,9 +158,8 @@ public class RecoverMergeTask extends MergeTask {
       long maxChunkNum = chunkNums[1];
       long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, 
resource.getFileReader(seqFile));
       long newSingleSeriesSeqReadCost =  fileMetaSize * maxChunkNum / 
totalChunkNum;
-      singleSeriesSeqReadCost = newSingleSeriesSeqReadCost > 
singleSeriesSeqReadCost ?
-          newSingleSeriesSeqReadCost : singleSeriesSeqReadCost;
-      maxSeqReadCost = fileMetaSize > maxSeqReadCost ? fileMetaSize : 
maxSeqReadCost;
+      singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, 
singleSeriesSeqReadCost);
+      maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost);
       seqWriteCost += fileMetaSize;
     }
 
@@ -169,10 +168,8 @@ public class RecoverMergeTask extends MergeTask {
     int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM;
     int mid = (lb + ub) / 2;
     while (mid != lb) {
-      long unseqCost = singleSeriesUnseqCost * mid < maxUnseqCost ? 
singleSeriesUnseqCost * mid :
-          maxUnseqCost;
-      long seqReadCos = singleSeriesSeqReadCost * mid < maxSeqReadCost ?
-          singleSeriesSeqReadCost * mid : maxSeqReadCost;
+      long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost);
+      long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, 
maxSeqReadCost);
       long totalCost = unseqCost + seqReadCos + seqWriteCost;
       if (totalCost <= memBudget) {
         lb = mid;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java 
b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 68031e9..3b31bbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -98,6 +98,8 @@ public class Planner {
       case FLUSH:
       case MERGE:
       case CLEAR_CACHE:
+      case NULL:
+      case SHOW_MERGE_STATUS:
         return operator;
       case QUERY:
       case UPDATE:
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java 
b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 468f286..1acf1a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -150,6 +150,8 @@ public class SQLConstant {
   public static final int TOK_LOAD_CONFIGURATION_GLOBAL = 85;
   public static final int TOK_LOAD_CONFIGURATION_LOCAL = 86;
 
+  public static final int TOK_SHOW_MERGE_STATUS = 87;
+
   public static final Map<Integer, String> tokenSymbol = new HashMap<>();
   public static final Map<Integer, String> tokenNames = new HashMap<>();
   public static final Map<Integer, Integer> reverseWords = new HashMap<>();
@@ -217,6 +219,8 @@ public class SQLConstant {
     tokenNames.put(TOK_LOAD_FILES, "TOK_LOAD_FILES");
     tokenNames.put(TOK_REMOVE_FILE, "TOK_REMOVE_FILE");
     tokenNames.put(TOK_MOVE_FILE, "TOK_MOVE_FILE");
+
+    tokenNames.put(TOK_SHOW_MERGE_STATUS, "TOK_SHOW_MERGE_STATUS");
   }
 
   static {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index cce6140..f77ef78 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iotdb.db.qp.executor;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ALIAS;
 import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
@@ -48,6 +53,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -67,6 +73,8 @@ import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
@@ -349,6 +357,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processCountNodeTimeSeries((CountPlan) showPlan);
       case COUNT_NODES:
         return processCountNodes((CountPlan) showPlan);
+      case MERGE_STATUS:
+        return processShowMergeStatus();
       default:
         throw new QueryProcessException(String.format("Unrecognized show plan 
%s", showPlan));
     }
@@ -1493,4 +1503,43 @@ public class PlanExecutor implements IPlanExecutor {
   protected void loadConfiguration(LoadConfigurationPlan plan) throws 
QueryProcessException {
     IoTDBDescriptor.getInstance().loadHotModifiedProps();
   }
+
+  /**
+   * Builds the result of the show-merge-status plan: one row per TaskStatus returned by
+   * MergeManager.collectTaskStatus(), with the columns storage group, task name, created
+   * time, progress (all TEXT), cancelled and done (both BOOLEAN).
+   */
+  private QueryDataSet processShowMergeStatus() {
+    List<Path> columns = new ArrayList<>();
+    List<TSDataType> columnTypes = new ArrayList<>();
+
+    columns.add(new Path(COLUMN_STORAGE_GROUP));
+    columnTypes.add(TSDataType.TEXT);
+    columns.add(new Path(COLUMN_TASK_NAME));
+    columnTypes.add(TSDataType.TEXT);
+    columns.add(new Path(COLUMN_CREATED_TIME));
+    columnTypes.add(TSDataType.TEXT);
+    columns.add(new Path(COLUMN_PROGRESS));
+    columnTypes.add(TSDataType.TEXT);
+    columns.add(new Path(COLUMN_CANCELLED));
+    columnTypes.add(TSDataType.BOOLEAN);
+    columns.add(new Path(COLUMN_DONE));
+    columnTypes.add(TSDataType.BOOLEAN);
+
+    ListDataSet result = new ListDataSet(columns, columnTypes);
+    Map<String, List<TaskStatus>>[] allStatus = MergeManager.getINSTANCE().collectTaskStatus();
+    for (Map<String, List<TaskStatus>> groupToStatus : allStatus) {
+      for (Entry<String, List<TaskStatus>> groupEntry : groupToStatus.entrySet()) {
+        String storageGroup = groupEntry.getKey();
+        for (TaskStatus taskStatus : groupEntry.getValue()) {
+          result.putRecord(toRowRecord(taskStatus, storageGroup));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Converts one task status into a row with timestamp 0 and the fields storage group, task
+   * name, created time, progress (TEXT), cancelled and done (BOOLEAN), in that order.
+   *
+   * @param status the status to convert
+   * @param storageGroup the storage group put into the first field
+   * @return the row holding the six fields
+   */
+  public RowRecord toRowRecord(TaskStatus status, String storageGroup) {
+    RowRecord row = new RowRecord(0);
+    // text columns
+    row.addField(new Binary(storageGroup), TSDataType.TEXT);
+    row.addField(new Binary(status.getTaskName()), TSDataType.TEXT);
+    row.addField(new Binary(status.getCreatedTime()), TSDataType.TEXT);
+    row.addField(new Binary(status.getProgress()), TSDataType.TEXT);
+    // flag columns
+    row.addField(status.isCancelled(), TSDataType.BOOLEAN);
+    row.addField(status.isDone(), TSDataType.BOOLEAN);
+    return row;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 0a849df..746326c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -75,6 +75,7 @@ public abstract class Operator {
     LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS,
     GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, 
REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
-    ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE
+    ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
+    SHOW_MERGE_STATUS
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java
new file mode 100644
index 0000000..d10cc2e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowMergeStatusOperator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
+/**
+ * Logical operator of the show-merge-status statement. It carries no arguments; it only sets
+ * its operator type to SHOW_MERGE_STATUS.
+ */
+public class ShowMergeStatusOperator extends RootOperator {
+
+  /**
+   * @param tokenIntType the token type passed on to RootOperator
+   */
+  public ShowMergeStatusOperator(int tokenIntType) {
+    super(tokenIntType);
+    setOperatorType(OperatorType.SHOW_MERGE_STATUS);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java
new file mode 100644
index 0000000..51c2d52
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowMergeStatusPlan.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+/**
+ * Physical plan of the show-merge-status statement: a ShowPlan whose content type is
+ * MERGE_STATUS.
+ */
+public class ShowMergeStatusPlan extends ShowPlan {
+
+  public ShowMergeStatusPlan() {
+    super(ShowContentType.MERGE_STATUS);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index ced1b80..bde812a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -50,7 +50,7 @@ public class ShowPlan extends PhysicalPlan {
 
   public enum ShowContentType {
     DYNAMIC_PARAMETER, FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, 
STORAGE_GROUP, CHILD_PATH, DEVICES,
-    COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES
+    COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS
   }
 
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 43e4c4a..8048f86 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -65,6 +65,7 @@ import 
org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowMergeStatusOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
@@ -141,6 +142,7 @@ import 
org.apache.iotdb.db.qp.strategy.SqlBaseParser.SetTTLStatementContext;
 import 
org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowAllTTLStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowChildPathsContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowDevicesContext;
+import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowMergeStatusContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowStorageGroupContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTTLStatementContext;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ShowTimeseriesContext;
@@ -1610,4 +1612,10 @@ public class LogicalGenerator extends 
SqlBaseBaseListener {
               String.format("encoding %s does not support %s", tsEncoding, 
tsDataType));
     }
   }
+
+  /**
+   * Called when the parser enters a show-merge-status statement; sets initializedOperator to a
+   * new ShowMergeStatusOperator with the token type TOK_SHOW_MERGE_STATUS.
+   */
+  @Override
+  public void enterShowMergeStatus(ShowMergeStatusContext ctx) {
+    super.enterShowMergeStatus(ctx);
+    initializedOperator = new 
ShowMergeStatusOperator(SQLConstant.TOK_SHOW_MERGE_STATUS);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 644b99b..abd2d23 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -82,6 +82,7 @@ import 
org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowMergeStatusPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -271,6 +272,8 @@ public class PhysicalGenerator {
             OperatorType.MOVE_FILE);
       case CLEAR_CACHE:
         return new ClearCachePlan();
+      case SHOW_MERGE_STATUS:
+        return new ShowMergeStatusPlan();
       default:
         throw new LogicalOperatorException(operator.getType().toString(), "");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java 
b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index f3edd9f..2ae9928 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@ -94,8 +94,8 @@ public class RPCService implements RPCServiceMBean, IService {
 
   @Override
   public void start() throws StartupException {
-      JMXService.registerMBean(getInstance(), mbeanName);
-      startService();
+    JMXService.registerMBean(getInstance(), mbeanName);
+    startService();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index e1498bc..ad1569b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -36,7 +36,7 @@ public enum ServiceType {
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
   SYNC_SERVICE("SYNC ServerService", ""),
   UPGRADE_SERVICE("UPGRADE DataService", ""),
-  MERGE_SERVICE("Merge Manager", ""),
+  MERGE_SERVICE("Merge Manager", "Merge Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
   MANAGE_DYNAMIC_PARAMETERS_SERVICE("Manage Dynamic Parameters", "Manage Dynamic Parameters"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
index 26fe5f8..6b0a0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/StaticResps.java
@@ -19,15 +19,20 @@
 
 package org.apache.iotdb.db.service;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DONE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PARAMETER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PROGRESS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
@@ -123,6 +128,13 @@ class StaticResps {
       Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString()), false
   );
 
+  static final TSExecuteStatementResp MERGE_STATUS_RESP = getNoTimeExecuteResp(
+      Arrays.asList(COLUMN_STORAGE_GROUP, COLUMN_TASK_NAME, COLUMN_CREATED_TIME, COLUMN_PROGRESS,
+          COLUMN_CANCELLED, COLUMN_DONE),
+      Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(),
+          TSDataType.TEXT.toString(), TSDataType.BOOLEAN.toString(), TSDataType.BOOLEAN.toString()));
+
   private static TSExecuteStatementResp getNoTimeExecuteResp(List<String> columns,
       List<String> dataTypes) {
     return getExecuteResp(columns, dataTypes, true);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 7ae24a9..fe79492 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -633,6 +633,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return StaticResps.COUNT_NODES;
       case COUNT_TIMESERIES:
         return StaticResps.COUNT_TIMESERIES;
+      case MERGE_STATUS:
+        return StaticResps.MERGE_STATUS_RESP;
       default:
         logger.error("Unsupported show content type: {}", showPlan.getShowContentType());
         throw new QueryProcessException(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
new file mode 100644
index 0000000..abe7660
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.PriorityQueue;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.junit.Test;
+
+public class MergeManagerTest extends MergeTest {
+
+  @Test
+  public void testGenMergeReport() {
+    FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
+    for (int i = 0; i < 5; i++) {
+      MergeManager.getINSTANCE().submitMainTask(new FakedMainMergeTask(i));
+      MergeManager.getINSTANCE().submitChunkSubTask(chunkTask.createSubTask(i));
+    }
+
+    String report = MergeManager.getINSTANCE().genMergeTaskReport();
+    checkReport(report);
+  }
+
+  @Test
+  public void testAbortMerge() {
+    FakedMergeMultiChunkTask chunkTask = new FakedMergeMultiChunkTask();
+    for (int i = 0; i < 5; i++) {
+      MergeManager.getINSTANCE().submitMainTask(new FakedMainMergeTask(i));
+      MergeManager.getINSTANCE().submitChunkSubTask(chunkTask.createSubTask(i));
+    }
+
+    MergeManager.getINSTANCE().abortMerge("non-exist");
+    String report = MergeManager.getINSTANCE().genMergeTaskReport();
+
+    checkReport(report);
+
+    MergeManager.getINSTANCE().abortMerge("test");
+    report = MergeManager.getINSTANCE().genMergeTaskReport();
+    assertEquals(String.format("Main tasks:%n"
+        + "Sub tasks:%n"), report);
+  }
+
+  private void checkReport(String report) {
+    String[] split = report.split(System.lineSeparator());
+    assertEquals("Main tasks:", split[0]);
+    assertEquals("\tStorage group: test", split[1]);
+    for (int i = 0; i < 5; i++) {
+      assertTrue(split[2 + i].contains("task" + i));
+      assertTrue(split[2 + i].contains("0,"));
+      assertTrue(split[2 + i].contains("done:false"));
+      assertTrue(split[2 + i].contains("cancelled:false"));
+    }
+    assertEquals("Sub tasks:", split[7]);
+    assertEquals("\tStorage group: test", split[8]);
+    for (int i = 0; i < 5; i++) {
+      assertTrue(split[9 + i].contains("task" + i));
+      assertTrue(split[9 + i].contains("0,"));
+      assertTrue(split[9 + i].contains("done:false"));
+      assertTrue(split[9 + i].contains("cancelled:false"));
+    }
+  }
+
+  static class FakedMainMergeTask extends MergeTask {
+
+    private int serialNum;
+    private String progress = "0";
+
+    public FakedMainMergeTask(int serialNum) {
+      super(null, null, null, null, false,
+          0,
+          null);
+      this.serialNum = serialNum;
+    }
+
+    @Override
+    public Void call() {
+      while (!Thread.currentThread().isInterrupted()) {
+        // wait until interrupt
+      }
+      progress = "1";
+      return null;
+    }
+
+    @Override
+    public String getStorageGroupName() {
+      return "test";
+    }
+
+    @Override
+    public String getProgress() {
+      return progress;
+    }
+
+    @Override
+    public String getTaskName() {
+      return "task" + serialNum;
+    }
+  }
+
+  static class FakedMergeMultiChunkTask extends MergeMultiChunkTask {
+
+    public FakedMergeMultiChunkTask() {
+      super(null, null, null, null, false, null,
+          0, null);
+    }
+
+    public MergeChunkHeapTask createSubTask(int serialNum) {
+      return new FakedSubMergeTask(serialNum);
+    }
+
+    class FakedSubMergeTask extends MergeChunkHeapTask {
+
+      private int serialNum;
+      private String progress = "0";
+
+      public FakedSubMergeTask(int serialNum) {
+        super(new PriorityQueue<>(), null, null, null, null, null, null, false, serialNum);
+        this.serialNum = serialNum;
+      }
+
+      @Override
+      public Void call() {
+        while (!Thread.currentThread().isInterrupted()) {
+          // wait until interrupt
+        }
+        progress = "1";
+        return null;
+      }
+
+      @Override
+      public String getStorageGroupName() {
+        return "test";
+      }
+
+      @Override
+      public String getProgress() {
+        return progress;
+      }
+
+      @Override
+      public String getTaskName() {
+        return "task" + serialNum;
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 35d402f..e2313b4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -28,7 +28,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -169,7 +168,7 @@ public class IoTDBMergeTest {
   }
 
   @Test
-  public void testCrossPartition() throws SQLException, StorageEngineException {
+  public void testCrossPartition() throws SQLException {
     logger.info("testCrossPartition...");
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -229,4 +228,48 @@ public class IoTDBMergeTest {
       assertEquals(10000, cnt);
     }
   }
+
+  @Test
+  public void testShowMergeStatus() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      for (int i = 1; i <= 3; i++) {
+        try {
+          statement.execute("CREATE TIMESERIES root.mergeTest.s" + i + " WITH DATATYPE=INT64,"
+              + "ENCODING=PLAIN");
+        } catch (SQLException e) {
+          // ignore
+        }
+      }
+
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+1, j+2, j+3));
+      }
+      statement.execute("FLUSH");
+      for (int j = 1; j <= 10; j++) {
+        statement.execute(String.format("INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,"
+            + "%d,%d)", j, j+10, j+20, j+30));
+      }
+      statement.execute("FLUSH");
+      statement.execute("MERGE");
+
+      int cnt;
+      try (ResultSet resultSet = statement.executeQuery("SHOW MERGE STATUS")) {
+        cnt = 0;
+        int colNum = resultSet.getMetaData().getColumnCount();
+        while (resultSet.next()) {
+          StringBuilder stringBuilder = new StringBuilder();
+          for (int i = 0; i < colNum; i++) {
+            stringBuilder.append(resultSet.getString(i + 1)).append(",");
+          }
+          System.out.println(stringBuilder.toString());
+          cnt++;
+        }
+      }
+      assertEquals(1, cnt);
+    }
+  }
 }

Reply via email to