This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch standalone_flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d8b22aa9b65d22aab10fa1c4532757a26680c04 Author: HTHou <[email protected]> AuthorDate: Mon Jun 13 16:15:10 2022 +0800 Support flush in new standalone IoTDB --- .../apache/iotdb/db/engine/StorageEngineV2.java | 2 +- .../iotdb/db/localconfignode/LocalConfigNode.java | 6 +++++ .../db/mpp/plan/execution/config/FlushTask.java | 24 ++++++++++------- .../iotdb/db/mpp/plan/parser/ASTVisitor.java | 13 +++++----- .../db/mpp/plan/statement/sys/FlushStatement.java | 30 ++++++---------------- .../service/thrift/impl/InternalServiceImpl.java | 2 +- 6 files changed, 37 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java index d06ae6b55a..43411ef759 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java @@ -536,7 +536,7 @@ public class StorageEngineV2 implements IService { } } - public TSStatus operatorFlush(TFlushReq req) { + public TSStatus operateFlush(TFlushReq req) { if (req.storageGroups == null) { StorageEngineV2.getInstance().syncCloseAllProcessor(); WALManager.getInstance().deleteOutdatedWALFiles(); diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java index f26ec58530..3c1ba07911 100644 --- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java +++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java @@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.auth.AuthException; @@ -1243,4 +1245,8 @@ public class LocalConfigNode { throws AuthException { return iAuthorizer.checkUserPrivileges(username, path, permission); } + + public TSStatus executeFlushOperation(TFlushReq tFlushReq) { + return storageEngine.operateFlush(tFlushReq); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java index 3c3479441c..231a967ce0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.client.ConfigNodeClient; import org.apache.iotdb.db.client.ConfigNodeInfo; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.localconfignode.LocalConfigNode; import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -60,8 +61,8 @@ public class FlushTask implements IConfigTask { TSStatus tsStatus = new TSStatus(); TFlushReq tFlushReq = new TFlushReq(); List<String> storageGroups = new ArrayList<>(); - if (flushStatement.getStorageGroupPartitionIds() != null) { - for (PartialPath partialPath : flushStatement.getStorageGroupPartitionIds().keySet()) { + if (flushStatement.getStorageGroups() != null) { + for (PartialPath partialPath : flushStatement.getStorageGroups()) { storageGroups.add(partialPath.getFullPath()); } tFlushReq.setStorageGroups(storageGroups); @@ -75,13 +76,18 @@ public class FlushTask implements IConfigTask { } else { tFlushReq.setDataNodeId(-1); } - try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) { - // Send request to some API server - tsStatus = client.flush(tFlushReq); - // Get response or throw exception - } catch (IOException | TException e) { - logger.error("Failed to connect to config node."); - future.setException(e); + if (config.isClusterMode()) { + try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) { + // Send request to some API server + tsStatus = client.flush(tFlushReq); + // Get response or throw exception + } catch (IOException | TException e) { + logger.error("Failed to connect to config node."); + future.setException(e); + } + } else { + LocalConfigNode localConfigNode = LocalConfigNode.getInstance(); + tsStatus = localConfigNode.executeFlushOperation(tFlushReq); } if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java index 4bf921174e..9d8de6fa16 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java @@ -2192,26 +2192,25 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { @Override public Statement visitFlush(IoTDBSqlParser.FlushContext ctx) { FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH); - Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds = null; + List<PartialPath> storageGroups = null; if (ctx.BOOLEAN_LITERAL() != null) { flushStatement.setSeq(Boolean.parseBoolean(ctx.BOOLEAN_LITERAL().getText())); } if (ctx.CLUSTER() != null) { + if (!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) { + throw new SemanticException("FLUSH ON CLUSTER is not supported in standalone mode"); + } flushStatement.setLocal(false); } else { flushStatement.setLocal(true); } if (ctx.prefixPath(0) != null) { - List<PartialPath> storageGroups = new ArrayList<>(); + storageGroups = new ArrayList<>(); for (IoTDBSqlParser.PrefixPathContext prefixPathContext : ctx.prefixPath()) { storageGroups.add(parsePrefixPath(prefixPathContext)); } - storageGroupPartitionIds = new HashMap<>(); - for (PartialPath path : storageGroups) { - storageGroupPartitionIds.put(path, null); - } } - flushStatement.setStorageGroupPartitionIds(storageGroupPartitionIds); + flushStatement.setStorageGroups(storageGroups); return flushStatement; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java index 9a8f920deb..934073a541 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java @@ -26,27 +26,18 @@ import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement; import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; -import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; public class FlushStatement extends Statement implements IConfigStatement { private static final Logger logger = LoggerFactory.getLogger(FlushPlan.class); - /** - * key-> storage group, value->list of pair, Pair<PartitionId, isSequence>, - * - * <p>Notice, the value maybe null, when it is null, all partitions under the storage groups are - * flushed, so do not use {@link java.util.concurrent.ConcurrentHashMap} when initializing as - * ConcurrentMap dose not support null key and value - */ - private Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds; + /** list of storage group */ + private List<PartialPath> storageGroups; // being null indicates flushing both seq and unseq data private Boolean isSeq; @@ -57,13 +48,12 @@ public class FlushStatement extends Statement implements IConfigStatement { this.statementType = flushType; } - public Map<PartialPath, List<Pair<Long, Boolean>>> getStorageGroupPartitionIds() { - return storageGroupPartitionIds; + public List<PartialPath> getStorageGroups() { + return storageGroups; } - public void setStorageGroupPartitionIds( - Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds) { - this.storageGroupPartitionIds = storageGroupPartitionIds; + public void setStorageGroups(List<PartialPath> storageGroups) { + this.storageGroups = storageGroups; } public Boolean isSeq() { @@ -89,14 +79,10 @@ public class FlushStatement extends Statement implements IConfigStatement { @Override public List<PartialPath> getPaths() { - if (storageGroupPartitionIds == null) { + if (storageGroups == null) { return Collections.emptyList(); } - List<PartialPath> partialPaths = new ArrayList<>(); - for (PartialPath partialPath : storageGroupPartitionIds.keySet()) { - partialPaths.add(partialPath); - } - return partialPaths; + return storageGroups; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index d1b6f3faab..0e9a00ecce 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -354,7 +354,7 @@ public class InternalServiceImpl implements InternalService.Iface { @Override public TSStatus flush(TFlushReq req) throws TException { - return StorageEngineV2.getInstance().operatorFlush(req); + return StorageEngineV2.getInstance().operateFlush(req); } @Override
