This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c046fd3c93 [IOTDB-4109] Optimize merge、flush、clear cache operation
(#6956)
c046fd3c93 is described below
commit c046fd3c936a1dd4d8c164f4e331ef6364115960
Author: 任宇华 <[email protected]>
AuthorDate: Fri Aug 12 11:41:54 2022 +0800
[IOTDB-4109] Optimize merge、flush、clear cache operation (#6956)
* Optimize merge、flush、clear cache operation
* fix bug
Co-authored-by: renyuhua <[email protected]>
---
.../iotdb/confignode/manager/ConfigManager.java | 10 +--
.../apache/iotdb/confignode/manager/IManager.java | 9 ++-
.../iotdb/confignode/manager/NodeManager.java | 25 +-----
.../thrift/ConfigNodeRPCServiceProcessor.java | 10 +--
.../Maintenance-Tools/Maintenance-Command.md | 6 +-
.../Maintenance-Tools/Maintenance-Command.md | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 10 +--
.../config/executor/ClusterConfigTaskExecutor.java | 89 ++++++++++++----------
.../config/executor/IConfigTaskExecutor.java | 8 +-
.../executor/StandaloneConfigTaskExecutor.java | 17 ++---
.../plan/execution/config/sys/ClearCacheTask.java | 13 +---
.../mpp/plan/execution/config/sys/FlushTask.java | 10 +--
.../mpp/plan/execution/config/sys/MergeTask.java | 12 +--
thrift-commons/src/main/thrift/common.thrift | 1 -
.../src/main/thrift/confignode.thrift | 13 +---
15 files changed, 88 insertions(+), 151 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 89ed6935c5..4bb53f4f3c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -75,7 +75,6 @@ import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
@@ -84,7 +83,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -809,10 +807,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus merge(TMergeReq req) {
+ public TSStatus merge() {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? RpcUtils.squashResponseStatusList(nodeManager.merge(req))
+ ? RpcUtils.squashResponseStatusList(nodeManager.merge())
: status;
}
@@ -825,10 +823,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus clearCache(TClearCacheReq req) {
+ public TSStatus clearCache() {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? RpcUtils.squashResponseStatusList(nodeManager.clearCache(req))
+ ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
: status;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index b319714276..8412372bb4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -38,13 +38,11 @@ import
org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.SetTTLPlan;
import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -278,11 +276,14 @@ public interface IManager {
TSStatus dropFunction(String udfName);
- TSStatus merge(TMergeReq req);
+ /** Merge on all DataNodes */
+ TSStatus merge();
+ /** Flush on all DataNodes */
TSStatus flush(TFlushReq req);
- TSStatus clearCache(TClearCacheReq req);
+ /** Clear cache on all DataNodes */
+ TSStatus clearCache();
/**
* Get the latest RegionRouteMap
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index b3519d7eaa..31c4cadbd8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -40,11 +40,9 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -60,7 +58,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
/** NodeManager manages cluster node addition and removal requests */
public class NodeManager {
@@ -370,15 +367,9 @@ public class NodeManager {
+ ".");
}
- public List<TSStatus> merge(TMergeReq req) {
+ public List<TSStatus> merge() {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- if (req.dataNodeId != -1) {
- dataNodeLocationMap =
- dataNodeLocationMap.entrySet().stream()
- .filter((e) -> req.dataNodeId == e.getKey())
- .collect(Collectors.toMap((e) -> e.getKey(), (e) ->
e.getValue()));
- }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
@@ -390,12 +381,6 @@ public class NodeManager {
public List<TSStatus> flush(TFlushReq req) {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- if (req.dataNodeId != -1) {
- dataNodeLocationMap =
- dataNodeLocationMap.entrySet().stream()
- .filter((e) -> req.dataNodeId == e.getKey())
- .collect(Collectors.toMap((e) -> e.getKey(), (e) ->
e.getValue()));
- }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
@@ -404,15 +389,9 @@ public class NodeManager {
return dataNodeResponseStatus;
}
- public List<TSStatus> clearCache(TClearCacheReq req) {
+ public List<TSStatus> clearCache() {
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
- if (req.dataNodeId != -1) {
- dataNodeLocationMap =
- dataNodeLocationMap.entrySet().stream()
- .filter((e) -> req.dataNodeId == e.getKey())
- .collect(Collectors.toMap((e) -> e.getKey(), (e) ->
e.getValue()));
- }
List<TSStatus> dataNodeResponseStatus =
Collections.synchronizedList(new
ArrayList<>(dataNodeLocationMap.size()));
AsyncDataNodeClientPool.getInstance()
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 235d97a943..e58ba05004 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -62,7 +62,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -82,7 +81,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -506,8 +504,8 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus merge(TMergeReq req) throws TException {
- return configManager.merge(req);
+ public TSStatus merge() throws TException {
+ return configManager.merge();
}
@Override
@@ -527,8 +525,8 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TSStatus clearCache(TClearCacheReq req) throws TException {
- return configManager.clearCache(req);
+ public TSStatus clearCache() throws TException {
+ return configManager.clearCache();
}
@Override
diff --git a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
index bc9e56869b..53c3fced41 100644
--- a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -22,7 +22,7 @@
# Maintenance Command
## FLUSH
-Persist all the data points in the memory table of the storage group to the
disk, and seal the data file.In cluster mode, we provide commands to persist
the specified storage group cache of a single node and persist the specified
storage group cache of the cluster.
+Persist all the data points in the memory table of the storage group to the
disk, and seal the data file. In cluster mode, we provide commands to persist
the specified storage group cache of local node and persist the specified
storage group cache of the cluster.
Note: This command does not need to be invoked manually by the client. IoTDB
has WAL to ensure data security
and IoTDB will flush when appropriate.
@@ -48,7 +48,7 @@ Execute Level Compaction and unsequence Compaction task.
Currently IoTDB support
IoTDB> MERGE
IoTDB> FULL MERGE
```
-At the same time, manually trigger the compaction process of data files are
supported for individual nodes or the entire cluster in cluster mode:
+At the same time, manually trigger the compaction process of data files are
supported for local node or the entire cluster in cluster mode:
```sql
IoTDB> MERGE ON LOCAL
IoTDB> MERGE ON CLUSTER
@@ -58,7 +58,7 @@ IoTDB> FULL MERGE ON CLUSTER
## CLEAR CACHE
-Clear the cache of chunk, chunk metadata and timeseries metadata to release
the memory footprint. In cluster mode, we provide commands to clear a single
node cache and clear the cluster cache.
+Clear the cache of chunk, chunk metadata and timeseries metadata to release
the memory footprint. In cluster mode, we provide commands to clear local node
cache and clear the cluster cache.
```sql
IoTDB> CLEAR CACHE
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
index 8a83f7cb68..3978695aa5 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -22,7 +22,7 @@
## FLUSH
-将指定存储组的内存缓存区 Memory Table
的数据持久化到磁盘上,并将数据文件封口。在集群模式下,我们提供了持久化单个节点的指定存储组的缓存、持久化整个集群指定存储组的缓存命令。
+将指定存储组的内存缓存区 Memory Table
的数据持久化到磁盘上,并将数据文件封口。在集群模式下,我们提供了持久化本节点的指定存储组的缓存、持久化整个集群指定存储组的缓存命令。
注意:此命令客户端不需要手动调用,IoTDB 有 wal 保证数据安全,IoTDB 会选择合适的时机进行 flush。
如果频繁调用 flush 会导致数据文件很小,降低查询性能。
@@ -47,7 +47,7 @@ IoTDB> FLUSH root.sg1,root.sg2 ON CLUSTER
IoTDB> MERGE
IoTDB> FULL MERGE
```
-同时,在集群模式中支持对单个节点或整个集群手动触发数据文件的合并:
+同时,在集群模式中支持对本节点或整个集群手动触发数据文件的合并:
```sql
IoTDB> MERGE ON LOCAL
IoTDB> MERGE ON CLUSTER
@@ -58,7 +58,7 @@ IoTDB> FULL MERGE ON CLUSTER
## CLEAR CACHE
-手动清除chunk, chunk metadata和timeseries
metadata的缓存,在内存资源紧张时,可以通过此命令,释放查询时缓存所占的内存空间。在集群模式下,我们提供了清空单个节点缓存、清空整个集群缓存命令。
+手动清除chunk, chunk metadata和timeseries
metadata的缓存,在内存资源紧张时,可以通过此命令,释放查询时缓存所占的内存空间。在集群模式下,我们提供了清空本节点缓存、清空整个集群缓存命令。
```sql
IoTDB> CLEAR CACHE
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a055b91265..f181ce7249 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -57,7 +56,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -751,10 +749,10 @@ public class ConfigNodeClient
}
@Override
- public TSStatus merge(TMergeReq req) throws TException {
+ public TSStatus merge() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TSStatus status = client.merge(req);
+ TSStatus status = client.merge();
if (!updateConfigNodeLeader(status)) {
return status;
}
@@ -783,10 +781,10 @@ public class ConfigNodeClient
}
@Override
- public TSStatus clearCache(TClearCacheReq req) throws TException {
+ public TSStatus clearCache() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TSStatus status = client.clearCache(req);
+ TSStatus status = client.clearCache();
if (!updateConfigNodeLeader(status)) {
return status;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 5e85c95f66..628be9326a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -26,13 +26,11 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
@@ -44,6 +42,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
@@ -270,60 +269,70 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> merge(TMergeReq tMergeReq) {
+ public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- TSStatus tsStatus = client.merge(tMergeReq);
- // Get response or throw exception
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } else {
- future.setException(new StatementExecutionException(tsStatus));
+ TSStatus tsStatus = new TSStatus();
+ if (isCluster) {
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ tsStatus = client.merge();
+ } catch (IOException | TException e) {
+ future.setException(e);
}
- } catch (IOException | TException e) {
- future.setException(e);
+ } else {
+ tsStatus = LocalConfigNode.getInstance().executeMergeOperation();
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
}
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq) {
+ public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-
- try (ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- TSStatus tsStatus = client.flush(tFlushReq);
- // Get response or throw exception
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } else {
- future.setException(new StatementExecutionException(tsStatus));
+ TSStatus tsStatus = new TSStatus();
+ if (isCluster) {
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ tsStatus = client.flush(tFlushReq);
+ } catch (IOException | TException e) {
+ future.setException(e);
}
- } catch (IOException | TException e) {
- future.setException(e);
+ } else {
+ tsStatus =
LocalConfigNode.getInstance().executeFlushOperation(tFlushReq);
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
}
return future;
}
@Override
- public SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq
tClearCacheReq) {
+ public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-
- try (ConfigNodeClient client =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- // Send request to some API server
- TSStatus tsStatus = client.clearCache(tClearCacheReq);
- // Get response or throw exception
- if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- } else {
- future.setException(new StatementExecutionException(tsStatus));
+ TSStatus tsStatus = new TSStatus();
+ if (isCluster) {
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ tsStatus = client.clearCache();
+ } catch (IOException | TException e) {
+ future.setException(e);
}
- } catch (IOException | TException e) {
- future.setException(e);
+ } else {
+ tsStatus = LocalConfigNode.getInstance().executeClearCacheOperation();
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
}
return future;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 5e13984c90..c7a80f6f21 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.db.mpp.plan.execution.config.executor;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import
org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -62,11 +60,11 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement,
String taskName);
- SettableFuture<ConfigTaskResult> merge(TMergeReq mergeReq);
+ SettableFuture<ConfigTaskResult> merge(boolean isCluster);
- SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq);
+ SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster);
- SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq tClearCacheReq);
+ SettableFuture<ConfigTaskResult> clearCache(boolean isCluster);
SettableFuture<ConfigTaskResult> showCluster();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 4afdb8c795..a0769c5270 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -225,10 +223,9 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> merge(TMergeReq mergeReq) {
+ public SettableFuture<ConfigTaskResult> merge(boolean isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
- TSStatus tsStatus = localConfigNode.executeMergeOperation();
+ TSStatus tsStatus = LocalConfigNode.getInstance().executeMergeOperation();
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
@@ -238,10 +235,9 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq) {
+ public SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
- TSStatus tsStatus = localConfigNode.executeFlushOperation(tFlushReq);
+ TSStatus tsStatus =
LocalConfigNode.getInstance().executeFlushOperation(tFlushReq);
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
@@ -251,10 +247,9 @@ public class StandaloneConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> clearCache(TClearCacheReq
tclearCacheReq) {
+ public SettableFuture<ConfigTaskResult> clearCache(boolean isCluster) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
- TSStatus tsStatus = localConfigNode.executeClearCacheOperation();
+ TSStatus tsStatus =
LocalConfigNode.getInstance().executeClearCacheOperation();
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
index 8d735a2f71..bcfb055a7d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/ClearCacheTask.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.mpp.plan.execution.config.sys;
-import org.apache.iotdb.confignode.rpc.thrift.TClearCacheReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -40,16 +37,8 @@ public class ClearCacheTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- TClearCacheReq tClearCacheReq = new TClearCacheReq();
-
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- if (clearCacheStatement.isCluster()) {
- tClearCacheReq.setDataNodeId(-1);
- } else {
- tClearCacheReq.setDataNodeId(config.getDataNodeId());
- }
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return configTaskExecutor.clearCache(tClearCacheReq);
+ return configTaskExecutor.clearCache(clearCacheStatement.isCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
index 4309d1bdbf..dfd9a9b943 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/FlushTask.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.plan.execution.config.sys;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -55,14 +53,8 @@ public class FlushTask implements IConfigTask {
if (flushStatement.isSeq() != null) {
tFlushReq.setIsSeq(flushStatement.isSeq().toString());
}
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- if (flushStatement.isCluster()) {
- tFlushReq.setDataNodeId(-1);
- } else {
- tFlushReq.setDataNodeId(config.getDataNodeId());
- }
// If the action is executed successfully, return the Future.
// If your operation is async, you can return the corresponding future
directly.
- return configTaskExecutor.flush(tFlushReq);
+ return configTaskExecutor.flush(tFlushReq, flushStatement.isCluster());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
index 05e2695e95..5c8dd77f4c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/MergeTask.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.mpp.plan.execution.config.sys;
-import org.apache.iotdb.confignode.rpc.thrift.TMergeReq;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
@@ -40,13 +37,6 @@ public class MergeTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- TMergeReq mergeReq = new TMergeReq();
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- if (mergeStatement.isCluster()) {
- mergeReq.setDataNodeId(-1);
- } else {
- mergeReq.setDataNodeId(config.getDataNodeId());
- }
- return configTaskExecutor.merge(mergeReq);
+ return configTaskExecutor.merge(mergeStatement.isCluster());
}
}
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index 049d3c5269..9bfb1ada5d 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -99,7 +99,6 @@ enum TRegionMigrateFailedType {
struct TFlushReq {
1: optional string isSeq
2: optional list<string> storageGroups
- 3: optional i32 dataNodeId
}
// for node management
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 52de985075..5824eaf8ce 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -338,15 +338,6 @@ struct TGetPathsSetTemplatesResp {
2: optional list<string> pathList
}
-// Maintenance Tools
-struct TMergeReq {
- 1: optional i32 dataNodeId
-}
-
-struct TClearCacheReq {
- 1: optional i32 dataNodeId
-}
-
service IConfigNodeRPCService {
// ======================================================
@@ -606,13 +597,13 @@ service IConfigNodeRPCService {
// ======================================================
/** Execute Level Compaction and unsequence Compaction task on all DataNodes
*/
- common.TSStatus merge(TMergeReq req)
+ common.TSStatus merge()
/** Persist all the data points in the memory table of the storage group to
the disk, and seal the data file on all DataNodes */
common.TSStatus flush(common.TFlushReq req)
/** Clear the cache of chunk, chunk metadata and timeseries metadata to
release the memory footprint on all DataNodes */
- common.TSStatus clearCache(TClearCacheReq req)
+ common.TSStatus clearCache()
// ======================================================
// Cluster Tools