This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c05ac0ff0b [IOTDB-3099] Flush in new cluster (#6167)
c05ac0ff0b is described below
commit c05ac0ff0b5e1278c36ea7ca3d23cc24f663be5e
Author: 任宇华 <[email protected]>
AuthorDate: Mon Jun 13 14:59:06 2022 +0800
[IOTDB-3099] Flush in new cluster (#6167)
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 2 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 4 +
.../confignode/client/SyncDataNodeClientPool.java | 31 ++++++
.../iotdb/confignode/manager/ConfigManager.java | 26 ++++-
.../apache/iotdb/confignode/manager/Manager.java | 8 +-
.../confignode/manager/PermissionManager.java | 36 ++-----
.../thrift/ConfigNodeRPCServiceProcessor.java | 31 ++++++
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ++++
.../apache/iotdb/db/engine/StorageEngineV2.java | 39 ++++++++
.../plan/execution/config/ConfigTaskVisitor.java | 6 ++
.../db/mpp/plan/execution/config/FlushTask.java | 95 ++++++++++++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 30 ++++++
.../db/mpp/plan/statement/StatementVisitor.java | 5 +
.../db/mpp/plan/statement/sys/FlushStatement.java | 106 +++++++++++++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../service/thrift/impl/InternalServiceImpl.java | 6 ++
thrift-commons/src/main/thrift/common.thrift | 6 ++
.../src/main/thrift/confignode.thrift | 6 ++
thrift/src/main/thrift/mpp.thrift | 2 +
20 files changed, 418 insertions(+), 42 deletions(-)
diff --git
a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 548a896c85..284e04f9d9 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -51,6 +51,7 @@ keyWords
| CACHE
| CHILD
| CLEAR
+ | CLUSTER
| CONCAT
| CONFIGURATION
| CONTINUOUS
@@ -95,6 +96,7 @@ keyWords
| LINK
| LIST
| LOAD
+ | LOCAL
| LOCK
| MERGE
| METADATA
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index a179a46f3c..5f860bb65d 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -614,7 +614,7 @@ fullMerge
// Flush
flush
- : FLUSH prefixPath? (COMMA prefixPath)* BOOLEAN_LITERAL?
+ : FLUSH prefixPath? (COMMA prefixPath)* BOOLEAN_LITERAL? (ON (LOCAL |
CLUSTER))?
;
// Clear Cache
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 5dc57c5406..fc11251612 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -298,6 +298,10 @@ LOAD
: L O A D
;
+LOCAL
+ : L O C A L
+ ;
+
LOCK
: L O C K
;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
index 62c7d2a16f..9510180dc3 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.confignode.client;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -123,6 +125,35 @@ public class SyncDataNodeClientPool {
}
}
+ public TSStatus flush(TEndPoint endPoint, TFlushReq flushReq) {
+ TSStatus status;
+ try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
+ status = client.flush(flushReq);
+ } catch (IOException e) {
+ LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+ status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
+ } catch (TException e) {
+ LOGGER.error("flush on DataNode {} failed", endPoint, e);
+ status = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+ return status;
+ }
+
+ public TSStatus invalidatePermissionCache(
+ TEndPoint endPoint, TInvalidatePermissionCacheReq
invalidatePermissionCacheReq) {
+ TSStatus status;
+ try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
+ status = client.invalidatePermissionCache(invalidatePermissionCacheReq);
+ } catch (IOException e) {
+ LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+ status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
+ } catch (TException e) {
+ LOGGER.error("Invalid Permission Cache on DataNode {} failed", endPoint,
e);
+ status = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+ return status;
+ }
+
// TODO: Is the ClientPool must be a singleton?
private static class ClientPoolHolder {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index f695373c21..fe30ba53e5 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
@@ -475,20 +474,20 @@ public class ConfigManager implements Manager {
}
@Override
- public TSStatus operatePermission(ConfigRequest configRequest) {
+ public TSStatus operatePermission(AuthorReq authorReq) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return permissionManager.operatePermission((AuthorReq) configRequest);
+ return permissionManager.operatePermission(authorReq);
} else {
return status;
}
}
@Override
- public DataSet queryPermission(ConfigRequest configRequest) {
+ public DataSet queryPermission(AuthorReq authorReq) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return permissionManager.queryPermission((AuthorReq) configRequest);
+ return permissionManager.queryPermission(authorReq);
} else {
PermissionInfoResp dataSet = new PermissionInfoResp();
dataSet.setStatus(status);
@@ -619,4 +618,21 @@ public class ConfigManager implements Manager {
public ProcedureManager getProcedureManager() {
return procedureManager;
}
+
+ /**
+ * @param storageGroups the storage groups to check
+ * @return the list of storage groups that do not exist
+ */
+ public List<PartialPath> checkStorageGroupExist(List<PartialPath>
storageGroups) {
+ List<PartialPath> noExistSg = new ArrayList<>();
+ if (storageGroups == null) {
+ return noExistSg;
+ }
+ for (PartialPath storageGroup : storageGroups) {
+ if
(!clusterSchemaManager.getStorageGroupNames().contains(storageGroup.toString()))
{
+ noExistSg.add(storageGroup);
+ }
+ }
+ return noExistSg;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index ef930f3238..df6b1593d5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionReq;
@@ -186,18 +186,16 @@ public interface Manager {
/**
* Operate Permission
*
- * @param configRequest AuthorPlan
* @return status
*/
- TSStatus operatePermission(ConfigRequest configRequest);
+ TSStatus operatePermission(AuthorReq authorReq);
/**
* Query Permission
*
- * @param configRequest AuthorPlan
* @return PermissionInfoDataSet
*/
- DataSet queryPermission(ConfigRequest configRequest);
+ DataSet queryPermission(AuthorReq authorReq);
/** login */
TPermissionInfoResp login(String username, String password);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 98389bd222..0d682a570f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -20,25 +20,20 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
/** manager permission query and operation */
@@ -48,11 +43,6 @@ public class PermissionManager {
private final ConfigManager configManager;
private final AuthorInfo authorInfo;
- private static final IClientManager<TEndPoint,
SyncDataNodeInternalServiceClient>
- INTERNAL_SERVICE_CLIENT_MANAGER =
- new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
- .createClientManager(
- new
DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
public PermissionManager(ConfigManager configManager, AuthorInfo authorInfo)
{
this.configManager = configManager;
@@ -114,25 +104,11 @@ public class PermissionManager {
req.setUsername(username);
req.setRoleName(roleName);
for (TDataNodeInfo dataNodeInfo : allDataNodes) {
- TEndPoint internalEndPoint =
dataNodeInfo.getLocation().getInternalEndPoint();
- try {
- status =
- INTERNAL_SERVICE_CLIENT_MANAGER
- .borrowClient(internalEndPoint)
- .invalidatePermissionCache(req);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- status.setMessage(
- "datanode cache initialization failed, ip: "
- + internalEndPoint.getIp()
- + ", port: "
- + internalEndPoint.getPort());
- return status;
- }
- } catch (IOException | TException e) {
- logger.error("Failed to initialize cache, the error is ", e);
- return RpcUtils.getStatus(
- TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR,
- "Failed to initialize cache, the error is " + e.getMessage());
+ status =
+ SyncDataNodeClientPool.getInstance()
+
.invalidatePermissionCache(dataNodeInfo.getLocation().getInternalEndPoint(),
req);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 11cbca37cf..aabffbed77 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.confignode.service.thrift;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorReq;
@@ -83,6 +85,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -407,6 +410,34 @@ public class ConfigNodeRPCServiceProcessor implements
ConfigIService.Iface {
return configManager.dropFunction(req.getUdfName());
}
+ @Override
+ public TSStatus flush(TFlushReq req) throws TException {
+ if (req.storageGroups != null) {
+ List<PartialPath> noExistSg =
+
configManager.checkStorageGroupExist(PartialPath.fromStringList(req.storageGroups));
+ if (!noExistSg.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ noExistSg.forEach(storageGroup ->
sb.append(storageGroup.getFullPath()).append(","));
+ return RpcUtils.getStatus(
+ TSStatusCode.STORAGE_GROUP_NOT_EXIST,
+ "storageGroup " + sb.subSequence(0, sb.length() - 1) + " does not
exist");
+ }
+ }
+
+ List<TDataNodeInfo> onlineDataNodes =
+ configManager.getNodeManager().getOnlineDataNodes(req.dataNodeId);
+ TSStatus tsStatus = new TSStatus();
+ for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ tsStatus =
+ SyncDataNodeClientPool.getInstance()
+ .flush(dataNodeInfo.getLocation().getInternalEndPoint(), req);
+ if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return tsStatus;
+ }
+ }
+ return tsStatus;
+ }
+
public void handleClientExit() {}
// TODO: Interfaces for data operations
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index cef015d09e..f7e42bc075 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.client;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.BaseClientFactory;
import org.apache.iotdb.commons.client.ClientFactoryProperty;
@@ -623,6 +624,22 @@ public class ConfigNodeClient implements
ConfigIService.Iface, SyncThriftClient,
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus flush(TFlushReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.flush(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus dropFunction(TDropFunctionReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index 63e844cf27..d06ae6b55a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -46,8 +48,11 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.exception.WALException;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.commons.io.FileUtils;
@@ -515,6 +520,40 @@ public class StorageEngineV2 implements IService {
}
}
+ public void closeStorageGroupProcessor(String storageGroupPath, boolean
isSeq) {
+ for (DataRegion dataRegion : dataRegionMap.values()) {
+ if (dataRegion.getLogicalStorageGroupName().equals(storageGroupPath)) {
+ if (isSeq) {
+ for (TsFileProcessor tsFileProcessor :
dataRegion.getWorkSequenceTsFileProcessors()) {
+ dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+ }
+ } else {
+ for (TsFileProcessor tsFileProcessor :
dataRegion.getWorkUnsequenceTsFileProcessors()) {
+ dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
+ }
+ }
+ }
+ }
+ }
+
+ public TSStatus operatorFlush(TFlushReq req) {
+ if (req.storageGroups == null) {
+ StorageEngineV2.getInstance().syncCloseAllProcessor();
+ WALManager.getInstance().deleteOutdatedWALFiles();
+ } else {
+ for (String storageGroup : req.storageGroups) {
+ if (req.isSeq == null) {
+
StorageEngineV2.getInstance().closeStorageGroupProcessor(storageGroup, true);
+
StorageEngineV2.getInstance().closeStorageGroupProcessor(storageGroup, false);
+ } else {
+ StorageEngineV2.getInstance()
+ .closeStorageGroupProcessor(storageGroup,
Boolean.parseBoolean(req.isSeq));
+ }
+ }
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+
public void setTTL(List<DataRegionId> dataRegionIdList, long dataTTL) {
for (DataRegionId dataRegionId : dataRegionIdList) {
DataRegion dataRegion = dataRegionMap.get(dataRegionId);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 38b3ac1d18..03ee48af39 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.UnSetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
public class ConfigTaskVisitor
@@ -105,6 +106,11 @@ public class ConfigTaskVisitor
return new CreateFunctionTask(createFunctionStatement);
}
+ @Override
+ public IConfigTask visitFlush(FlushStatement flushStatement, TaskContext
context) {
+ return new FlushTask(flushStatement);
+ }
+
@Override
public IConfigTask visitDropFunction(
DropFunctionStatement dropFunctionStatement, TaskContext context) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
new file mode 100644
index 0000000000..3c3479441c
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config;
+
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FlushTask implements IConfigTask {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FlushTask.class);
+
+ private FlushStatement flushStatement;
+
+ public FlushTask(FlushStatement flushStatement) {
+ this.flushStatement = flushStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(
+ IClientManager<PartitionRegionId, ConfigNodeClient> clientManager)
+ throws InterruptedException {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TSStatus tsStatus = new TSStatus();
+ TFlushReq tFlushReq = new TFlushReq();
+ List<String> storageGroups = new ArrayList<>();
+ if (flushStatement.getStorageGroupPartitionIds() != null) {
+ for (PartialPath partialPath :
flushStatement.getStorageGroupPartitionIds().keySet()) {
+ storageGroups.add(partialPath.getFullPath());
+ }
+ tFlushReq.setStorageGroups(storageGroups);
+ }
+ if (flushStatement.isSeq() != null) {
+ tFlushReq.setIsSeq(flushStatement.isSeq().toString());
+ }
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ if (flushStatement.isLocal()) {
+ tFlushReq.setDataNodeId(config.getDataNodeId());
+ } else {
+ tFlushReq.setDataNodeId(-1);
+ }
+ try (ConfigNodeClient client =
clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ // Send request to some API server
+ tsStatus = client.flush(tFlushReq);
+ // Get response or throw exception
+ } catch (IOException | TException e) {
+ logger.error("Failed to connect to config node.");
+ future.setException(e);
+ }
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new StatementExecutionException(tsStatus));
+ }
+ // If the action is executed successfully, return the Future.
+ // If your operation is async, you can return the corresponding future
directly.
+ return future;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index d3f4ce8028..4bf921174e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.binary.AdditionExpression;
@@ -100,6 +101,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.UnSetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
@@ -2184,4 +2186,32 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException(DELETE_RANGE_ERROR_MSG);
}
}
+
+ // Flush
+
+ @Override
+ public Statement visitFlush(IoTDBSqlParser.FlushContext ctx) {
+ FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
+ Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds =
null;
+ if (ctx.BOOLEAN_LITERAL() != null) {
+
flushStatement.setSeq(Boolean.parseBoolean(ctx.BOOLEAN_LITERAL().getText()));
+ }
+ if (ctx.CLUSTER() != null) {
+ flushStatement.setLocal(false);
+ } else {
+ flushStatement.setLocal(true);
+ }
+ if (ctx.prefixPath(0) != null) {
+ List<PartialPath> storageGroups = new ArrayList<>();
+ for (IoTDBSqlParser.PrefixPathContext prefixPathContext :
ctx.prefixPath()) {
+ storageGroups.add(parsePrefixPath(prefixPathContext));
+ }
+ storageGroupPartitionIds = new HashMap<>();
+ for (PartialPath path : storageGroups) {
+ storageGroupPartitionIds.put(path, null);
+ }
+ }
+ flushStatement.setStorageGroupPartitionIds(storageGroupPartitionIds);
+ return flushStatement;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 1af9e9aa39..b878a7038a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.UnSetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
/**
* This class provides a visitor of {@link
org.apache.iotdb.db.mpp.plan.statement.StatementNode},
@@ -246,4 +247,8 @@ public abstract class StatementVisitor<R, C> {
public R visitDeleteData(DeleteDataStatement deleteDataStatement, C context)
{
return visitStatement(deleteDataStatement, context);
}
+
+ public R visitFlush(FlushStatement flushStatement, C context) {
+ return visitStatement(flushStatement, context);
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
new file mode 100644
index 0000000000..9a8f920deb
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class FlushStatement extends Statement implements IConfigStatement {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FlushPlan.class);
+ /**
+ * key-> storage group, value->list of pair, Pair<PartitionId, isSequence>,
+ *
+ * <p>Notice, the value may be null; when it is null, all partitions under
the storage groups are
+ * flushed, so do not use {@link java.util.concurrent.ConcurrentHashMap}
when initializing as
+ * ConcurrentMap does not support null keys or values
+ */
+ private Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds;
+
+ // being null indicates flushing both seq and unseq data
+ private Boolean isSeq;
+
+ private boolean isLocal;
+
+ public FlushStatement(StatementType flushType) {
+ this.statementType = flushType;
+ }
+
+ public Map<PartialPath, List<Pair<Long, Boolean>>>
getStorageGroupPartitionIds() {
+ return storageGroupPartitionIds;
+ }
+
+ public void setStorageGroupPartitionIds(
+ Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds) {
+ this.storageGroupPartitionIds = storageGroupPartitionIds;
+ }
+
+ public Boolean isSeq() {
+ return isSeq;
+ }
+
+ public void setSeq(Boolean seq) {
+ isSeq = seq;
+ }
+
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ public void setLocal(boolean local) {
+ isLocal = local;
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.WRITE;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ if (storageGroupPartitionIds == null) {
+ return Collections.emptyList();
+ }
+ List<PartialPath> partialPaths = new ArrayList<>();
+ for (PartialPath partialPath : storageGroupPartitionIds.keySet()) {
+ partialPaths.add(partialPath);
+ }
+ return partialPaths;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitFlush(this, context);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d9b93e77ca..48c702bd52 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -2456,7 +2456,7 @@ public class PlanExecutor implements IPlanExecutor {
* @param storageGroups the storage groups to check
* @return List of PartialPath the storage groups that not exist
*/
- List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) {
+ public static List<PartialPath> checkStorageGroupExist(List<PartialPath>
storageGroups) {
List<PartialPath> noExistSg = new ArrayList<>();
if (storageGroups == null) {
return noExistSg;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 1d1b061a11..d1b6f3faab 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -351,6 +352,11 @@ public class InternalServiceImpl implements
InternalService.Iface {
return RpcUtils.getStatus(TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
}
+ /**
+  * Handles a flush request by handing it to {@code StorageEngineV2#operatorFlush}.
+  *
+  * @param req the flush request received over the internal service
+  * @return the status produced by the storage engine
+  * @throws TException declared by the service interface
+  */
+ @Override
+ public TSStatus flush(TFlushReq req) throws TException {
+   return StorageEngineV2.getInstance().operatorFlush(req);
+ }
+
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws
TException {
ConsensusGroupId consensusGroupId =
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index 9f3de32db9..ac6a17331b 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -90,4 +90,10 @@ struct TDataNodeInfo {
1: required TDataNodeLocation location
2: required i32 cpuCoreNum
3: required i64 maxMemory
+}
+
+struct TFlushReq{
+ 1: optional string isSeq // NOTE(review): carried as a string, not bool; presumably unset means both seq and unseq - confirm
+ 2: optional list<string> storageGroups // NOTE(review): presumably the storage group paths to flush - confirm
+ 3: optional i32 dataNodeId // NOTE(review): presumably identifies the target DataNode - confirm
}
\ No newline at end of file
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 8f55224075..f1bd0e0818 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -292,4 +292,10 @@ service ConfigIService {
common.TSStatus createFunction(TCreateFunctionReq req)
common.TSStatus dropFunction(TDropFunctionReq req)
+
+ /* Flush */
+
+ common.TSStatus flush(common.TFlushReq req)
+
}
+
diff --git a/thrift/src/main/thrift/mpp.thrift
b/thrift/src/main/thrift/mpp.thrift
index 9ee0d7ebe1..2b89299e56 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -256,6 +256,8 @@ service InternalService {
* @param string:username, list<string>:roleList
*/
common.TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req)
+
+ common.TSStatus flush(common.TFlushReq req)
}
service DataBlockService {