This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dbda78d74f Added database check for flush on local & Optimized the 
UTF-8 param in IT (#17365)
4dbda78d74f is described below

commit 4dbda78d74f5f81d06aa6559bc16719f09cb8a0f
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 27 10:31:33 2026 +0800

    Added database check for flush on local & Optimized the UTF-8 param in IT 
(#17365)
    
    * fix
    
    * bishop
    
    * fix-it
---
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  3 ++-
 .../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java  |  9 +++++++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  3 +--
 .../plan/relational/sql/parser/AstBuilder.java     |  6 ++---
 .../iotdb/db/storageengine/StorageEngine.java      | 31 ++++++++++++++++++----
 7 files changed, 45 insertions(+), 15 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 2ff390a8e83..5a271c1ead2 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -527,7 +527,8 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
               "-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() 
+ "m",
               "-Djdk.nio.maxCachedBufferSize=262144",
               "-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + 
killPoints.toString(),
-              "-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
+              "-Dsun.jnu.encoding=UTF-8",
+              "-Dfile.encoding=UTF-8",
               "-cp",
               server_node_lib_path));
       addStartCmdParams(startCmd);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
index 7e3f7dff2cf..169975450c0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
@@ -188,6 +188,15 @@ public class IoTDBFlushQueryIT {
         sqe.printStackTrace();
         assertTrue(sqe.getMessage().contains(expectedMsg));
       }
+      try {
+        statement.execute(
+            "FLUSH 
root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 on local");
+      } catch (SQLException sqe) {
+        String expectedMsg =
+            "500: Database 
root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 does not 
exist on local";
+        sqe.printStackTrace();
+        assertTrue(sqe.getMessage().contains(expectedMsg));
+      }
     } catch (Exception e) {
       fail(e.getMessage());
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5d6aa8da9f5..4d01f3770c2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -981,11 +981,11 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   @Override
   public TSStatus flush(final TFlushReq req) throws TException {
     if (req.storageGroups != null) {
-      final List<String> noExistSg =
+      final List<String> noExistDB =
           
configManager.getPartitionManager().filterUnExistDatabases(req.storageGroups);
-      if (!noExistSg.isEmpty()) {
+      if (!noExistDB.isEmpty()) {
         final StringBuilder sb = new StringBuilder();
-        noExistSg.forEach(storageGroup -> sb.append(storageGroup).append(","));
+        noExistDB.forEach(database -> sb.append(database).append(","));
         return RpcUtils.getStatus(
             TSStatusCode.DATABASE_NOT_EXIST,
             "Database " + sb.subSequence(0, sb.length() - 1) + " does not 
exist");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3aa7aad7e4f..c2c3a8d16cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -2433,7 +2433,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TSStatus flush(TFlushReq req) throws TException {
     try {
-      storageEngine.operateFlush(req);
+      storageEngine.operateFlush(req, false);
     } catch (Exception e) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d0627351b27..fc28d7a5f32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1264,8 +1264,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       }
     } else {
       try {
-        StorageEngine.getInstance().operateFlush(tFlushReq);
-        tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+        tsStatus = StorageEngine.getInstance().operateFlush(tFlushReq, true);
       } catch (final Exception e) {
         tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 59fefd8d169..70dc79b6adb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -1574,17 +1574,17 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
   @Override
   public Node visitFlushStatement(final 
RelationalSqlParser.FlushStatementContext ctx) {
     final FlushStatement flushStatement = new 
FlushStatement(StatementType.FLUSH);
-    List<String> storageGroups = null;
+    List<String> databases = null;
     if (ctx.booleanValue() != null) {
       
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
     }
     flushStatement.setOnCluster(
         ctx.localOrClusterMode() == null || ctx.localOrClusterMode().LOCAL() 
== null);
     if (ctx.identifier() != null) {
-      storageGroups =
+      databases =
           
getIdentifiers(ctx.identifier()).stream().map(Identifier::getValue).collect(toList());
     }
-    flushStatement.setDatabases(storageGroups);
+    flushStatement.setDatabases(databases);
     return new Flush(flushStatement, null);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index fc2f9a2cacd..2a1c21de040 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -555,6 +556,11 @@ public class StorageEngine implements IService {
     checkResults(tasks, "Failed to sync close processor.");
   }
 
+  public boolean containsDatabase(final String database) {
+    return dataRegionMap.values().stream()
+        .anyMatch(dataRegion -> Objects.equals(database, 
dataRegion.getDatabaseName()));
+  }
+
   public void syncCloseProcessorsInDatabase(String databaseName, boolean 
isSeq) {
     List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -662,22 +668,37 @@ public class StorageEngine implements IService {
     }
   }
 
-  public void operateFlush(TFlushReq req) {
+  public TSStatus operateFlush(final TFlushReq req, final boolean onLocal) {
+    final StorageEngine storageEngine = StorageEngine.getInstance();
     if (req.getRegionIds() != null && !req.getRegionIds().isEmpty()) {
-      
StorageEngine.getInstance().syncCloseProcessorsInRegion(req.getRegionIds());
+      storageEngine.syncCloseProcessorsInRegion(req.getRegionIds());
     } else if (req.storageGroups == null || req.storageGroups.isEmpty()) {
       StorageEngine.getInstance().syncCloseAllProcessor();
       WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
     } else {
+      if (onLocal) {
+        final List<String> noExistDB =
+            req.storageGroups.stream()
+                .filter(database -> !storageEngine.containsDatabase(database))
+                .collect(Collectors.toList());
+        if (!noExistDB.isEmpty()) {
+          final StringBuilder sb = new StringBuilder();
+          noExistDB.forEach(database -> sb.append(database).append(","));
+          return RpcUtils.getStatus(
+              TSStatusCode.DATABASE_NOT_EXIST,
+              "Database " + sb.subSequence(0, sb.length() - 1) + " does not 
exist on local");
+        }
+      }
       for (String databaseName : req.storageGroups) {
         if (req.isSeq == null) {
-          
StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName);
+          storageEngine.syncCloseProcessorsInDatabase(databaseName);
         } else {
-          StorageEngine.getInstance()
-              .syncCloseProcessorsInDatabase(databaseName, 
Boolean.parseBoolean(req.isSeq));
+          storageEngine.syncCloseProcessorsInDatabase(
+              databaseName, Boolean.parseBoolean(req.isSeq));
         }
       }
     }
+    return StatusUtils.OK;
   }
 
   public void clearCache() {

Reply via email to