This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch add_select_locally
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 58d65094a979925ef54b7452662a0cb707ef4531
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Feb 13 18:55:31 2026 +0800

    support select locally
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  18 ++
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |   5 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   6 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   2 +
 .../it/db/it/IoTDBDataConsistencyIT.java           | 307 +++++++++++++++++++++
 .../plan/relational/analyzer/Analysis.java         |  10 +
 .../relational/analyzer/StatementAnalyzer.java     |   2 +
 .../distribute/TableDistributedPlanGenerator.java  |  14 +
 .../plan/relational/sql/ast/QueryBody.java         |   4 +
 .../relational/sql/ast/QuerySpecification.java     |  12 +
 .../plan/relational/sql/parser/AstBuilder.java     |  48 ++--
 .../db/relational/grammar/sql/RelationalSql.g4     |   3 +-
 12 files changed, 408 insertions(+), 23 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..0a1148361f7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -641,6 +641,24 @@ public abstract class AbstractEnv implements BaseEnv {
         getReadConnections(null, dataNodeWrapper, username, password, sqlDialect));
   }
 
+  @Override
+  public Connection getConnection(DataNodeWrapper dataNodeWrapper, String sqlDialect)
+      throws SQLException {
+    return new ClusterTestConnection(
+        getWriteConnectionWithSpecifiedDataNode(
+            dataNodeWrapper,
+            null,
+            SessionConfig.DEFAULT_USER,
+            SessionConfig.DEFAULT_PASSWORD,
+            sqlDialect),
+        getReadConnections(
+            null,
+            dataNodeWrapper,
+            SessionConfig.DEFAULT_USER,
+            SessionConfig.DEFAULT_PASSWORD,
+            sqlDialect));
+  }
+
   @Override
   public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
       final DataNodeWrapper dataNode,
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 96a0fbe27e0..a90912bd9e4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -307,4 +307,9 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   public int getRestServicePort() {
     return restServicePort;
   }
