This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c046fd3c93 [IOTDB-4109] Optimize merge、flush、clear cache operation (#6956)
c046fd3c93 is described below

commit c046fd3c936a1dd4d8c164f4e331ef6364115960
Author: 任宇华 <[email protected]>
AuthorDate: Fri Aug 12 11:41:54 2022 +0800

    [IOTDB-4109] Optimize merge、flush、clear cache operation (#6956)
    
    * Optimize merge、flush、clear cache operation
    
    * fix bug
    
    Co-authored-by: renyuhua <[email protected]>
---
 .../iotdb/confignode/manager/ConfigManager.java    | 10 +--
 .../apache/iotdb/confignode/manager/IManager.java  |  9 ++-
 .../iotdb/confignode/manager/NodeManager.java      | 25 +-----
 .../thrift/ConfigNodeRPCServiceProcessor.java      | 10 +--
 .../Maintenance-Tools/Maintenance-Command.md       |  6 +-
 .../Maintenance-Tools/Maintenance-Command.md       |  6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 10 +--
 .../config/executor/ClusterConfigTaskExecutor.java | 89 ++++++++++++----------
 .../config/executor/IConfigTaskExecutor.java       |  8 +-
 .../executor/StandaloneConfigTaskExecutor.java     | 17 ++---
 .../plan/execution/config/sys/ClearCacheTask.java  | 13 +---
 .../mpp/plan/execution/config/sys/FlushTask.java   | 10 +--
 .../mpp/plan/execution/config/sys/MergeTask.java   | 12 +--
 thrift-commons/src/main/thrift/common.thrift       |  1 -
 .../src/main/thrift/confignode.thrift              | 13 +---
 15 files changed, 88 insertions(+), 151 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 89ed6935c5..4bb53f4f3c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -75,7 +75,6 @@ import org.apache.iotdb.confignode.persistence.UDFInfo;
 import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
@@ -84,7 +83,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -809,10 +807,10 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus merge(TMergeReq req) {
+  public TSStatus merge() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? RpcUtils.squashResponseStatusList(nodeManager.merge(req))
+        ? RpcUtils.squashResponseStatusList(nodeManager.merge())
         : status;
   }
 
@@ -825,10 +823,10 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public TSStatus clearCache(TClearCacheReq req) {
+  public TSStatus clearCache() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache(req))
+        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
         : status;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index b319714276..8412372bb4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -38,13 +38,11 @@ import 
org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.SetTTLPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalPlan;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -278,11 +276,14 @@ public interface IManager {
 
   TSStatus dropFunction(String udfName);
 
-  TSStatus merge(TMergeReq req);
+  /** Merge on all DataNodes */
+  TSStatus merge();
 
+  /** Flush on all DataNodes */
   TSStatus flush(TFlushReq req);
 
-  TSStatus clearCache(TClearCacheReq req);
+  /** Clear cache on all DataNodes */
+  TSStatus clearCache();
 
   /**
    * Get the latest RegionRouteMap
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b3519d7eaa..31c4cadbd8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -40,11 +40,9 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -60,7 +58,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 /** NodeManager manages cluster node addition and removal requests */
 public class NodeManager {
@@ -370,15 +367,9 @@ public class NodeManager {
                 + ".");
   }
 
-  public List<TSStatus> merge(TMergeReq req) {
+  public List<TSStatus> merge() {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    if (req.dataNodeId != -1) {
-      dataNodeLocationMap =
-          dataNodeLocationMap.entrySet().stream()
-              .filter((e) -> req.dataNodeId == e.getKey())
-              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> 
e.getValue()));
-    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
@@ -390,12 +381,6 @@ public class NodeManager {
   public List<TSStatus> flush(TFlushReq req) {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    if (req.dataNodeId != -1) {
-      dataNodeLocationMap =
-          dataNodeLocationMap.entrySet().stream()
-              .filter((e) -> req.dataNodeId == e.getKey())
-              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> 
e.getValue()));
-    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
@@ -404,15 +389,9 @@ public class NodeManager {
     return dataNodeResponseStatus;
   }
 
-  public List<TSStatus> clearCache(TClearCacheReq req) {
+  public List<TSStatus> clearCache() {
     Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
-    if (req.dataNodeId != -1) {
-      dataNodeLocationMap =
-          dataNodeLocationMap.entrySet().stream()
-              .filter((e) -> req.dataNodeId == e.getKey())
-              .collect(Collectors.toMap((e) -> e.getKey(), (e) -> 
e.getValue()));
-    }
     List<TSStatus> dataNodeResponseStatus =
         Collections.synchronizedList(new 
ArrayList<>(dataNodeLocationMap.size()));
     AsyncDataNodeClientPool.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 235d97a943..e58ba05004 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -62,7 +62,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -82,7 +81,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -506,8 +504,8 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TSStatus merge(TMergeReq req) throws TException {
-    return configManager.merge(req);
+  public TSStatus merge() throws TException {
+    return configManager.merge();
   }
 
   @Override
@@ -527,8 +525,8 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TSStatus clearCache(TClearCacheReq req) throws TException {
-    return configManager.clearCache(req);
+  public TSStatus clearCache() throws TException {
+    return configManager.clearCache();
   }
 
   @Override
diff --git a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
index bc9e56869b..53c3fced41 100644
--- a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -22,7 +22,7 @@
 # Maintenance Command
 ## FLUSH
 
-Persist all the data points in the memory table of the storage group to the 
disk, and seal the data file.In cluster mode, we provide commands to persist 
the specified storage group cache of a single node and persist the specified 
storage group cache of the cluster.
+Persist all the data points in the memory table of the storage group to the 
disk, and seal the data file. In cluster mode, we provide commands to persist 
the specified storage group cache of local node and persist the specified 
storage group cache of the cluster.
 
 Note: This command does not need to be invoked manually by the client. IoTDB 
has WAL to ensure data security
 and IoTDB will flush when appropriate.
@@ -48,7 +48,7 @@ Execute Level Compaction and unsequence Compaction task. 
Currently IoTDB support
 IoTDB> MERGE
 IoTDB> FULL MERGE
 ```
-At the same time, manually trigger the compaction process of data files are 
supported for individual nodes or the entire cluster in cluster mode:
+At the same time, manually trigger the compaction process of data files are 
supported for local node or the entire cluster in cluster mode:
 ```sql
 IoTDB> MERGE ON LOCAL
 IoTDB> MERGE ON CLUSTER
@@ -58,7 +58,7 @@ IoTDB> FULL MERGE ON CLUSTER
 
 ## CLEAR CACHE
 
-Clear the cache of chunk, chunk metadata and timeseries metadata to release 
the memory footprint. In cluster mode, we provide commands to clear a single 
node cache and clear the cluster cache.
+Clear the cache of chunk, chunk metadata and timeseries metadata to release 
the memory footprint. In cluster mode, we provide commands to clear local node 
cache and clear the cluster cache.
 
 ```sql
 IoTDB> CLEAR CACHE
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
index 8a83f7cb68..3978695aa5 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -22,7 +22,7 @@
 
 ## FLUSH
 
-将指定存储组的内存缓存区 Memory Table 
的数据持久化到磁盘上,并将数据文件封口。在集群模式下,我们提供了持久化单个节点的指定存储组的缓存、持久化整个集群指定存储组的缓存命令。
+将指定存储组的内存缓存区 Memory Table 
的数据持久化到磁盘上,并将数据文件封口。在集群模式下,我们提供了持久化本节点的指定存储组的缓存、持久化整个集群指定存储组的缓存命令。
 
 注意:此命令客户端不需要手动调用,IoTDB 有 wal 保证数据安全,IoTDB 会选择合适的时机进行 flush。
 如果频繁调用 flush 会导致数据文件很小,降低查询性能。
@@ -47,7 +47,7 @@ IoTDB> FLUSH root.sg1,root.sg2 ON CLUSTER
 IoTDB> MERGE
 IoTDB> FULL MERGE
 ```
-同时,在集群模式中支持对单个节点或整个集群手动触发数据文件的合并:
+同时,在集群模式中支持对本节点或整个集群手动触发数据文件的合并:
 ```sql
 IoTDB> MERGE ON LOCAL
 IoTDB> MERGE ON CLUSTER
@@ -58,7 +58,7 @@ IoTDB> FULL MERGE ON CLUSTER
 ## CLEAR CACHE
 
 
-手动清除chunk, chunk metadata和timeseries 
metadata的缓存,在内存资源紧张时,可以通过此命令,释放查询时缓存所占的内存空间。在集群模式下,我们提供了清空单个节点缓存、清空整个集群缓存命令。
+手动清除chunk, chunk metadata和timeseries 
metadata的缓存,在内存资源紧张时,可以通过此命令,释放查询时缓存所占的内存空间。在集群模式下,我们提供了清空本节点缓存、清空整个集群缓存命令。
 
 ```sql
 IoTDB> CLEAR CACHE
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a055b91265..f181ce7249 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -37,7 +37,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -57,7 +56,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -751,10 +749,10 @@ public class ConfigNodeClient
   }
 
   @Override
-  public TSStatus merge(TMergeReq req) throws TException {
+  public TSStatus merge() throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        TSStatus status = client.merge(req);
+        TSStatus status = client.merge();
         if (!updateConfigNodeLeader(status)) {
           return status;
         }
@@ -783,10 +781,10 @@ public class ConfigNodeClient
   }
 
   @Override
-  public TSStatus clearCache(TClearCacheReq req) throws TException {
+  public TSStatus clearCache() throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        TSStatus status = client.clearCache(req);
+        TSStatus status = client.clearCache();
         if (!updateConfigNodeLeader(status)) {
           return status;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 5e85c95f66..628be9326a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -26,13 +26,11 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.PartitionRegionId;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
@@ -44,6 +42,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
@@ -270,60 +269,70 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> merge(TMergeReq tMergeReq) {
+  public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    try (ConfigNodeClient client =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      TSStatus tsStatus = client.merge(tMergeReq);
-      // Get response or throw exception
-      if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } else {
-        future.setException(new StatementExecutionException(tsStatus));
+    TSStatus tsStatus = new TSStatus();
+    if (isCluster) {
+      try (ConfigNodeClient client =
+          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        // Send request to some API server
+        tsStatus = client.merge();
+      } catch (IOException | TException e) {
+        future.setException(e);
       }
-    } catch (IOException | TException e) {
-      future.setException(e);
+    } else {
+      tsStatus = LocalConfigNode.getInstance().executeMergeOperation();
+    }
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
     }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq) {
+  public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean 
isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-
-    try (ConfigNodeClient client =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      TSStatus tsStatus = client.flush(tFlushReq);
-      // Get response or throw exception
-      if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } else {
-        future.setException(new StatementExecutionException(tsStatus));
+    TSStatus tsStatus = new TSStatus();
+    if (isCluster) {
+      try (ConfigNodeClient client =
+          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        // Send request to some API server
+        tsStatus = client.flush(tFlushReq);
+      } catch (IOException | TException e) {
+        future.setException(e);
       }
-    } catch (IOException | TException e) {
-      future.setException(e);
+    } else {
+      tsStatus = 
LocalConfigNode.getInstance().executeFlushOperation(tFlushReq);
+    }
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
     }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq 
tClearCacheReq) {
+  public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-
-    try (ConfigNodeClient client =
-        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      TSStatus tsStatus = client.clearCache(tClearCacheReq);
-      // Get response or throw exception
-      if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-      } else {
-        future.setException(new StatementExecutionException(tsStatus));
+    TSStatus tsStatus = new TSStatus();
+    if (isCluster) {
+      try (ConfigNodeClient client =
+          
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        // Send request to some API server
+        tsStatus = client.clearCache();
+      } catch (IOException | TException e) {
+        future.setException(e);
       }
-    } catch (IOException | TException e) {
-      future.setException(e);
+    } else {
+      tsStatus = LocalConfigNode.getInstance().executeClearCacheOperation();
+    }
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } else {
+      future.setException(new StatementExecutionException(tsStatus));
     }
     return future;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 5e13984c90..c7a80f6f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -20,8 +20,6 @@
 package org.apache.iotdb.db.mpp.plan.execution.config.executor;
 
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import 
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -62,11 +60,11 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, 
String taskName);
 
-  SettableFuture<ConfigTaskResult> merge(TMergeReq mergeReq);
+  SettableFuture<ConfigTaskResult> merge(boolean isCluster);
 
-  SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq);
+  SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean 
isCluster);
 
-  SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq tClearCacheReq);
+  SettableFuture<ConfigTaskResult> clearCache(boolean isCluster);
 
   SettableFuture<ConfigTaskResult> showCluster();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 4afdb8c795..a0769c5270 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -225,10 +223,9 @@ public class StandaloneConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> merge(TMergeReq mergeReq) {
+  public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
-    TSStatus tsStatus = localConfigNode.executeMergeOperation();
+    TSStatus tsStatus = LocalConfigNode.getInstance().executeMergeOperation();
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     } else {
@@ -238,10 +235,9 @@ public class StandaloneConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq) {
+  public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean 
isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
-    TSStatus tsStatus = localConfigNode.executeFlushOperation(tFlushReq);
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().executeFlushOperation(tFlushReq);
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     } else {
@@ -251,10 +247,9 @@ public class StandaloneConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq 
tclearCacheReq) {
+  public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
-    TSStatus tsStatus = localConfigNode.executeClearCacheOperation();
+    TSStatus tsStatus = 
LocalConfigNode.getInstance().executeClearCacheOperation();
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
index 8d735a2f71..bcfb055a7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config.sys;
 
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -40,16 +37,8 @@ public class ClearCacheTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    TClearCacheReq tClearCacheReq = new TClearCacheReq();
-
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    if (clearCacheStatement.isCluster()) {
-      tClearCacheReq.setDataNodeId(-1);
-    } else {
-      tClearCacheReq.setDataNodeId(config.getDataNodeId());
-    }
     // If the action is executed successfully, return the Future.
     // If your operation is async, you can return the corresponding future 
directly.
-    return configTaskExecutor.clearCache(tClearCacheReq);
+    return configTaskExecutor.clearCache(clearCacheStatement.isCluster());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
index 4309d1bdbf..dfd9a9b943 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.plan.execution.config.sys;
 
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -55,14 +53,8 @@ public class FlushTask implements IConfigTask {
     if (flushStatement.isSeq() != null) {
       tFlushReq.setIsSeq(flushStatement.isSeq().toString());
     }
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    if (flushStatement.isCluster()) {
-      tFlushReq.setDataNodeId(-1);
-    } else {
-      tFlushReq.setDataNodeId(config.getDataNodeId());
-    }
     // If the action is executed successfully, return the Future.
     // If your operation is async, you can return the corresponding future 
directly.
-    return configTaskExecutor.flush(tFlushReq);
+    return configTaskExecutor.flush(tFlushReq, flushStatement.isCluster());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
index 05e2695e95..5c8dd77f4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config.sys;
 
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -40,13 +37,6 @@ public class MergeTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
-    TMergeReq mergeReq = new TMergeReq();
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    if (mergeStatement.isCluster()) {
-      mergeReq.setDataNodeId(-1);
-    } else {
-      mergeReq.setDataNodeId(config.getDataNodeId());
-    }
-    return configTaskExecutor.merge(mergeReq);
+    return configTaskExecutor.merge(mergeStatement.isCluster());
   }
 }
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 049d3c5269..9bfb1ada5d 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -99,7 +99,6 @@ enum TRegionMigrateFailedType {
 struct TFlushReq {
    1: optional string isSeq
    2: optional list<string> storageGroups
-   3: optional i32 dataNodeId
 }
 
 // for node management
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 52de985075..5824eaf8ce 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -338,15 +338,6 @@ struct TGetPathsSetTemplatesResp {
   2: optional list<string> pathList
 }
 
-// Maintenance Tools
-struct TMergeReq {
-  1: optional i32 dataNodeId
-}
-
-struct TClearCacheReq {
-   1: optional i32 dataNodeId
-}
-
 service IConfigNodeRPCService {
 
   // ======================================================
@@ -606,13 +597,13 @@ service IConfigNodeRPCService {
   // ======================================================
 
   /** Execute Level Compaction and unsequence Compaction task on all DataNodes 
*/
-  common.TSStatus merge(TMergeReq req)
+  common.TSStatus merge()
 
   /** Persist all the data points in the memory table of the storage group to 
the disk, and seal the data file on all DataNodes */
   common.TSStatus flush(common.TFlushReq req)
 
   /** Clear the cache of chunk, chunk metadata and timeseries metadata to 
release the memory footprint on all DataNodes */
-  common.TSStatus clearCache(TClearCacheReq req)
+  common.TSStatus clearCache()
 
   // ======================================================
   // Cluster Tools

Reply via email to