This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch standalone_flush
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d8b22aa9b65d22aab10fa1c4532757a26680c04
Author: HTHou <[email protected]>
AuthorDate: Mon Jun 13 16:15:10 2022 +0800

    Support flush in new standalone IoTDB
---
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  2 +-
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  6 +++++
 .../db/mpp/plan/execution/config/FlushTask.java    | 24 ++++++++++-------
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 13 +++++-----
 .../db/mpp/plan/statement/sys/FlushStatement.java  | 30 ++++++----------------
 .../service/thrift/impl/InternalServiceImpl.java   |  2 +-
 6 files changed, 37 insertions(+), 40 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
index d06ae6b55a..43411ef759 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngineV2.java
@@ -536,7 +536,7 @@ public class StorageEngineV2 implements IService {
     }
   }
 
-  public TSStatus operatorFlush(TFlushReq req) {
+  public TSStatus operateFlush(TFlushReq req) {
     if (req.storageGroups == null) {
       StorageEngineV2.getInstance().syncCloseAllProcessor();
       WALManager.getInstance().deleteOutdatedWALFiles();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index f26ec58530..3c1ba07911 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -1243,4 +1245,8 @@ public class LocalConfigNode {
       throws AuthException {
     return iAuthorizer.checkUserPrivileges(username, path, permission);
   }
+
+  public TSStatus executeFlushOperation(TFlushReq tFlushReq) {
+    return storageEngine.operateFlush(tFlushReq);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
index 3c3479441c..231a967ce0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/FlushTask.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -60,8 +61,8 @@ public class FlushTask implements IConfigTask {
     TSStatus tsStatus = new TSStatus();
     TFlushReq tFlushReq = new TFlushReq();
     List<String> storageGroups = new ArrayList<>();
-    if (flushStatement.getStorageGroupPartitionIds() != null) {
-      for (PartialPath partialPath : 
flushStatement.getStorageGroupPartitionIds().keySet()) {
+    if (flushStatement.getStorageGroups() != null) {
+      for (PartialPath partialPath : flushStatement.getStorageGroups()) {
         storageGroups.add(partialPath.getFullPath());
       }
       tFlushReq.setStorageGroups(storageGroups);
@@ -75,13 +76,18 @@ public class FlushTask implements IConfigTask {
     } else {
       tFlushReq.setDataNodeId(-1);
     }
-    try (ConfigNodeClient client = 
clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      // Send request to some API server
-      tsStatus = client.flush(tFlushReq);
-      // Get response or throw exception
-    } catch (IOException | TException e) {
-      logger.error("Failed to connect to config node.");
-      future.setException(e);
+    if (config.isClusterMode()) {
+      try (ConfigNodeClient client = 
clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+        // Send request to some API server
+        tsStatus = client.flush(tFlushReq);
+        // Get response or throw exception
+      } catch (IOException | TException e) {
+        logger.error("Failed to connect to config node.");
+        future.setException(e);
+      }
+    } else {
+      LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+      tsStatus = localConfigNode.executeFlushOperation(tFlushReq);
     }
     if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 4bf921174e..9d8de6fa16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -2192,26 +2192,25 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
   @Override
   public Statement visitFlush(IoTDBSqlParser.FlushContext ctx) {
     FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
-    Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds = 
null;
+    List<PartialPath> storageGroups = null;
     if (ctx.BOOLEAN_LITERAL() != null) {
       
flushStatement.setSeq(Boolean.parseBoolean(ctx.BOOLEAN_LITERAL().getText()));
     }
     if (ctx.CLUSTER() != null) {
+      if (!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
+        throw new SemanticException("FLUSH ON CLUSTER is not supported in 
standalone mode");
+      }
       flushStatement.setLocal(false);
     } else {
       flushStatement.setLocal(true);
     }
     if (ctx.prefixPath(0) != null) {
-      List<PartialPath> storageGroups = new ArrayList<>();
+      storageGroups = new ArrayList<>();
       for (IoTDBSqlParser.PrefixPathContext prefixPathContext : 
ctx.prefixPath()) {
         storageGroups.add(parsePrefixPath(prefixPathContext));
       }
-      storageGroupPartitionIds = new HashMap<>();
-      for (PartialPath path : storageGroups) {
-        storageGroupPartitionIds.put(path, null);
-      }
     }
-    flushStatement.setStorageGroupPartitionIds(storageGroupPartitionIds);
+    flushStatement.setStorageGroups(storageGroups);
     return flushStatement;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
index 9a8f920deb..934073a541 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/FlushStatement.java
@@ -26,27 +26,18 @@ import 
org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 public class FlushStatement extends Statement implements IConfigStatement {
 
   private static final Logger logger = 
LoggerFactory.getLogger(FlushPlan.class);
-  /**
-   * key-> storage group, value->list of pair, Pair<PartitionId, isSequence>,
-   *
-   * <p>Notice, the value maybe null, when it is null, all partitions under 
the storage groups are
-   * flushed, so do not use {@link java.util.concurrent.ConcurrentHashMap} 
when initializing as
-   * ConcurrentMap dose not support null key and value
-   */
-  private Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds;
+  /** list of storage group */
+  private List<PartialPath> storageGroups;
 
   // being null indicates flushing both seq and unseq data
   private Boolean isSeq;
@@ -57,13 +48,12 @@ public class FlushStatement extends Statement implements 
IConfigStatement {
     this.statementType = flushType;
   }
 
-  public Map<PartialPath, List<Pair<Long, Boolean>>> 
getStorageGroupPartitionIds() {
-    return storageGroupPartitionIds;
+  public List<PartialPath> getStorageGroups() {
+    return storageGroups;
   }
 
-  public void setStorageGroupPartitionIds(
-      Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupPartitionIds) {
-    this.storageGroupPartitionIds = storageGroupPartitionIds;
+  public void setStorageGroups(List<PartialPath> storageGroups) {
+    this.storageGroups = storageGroups;
   }
 
   public Boolean isSeq() {
@@ -89,14 +79,10 @@ public class FlushStatement extends Statement implements 
IConfigStatement {
 
   @Override
   public List<PartialPath> getPaths() {
-    if (storageGroupPartitionIds == null) {
+    if (storageGroups == null) {
       return Collections.emptyList();
     }
-    List<PartialPath> partialPaths = new ArrayList<>();
-    for (PartialPath partialPath : storageGroupPartitionIds.keySet()) {
-      partialPaths.add(partialPath);
-    }
-    return partialPaths;
+    return storageGroups;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index d1b6f3faab..0e9a00ecce 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -354,7 +354,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
 
   @Override
   public TSStatus flush(TFlushReq req) throws TException {
-    return StorageEngineV2.getInstance().operatorFlush(req);
+    return StorageEngineV2.getInstance().operateFlush(req);
   }
 
   @Override

Reply via email to