+
+  @Override
+  public String toString() {
+    return "DataNodeWrapper{" + internalAddress + ":" + internalPort + "}";
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 586eff60494..391a62b563e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -192,6 +192,12 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public Connection getConnection(DataNodeWrapper dataNodeWrapper, String sqlDialect)
+      throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
   public void setTestMethodName(String testCaseName) {
     // Do nothing
   }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8b32deb3ea8..ad7e90fe7bd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -160,6 +160,8 @@ public interface BaseEnv {
       DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect)
       throws SQLException;
 
+  Connection getConnection(DataNodeWrapper dataNodeWrapper, String sqlDialect) throws SQLException;
+
   default Connection getConnection(String username, String password) throws SQLException {
     return getConnection(username, password, TREE_SQL_DIALECT);
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDataConsistencyIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDataConsistencyIT.java
new file mode 100644
index 00000000000..3067a7de25c
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDataConsistencyIT.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("UnnecessaryLocalVariable")
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBDataConsistencyIT {
+
+  private static final int numDNs = 3;
+  private static final int numDataReplications = 2;
+  //                                     device   measurement   values
+  private final Map<DataNodeWrapper, Map<String, Map<String, List<Object>>>> dataNodeData = new HashMap<>();
+  //                device      measurement  value  #occurrences
+  private final Map<String, Map<String, Map<Object, Integer>>> dataOccurrences = new HashMap<>();
+  private final boolean verbose = true;
+
+  @BeforeClass
+  public static void setUpClass() {
+    Locale.setDefault(Locale.ENGLISH);
+
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(numDataReplications)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().initClusterEnvironment(1, numDNs);
+  }
+
+  @Before
+  public void setUp() {
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE IF NOT EXISTS test");
+      statement.execute("CREATE TABLE test.t1 (tag1 string tag, s1 int32, s2 int32)");
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      statement.execute("DROP DATABASE IF EXISTS test");
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  private void printCollectedResult() {
+    System.out.println("====================Collected Result=====================");
+    dataNodeData.forEach((dn, data) -> System.out.println(dn + ": " + data));
+    dataOccurrences.forEach((deviceId, measurementMap) -> System.out.println(deviceId + ": " + measurementMap));
+  }
+
+  private void collectDataAndOccurrences(ResultSet resultSet,
+      Map<DataNodeWrapper, Map<String, Map<String, List<Object>>>> dataNodeData,
+      Map<String, Map<String, Map<Object, Integer>>> dataOccurrences,
+      DataNodeWrapper dataNodeWrapper) throws SQLException {
+    ResultSetMetaData metaData = resultSet.getMetaData();
+    int columnCount = metaData.getColumnCount();
+    List<String> tagColumnNames = new ArrayList<>();
+    List<String> fieldColumnNames = new ArrayList<>();
+    for (int i = 1; i <= columnCount; i++) {
+      String columnName = metaData.getColumnName(i);
+      if (columnName.startsWith("tag")) {
+        tagColumnNames.add(columnName);
+      } else if (columnName.startsWith("s")) {
+        fieldColumnNames.add(columnName);
+      }
+    }
+
+    while (resultSet.next()) {
+      long time = resultSet.getLong("time");
+      StringBuilder deviceId = new StringBuilder();
+      for (String tagColumnName : tagColumnNames) {
+        String tag = resultSet.getString(tagColumnName);
+        deviceId.append(tag).append(",");
+      }
+
+      for (String fieldColumnName : fieldColumnNames) {
+        Object val = resultSet.getObject(fieldColumnName);
+        Pair<Long, Object> timeValuePair = new Pair<>(time, val);
+        dataOccurrences.computeIfAbsent(deviceId.toString(), k -> new HashMap<>()).computeIfAbsent(
+            fieldColumnName, k -> new HashMap<>()).merge(timeValuePair, 1, Integer::sum);
+        dataNodeData.computeIfAbsent(dataNodeWrapper, dn -> new HashMap<>()).computeIfAbsent(
+            deviceId.toString(), k -> new HashMap<>()).computeIfAbsent(fieldColumnName, k -> new ArrayList<>()).add(timeValuePair);
+      }
+    }
+  }
+
+  private void queryAndCollect(Map<DataNodeWrapper, Map<String, Map<String, List<Object>>>> dataNodeData,
+      Map<String, Map<String, Map<Object, Integer>>> dataOccurrences,
+      BaseEnv env) throws SQLException {
+    dataNodeData.clear();
+    dataOccurrences.clear();
+    List<DataNodeWrapper> dataNodeWrapperList = env.getDataNodeWrapperList();
+    for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
+      try (Connection localConnection =
+          env.getConnection(dataNodeWrapper, BaseEnv.TABLE_SQL_DIALECT);
+          Statement localStatement = localConnection.createStatement()) {
+        ResultSet resultSet =
+            localStatement.executeQuery("SELECT LOCALLY * FROM test.t1");
+
+        collectDataAndOccurrences(resultSet, dataNodeData, dataOccurrences, dataNodeWrapper);
+      }
+    }
+    if (verbose) {
+      printCollectedResult();
+    }
+  }
+
+  private void checkConsistency(Map<String, Map<String, Map<Object, Integer>>> dataOccurrences, boolean expectEmpty) {
+    if (!expectEmpty) {
+      assertFalse(dataOccurrences.isEmpty());
+    } else {
+      assertTrue(dataOccurrences.isEmpty());
+    }
+    dataOccurrences.values().forEach(measurementMap ->
+        measurementMap.values().forEach(valueMap -> valueMap.values().forEach(
+            count -> assertEquals(numDataReplications, count.intValue())
+        )));
+  }
+
+  private void prepareData(Statement statement, long numTimestamp, int numDevices)
+      throws SQLException {
+    for (int d = 0; d < numDevices; d++) {
+      for (long t = 0; t < numTimestamp; t++) {
+        statement.execute(String.format("INSERT INTO test.t1 (time, tag1, s1, s2) VALUES(%s, 'a%s', %s, %s)", t, d, t, t + 100));
+      }
+    }
+  }
+
+  @Test
+  public void testBasicConsistency() {
+    BaseEnv env = EnvFactory.getEnv();
+
+    try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      long numTimestamp = 3;
+      int numDevices = 3;
+      prepareData(statement, numTimestamp, numDevices);
+
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+
+      statement.execute("FLUSH");
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+    } catch (Exception e) {
+      printCollectedResult();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsistencyAfterDelete() {
+    BaseEnv env = EnvFactory.getEnv();
+
+    try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      long numTimestamp = 3;
+      int numDevices = 3;
+      prepareData(statement, numTimestamp, numDevices);
+
+      statement.execute("DELETE FROM test.t1 WHERE time < 1");
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+
+      statement.execute("DELETE FROM test.t1 WHERE tag1='a1'");
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+
+      statement.execute("DELETE FROM test.t1");
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, true);
+              });
+    } catch (Exception e) {
+      printCollectedResult();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsistencyAfterRestart() throws SQLException {
+    BaseEnv env = EnvFactory.getEnv();
+
+    try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+        Statement statement = connection.createStatement()) {
+      long numTimestamp = 3;
+      int numDevices = 3;
+      prepareData(statement, numTimestamp, numDevices);
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+      statement.execute("FLUSH");
+    }
+
+    TestUtils.restartCluster(env);
+    try {
+      Awaitility.await()
+          .atMost(20, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                queryAndCollect(dataNodeData, dataOccurrences, env);
+                checkConsistency(dataOccurrences, false);
+              });
+    } catch (Exception e) {
+      printCollectedResult();
+      throw e;
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 4a0fe9daa57..d0bca8f2de4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -263,6 +263,8 @@ public class Analysis implements IAnalysis {
   // independently to utilize predicate pushdown optimization.
   private SqlParser sqlParser;
 
+  private boolean isLocalQuery = false;
+
   public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters) {
     this.root = root;
     this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
@@ -1574,4 +1576,12 @@ public class Analysis implements IAnalysis {
       return columns;
     }
   }
+
+  public boolean isLocalQuery() {
+    return isLocalQuery;
+  }
+
+  public void setLocalQuery(boolean localQuery) {
+    isLocalQuery = localQuery;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index bc6d54c37e7..1be3516b255 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -860,6 +860,8 @@ public class StatementAnalyzer {
     @Override
     protected Scope visitQuery(Query node, Optional<Scope> context) {
       analysis.setQuery(true);
+      analysis.setLocalQuery(node.getQueryBody().isLocalQuery());
+
       Scope withScope = analyzeWith(node, context);
       hasFillInParentScope = node.getFill().isPresent() || hasFillInParentScope;
       Scope queryBodyScope = process(node.getQueryBody(), withScope);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f..b73c022cbcd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -810,6 +811,19 @@ public class TableDistributedPlanGenerator
         context.deviceCrossRegion = true;
       }
       for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        if (analysis.isLocalQuery()) {
+          // only query this node in local query mode
+          int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+          boolean containsThisNode =
+              regionReplicaSet.dataNodeLocations.stream()
+                  .anyMatch(dn -> dn.getDataNodeId() == dataNodeId);
+          if (!containsThisNode) {
+            continue;
+          } else {
+            regionReplicaSet.dataNodeLocations.removeIf(dn -> dn.getDataNodeId() != dataNodeId);
+          }
+        }
+
         final DeviceTableScanNode deviceTableScanNode =
             tableScanNodeMap.computeIfAbsent(
                 regionReplicaSet,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QueryBody.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QueryBody.java
index 35fefdfbf32..55162084db4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QueryBody.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QueryBody.java
@@ -31,4 +31,8 @@ public abstract class QueryBody extends Relation {
   public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
     return visitor.visitQueryBody(this, context);
   }
+
+  public boolean isLocalQuery() {
+    return false;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java
index 5de61a2dde2..89b3c28c7e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java
@@ -45,6 +45,8 @@ public class QuerySpecification extends QueryBody {
   private final Optional<Offset> offset;
   private final Optional<Node> limit;
 
+  private boolean isLocalQuery = false;
+
   public QuerySpecification(
       Select select,
       Optional<Relation> from,
@@ -207,4 +209,14 @@ public class QuerySpecification extends QueryBody {
     size += AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(limit.orElse(null));
     return size;
   }
+
+  public QuerySpecification setLocalQuery(boolean localQuery) {
+    isLocalQuery = localQuery;
+    return this;
+  }
+
+  @Override
+  public boolean isLocalQuery() {
+    return isLocalQuery;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index f46b356423d..7c8cbeb3852 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -2226,17 +2226,18 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
           getLocation(ctx),
           Optional.empty(),
           new QuerySpecification(
-              getLocation(ctx),
-              query.getSelect(),
-              query.getFrom(),
-              query.getWhere(),
-              query.getGroupBy(),
-              query.getHaving(),
-              fill,
-              query.getWindows(),
-              orderBy,
-              offset,
-              limit),
+                  getLocation(ctx),
+                  query.getSelect(),
+                  query.getFrom(),
+                  query.getWhere(),
+                  query.getGroupBy(),
+                  query.getHaving(),
+                  fill,
+                  query.getWindows(),
+                  orderBy,
+                  offset,
+                  limit)
+              .setLocalQuery(term.isLocalQuery()),
           Optional.empty(),
           Optional.empty(),
           Optional.empty(),
@@ -2352,18 +2353,21 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       from = Optional.of(relation);
     }
 
+    boolean isLocalQuery = ctx.LOCALLY() != null;
+
     return new QuerySpecification(
-        getLocation(ctx),
-        new Select(getLocation(ctx.SELECT()), isDistinct(ctx.setQuantifier()), selectItems),
-        from,
-        visitIfPresent(ctx.where, Expression.class),
-        visitIfPresent(ctx.groupBy(), GroupBy.class),
-        visitIfPresent(ctx.having, Expression.class),
-        Optional.empty(),
-        visit(ctx.windowDefinition(), WindowDefinition.class),
-        Optional.empty(),
-        Optional.empty(),
-        Optional.empty());
+            getLocation(ctx),
+            new Select(getLocation(ctx.SELECT()), isDistinct(ctx.setQuantifier()), selectItems),
+            from,
+            visitIfPresent(ctx.where, Expression.class),
+            visitIfPresent(ctx.groupBy(), GroupBy.class),
+            visitIfPresent(ctx.having, Expression.class),
+            Optional.empty(),
+            visit(ctx.windowDefinition(), WindowDefinition.class),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty())
+        .setLocalQuery(isLocalQuery);
   }
 
   @Override
diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 08a8b4c2e82..71164c903a0 100644
--- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -1004,7 +1004,7 @@ sortItem
     ;
 
 querySpecification
-    : SELECT setQuantifier? selectItem (',' selectItem)*
+    : SELECT LOCALLY? setQuantifier? selectItem (',' selectItem)*
       (FROM relation (',' relation)*)?
       (WHERE where=booleanExpression)?
       (GROUP BY groupBy)?
@@ -1669,6 +1669,7 @@ LISTAGG: 'LISTAGG';
 LOAD: 'LOAD';
 LOADED: 'LOADED';
 LOCAL: 'LOCAL';
+LOCALLY: 'LOCALLY';
 LOCALTIME: 'LOCALTIME';
 LOCALTIMESTAMP: 'LOCALTIMESTAMP';
 LOGICAL: 'LOGICAL';

Reply via email to