This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ce0e776dd0587aba07c1411db1733c8b52d14a84 Author: Jackie Tien <[email protected]> AuthorDate: Mon Dec 8 17:34:47 2025 +0800 Implement PreparedStmt on the Server side (#16764) (#16880) (cherry picked from commit 7436c88304e71a21f961f2de3c2fd85285c13050) --- .../iotdb/itbase/runtime/ClusterTestStatement.java | 7 + .../it/db/it/IoTDBPreparedStatementIT.java | 385 +++++++++++++++++++++ .../iotdb/db/protocol/session/ClientSession.java | 23 ++ .../iotdb/db/protocol/session/IClientSession.java | 31 ++ .../db/protocol/session/InternalClientSession.java | 24 ++ .../db/protocol/session/MqttClientSession.java | 24 ++ .../db/protocol/session/PreparedStatementInfo.java | 99 ++++++ .../db/protocol/session/RestClientSession.java | 25 ++ .../iotdb/db/protocol/session/SessionManager.java | 41 ++- .../thrift/handler/BaseServerContextHandler.java | 1 + .../protocol/thrift/impl/ClientRPCServiceImpl.java | 1 + .../iotdb/db/queryengine/plan/Coordinator.java | 89 ++++- .../execution/config/TableConfigTaskVisitor.java | 16 + .../execution/config/session/DeallocateTask.java | 72 ++++ .../plan/execution/config/session/PrepareTask.java | 85 +++++ .../session/PreparedStatementMemoryManager.java | 157 +++++++++ .../relational/analyzer/StatementAnalyzer.java | 76 ++-- .../plan/relational/planner/TableModelPlanner.java | 16 +- .../plan/relational/sql/AstMemoryEstimator.java | 67 ++++ .../plan/relational/sql/ParameterExtractor.java | 121 +++++++ .../plan/relational/sql/ast/AstVisitor.java | 16 + .../plan/relational/sql/ast/Deallocate.java | 79 +++++ .../plan/relational/sql/ast/Execute.java | 96 +++++ .../plan/relational/sql/ast/ExecuteImmediate.java | 99 ++++++ .../plan/relational/sql/ast/Prepare.java | 87 +++++ .../plan/relational/sql/parser/AstBuilder.java | 38 ++ .../db/relational/grammar/sql/RelationalSql.g4 | 23 ++ .../thrift-datanode/src/main/thrift/client.thrift | 1 + 28 files changed, 1747 insertions(+), 52 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java index 3f96fdf1372..0523a384828 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java @@ -78,6 +78,13 @@ public class ClusterTestStatement implements Statement { statement.setQueryTimeout(timeout); } + /** + * Executes a SQL query on all read statements in parallel. + * + * <p>Note: For PreparedStatement EXECUTE queries, use the write connection directly instead, + * because PreparedStatements are session-scoped and this method may route queries to different + * nodes where the PreparedStatement doesn't exist. + */ @Override public ResultSet executeQuery(String sql) throws SQLException { return new ClusterTestResultSet(readStatements, readEndpoints, sql, queryTimeout); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java new file mode 100644 index 00000000000..f06d46201af --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.runtime.ClusterTestConnection; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBPreparedStatementIT { + private static final String DATABASE_NAME = "test"; + private static final String[] sqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE test_table(id INT64 FIELD, name STRING FIELD, value DOUBLE FIELD)", + "INSERT INTO test_table VALUES (2025-01-01T00:00:00, 1, 'Alice', 100.5)", + "INSERT INTO test_table VALUES (2025-01-01T00:01:00, 2, 'Bob', 200.3)", + "INSERT INTO test_table VALUES (2025-01-01T00:02:00, 3, 'Charlie', 300.7)", + "INSERT INTO test_table VALUES (2025-01-01T00:03:00, 4, 'David', 400.2)", + "INSERT INTO test_table VALUES (2025-01-01T00:04:00, 5, 'Eve', 500.9)", + }; + + protected static void insertData() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.execute(sql); + } + } catch (Exception e) { + fail("insertData failed: " + e.getMessage()); + } + } + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + /** + * Execute a prepared statement query and verify the result. For PreparedStatement EXECUTE + * queries, use the write connection directly instead of tableResultSetEqualTest, because + * PreparedStatements are session-scoped and tableResultSetEqualTest may route queries to + * different nodes where the PreparedStatement doesn't exist. + */ + private static void executePreparedStatementAndVerify( + Connection connection, + Statement statement, + String executeSql, + String[] expectedHeader, + String[] expectedRetArray) + throws SQLException { + // Execute with parameters using write connection directly + // In cluster test, we need to use write connection to ensure same session + if (connection instanceof ClusterTestConnection) { + // Use write connection directly for PreparedStatement queries + try (Statement writeStatement = + ((ClusterTestConnection) connection) + .writeConnection + .getUnderlyingConnection() + .createStatement(); + ResultSet resultSet = writeStatement.executeQuery(executeSql)) { + ResultSetMetaData metaData = resultSet.getMetaData(); + + // Verify header + assertEquals(expectedHeader.length, metaData.getColumnCount()); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); + } + + // Verify data + int cnt = 0; + while (resultSet.next()) { + StringBuilder builder = new StringBuilder(); + for (int i = 1; i <= expectedHeader.length; i++) { + builder.append(resultSet.getString(i)).append(","); + } + assertEquals(expectedRetArray[cnt], builder.toString()); + cnt++; + } + assertEquals(expectedRetArray.length, cnt); + } + } else { + try (ResultSet resultSet = statement.executeQuery(executeSql)) { + ResultSetMetaData metaData = resultSet.getMetaData(); + + // Verify header + assertEquals(expectedHeader.length, metaData.getColumnCount()); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); + } + + // Verify data + int cnt = 0; + while (resultSet.next()) { + StringBuilder builder = new StringBuilder(); + for (int i = 1; i <= expectedHeader.length; i++) { + builder.append(resultSet.getString(i)).append(","); + } + assertEquals(expectedRetArray[cnt], builder.toString()); + cnt++; + } + assertEquals(expectedRetArray.length, cnt); + } + } + } + + @Test + public void testPrepareAndExecute() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:01:00.000Z,2,Bob,200.3,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt1 FROM SELECT * FROM test_table WHERE id = ?"); + // Execute with parameter using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt1 USING 2", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt1"); + } catch (SQLException e) { + fail("testPrepareAndExecute failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteMultipleTimes() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray1 = new String[] {"2025-01-01T00:00:00.000Z,1,Alice,100.5,"}; + String[] retArray2 = new String[] {"2025-01-01T00:02:00.000Z,3,Charlie,300.7,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt2 FROM SELECT * FROM test_table WHERE id = ?"); + // Execute multiple times with different parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt2 USING 1", expectedHeader, retArray1); + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt2 USING 3", expectedHeader, retArray2); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt2"); + } catch (SQLException e) { + fail("testPrepareAndExecuteMultipleTimes failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareWithMultipleParameters() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:01:00.000Z,2,Bob,200.3,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with multiple parameters + statement.execute("PREPARE stmt3 FROM SELECT * FROM test_table WHERE id = ? AND value > ?"); + // Execute with multiple parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt3 USING 2, 150.0", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt3"); + } catch (SQLException e) { + fail("testPrepareWithMultipleParameters failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediate() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:03:00.000Z,4,David,400.2,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate with SQL string and parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT * FROM test_table WHERE id = ?' USING 4", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediate failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediateWithoutParameters() { + String[] expectedHeader = new String[] {"_col0"}; + String[] retArray = new String[] {"5,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate without parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM test_table'", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediateWithoutParameters failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediateWithMultipleParameters() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:04:00.000Z,5,Eve,500.9,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate with multiple parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT * FROM test_table WHERE id = ? AND value > ?' USING 5, 450.0", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediateWithMultipleParameters failed: " + e.getMessage()); + } + } + + @Test + public void testDeallocateNonExistentStatement() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Try to deallocate a non-existent statement + SQLException exception = + assertThrows( + SQLException.class, () -> statement.execute("DEALLOCATE PREPARE non_existent_stmt")); + assertTrue( + exception.getMessage().contains("does not exist") + || exception.getMessage().contains("Prepared statement")); + } catch (SQLException e) { + fail("testDeallocateNonExistentStatement failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteNonExistentStatement() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Try to execute a non-existent statement + SQLException exception = + assertThrows( + SQLException.class, () -> statement.execute("EXECUTE non_existent_stmt USING 1")); + assertTrue( + exception.getMessage().contains("does not exist") + || exception.getMessage().contains("Prepared statement")); + } catch (SQLException e) { + fail("testExecuteNonExistentStatement failed: " + e.getMessage()); + } + } + + @Test + public void testMultiplePreparedStatements() { + String[] expectedHeader1 = new String[] {"time", "id", "name", "value"}; + String[] retArray1 = new String[] {"2025-01-01T00:00:00.000Z,1,Alice,100.5,"}; + String[] expectedHeader2 = new String[] {"_col0"}; + String[] retArray2 = new String[] {"4,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare multiple statements + statement.execute("PREPARE stmt4 FROM SELECT * FROM test_table WHERE id = ?"); + statement.execute("PREPARE stmt5 FROM SELECT COUNT(*) FROM test_table WHERE value > ?"); + // Execute both statements using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt4 USING 1", expectedHeader1, retArray1); + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt5 USING 200.0", expectedHeader2, retArray2); + // Deallocate both + statement.execute("DEALLOCATE PREPARE stmt4"); + statement.execute("DEALLOCATE PREPARE stmt5"); + } catch (SQLException e) { + fail("testMultiplePreparedStatements failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareDuplicateName() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt6 FROM SELECT * FROM test_table WHERE id = ?"); + // Try to prepare another statement with the same name + SQLException exception = + assertThrows( + SQLException.class, + () -> statement.execute("PREPARE stmt6 FROM SELECT * FROM test_table WHERE id = ?")); + assertTrue( + exception.getMessage().contains("already exists") + || exception.getMessage().contains("Prepared statement")); + // Cleanup + statement.execute("DEALLOCATE PREPARE stmt6"); + } catch (SQLException e) { + fail("testPrepareDuplicateName failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteWithAggregation() { + String[] expectedHeader = new String[] {"_col0"}; + String[] retArray = new String[] {"300.40000000000003,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with aggregation + statement.execute( + "PREPARE stmt7 FROM SELECT AVG(value) FROM test_table WHERE id >= ? AND id <= ?"); + // Execute with parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt7 USING 2, 4", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt7"); + } catch (SQLException e) { + fail("testPrepareAndExecuteWithAggregation failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteWithStringParameter() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:02:00.000Z,3,Charlie,300.7,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with string parameter + statement.execute("PREPARE stmt8 FROM SELECT * FROM test_table WHERE name = ?"); + // Execute with string parameter using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt8 USING 'Charlie'", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt8"); + } catch (SQLException e) { + fail("testPrepareAndExecuteWithStringParameter failed: " + e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java index 6aa862b9242..ea90fbafecc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java @@ -33,6 +33,9 @@ public class ClientSession extends IClientSession { private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>(); + // Map from statement name to PreparedStatementInfo + private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>(); + public ClientSession(Socket clientSocket) { this.clientSocket = clientSocket; } @@ -103,4 +106,24 @@ public class ClientSession extends IClientSession { } } } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set<String> getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index 351806de099..97585673e82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@ -188,6 +188,37 @@ public abstract class IClientSession { this.databaseName = databaseName; } + /** + * Add a prepared statement to this session. + * + * @param statementName the name of the prepared statement + * @param info the prepared statement information + */ + public abstract void addPreparedStatement(String statementName, PreparedStatementInfo info); + + /** + * Remove a prepared statement from this session. + * + * @param statementName the name of the prepared statement + * @return the removed prepared statement info, or null if not found + */ + public abstract PreparedStatementInfo removePreparedStatement(String statementName); + + /** + * Get a prepared statement from this session. + * + * @param statementName the name of the prepared statement + * @return the prepared statement info, or null if not found + */ + public abstract PreparedStatementInfo getPreparedStatement(String statementName); + + /** + * Get all prepared statement names in this session. + * + * @return set of prepared statement names + */ + public abstract Set<String> getPreparedStatementNames(); + public long getLastActiveTime() { return lastActiveTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java index 3c72d083a8c..ed87d0b0ee3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java @@ -88,4 +88,28 @@ public class InternalClientSession extends IClientSession { public void removeQueryId(Long statementId, Long queryId) { ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + throw new UnsupportedOperationException( + "InternalClientSession should never call PREPARE statement methods."); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + throw new UnsupportedOperationException( + "InternalClientSession should never call PREPARE statement methods."); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + throw new UnsupportedOperationException( + "InternalClientSession should never call PREPARE statement methods."); + } + + @Override + public Set<String> getPreparedStatementNames() { + throw new UnsupportedOperationException( + "InternalClientSession should never call PREPARE statement methods."); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java index ae9e2cd0361..c0b68e885a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java @@ -76,4 +76,28 @@ public class MqttClientSession extends IClientSession { public void removeQueryId(Long statementId, Long queryId) { throw new UnsupportedOperationException(); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + throw new UnsupportedOperationException( + "MQTT client session does not support PREPARE statement."); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + throw new UnsupportedOperationException( + "MQTT client session does not support PREPARE statement."); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + throw new UnsupportedOperationException( + "MQTT client session does not support PREPARE statement."); + } + + @Override + public Set<String> getPreparedStatementNames() { + throw new UnsupportedOperationException( + "MQTT client session does not support PREPARE statement."); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java new file mode 100644 index 00000000000..0bfc750c4ba --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.session; + +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * Information about a prepared statement stored in a session. The AST is cached here to avoid + * reparsing on EXECUTE. + */ +public class PreparedStatementInfo { + + private final String statementName; + private final Statement sql; // Cached AST (contains Parameter nodes) + private final long createTime; + private final long memorySizeInBytes; // Memory size allocated for this PreparedStatement + + public PreparedStatementInfo(String statementName, Statement sql, long memorySizeInBytes) { + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + this.createTime = System.currentTimeMillis(); + this.memorySizeInBytes = memorySizeInBytes; + } + + public PreparedStatementInfo( + String statementName, Statement sql, long createTime, long memorySizeInBytes) { + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + this.createTime = createTime; + this.memorySizeInBytes = memorySizeInBytes; + } + + public String getStatementName() { + return statementName; + } + + public Statement getSql() { + return sql; + } + + public long getCreateTime() { + return createTime; + } + + public long getMemorySizeInBytes() { + return memorySizeInBytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PreparedStatementInfo that = (PreparedStatementInfo) o; + return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, sql); + } + + @Override + public String toString() { + return "PreparedStatementInfo{" + + "statementName='" + + statementName + + '\'' + + ", sql=" + + sql + + ", createTime=" + + createTime + + '}'; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java index fa830ace3fb..d122c3c7dc5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java @@ -22,12 +22,17 @@ package org.apache.iotdb.db.protocol.session; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; import java.util.Collections; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class RestClientSession extends IClientSession { private final String clientID; + // Map from statement name to PreparedStatementInfo + private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>(); + public RestClientSession(String clientID) { this.clientID = clientID; } @@ -76,4 +81,24 @@ public class RestClientSession extends IClientSession { public void removeQueryId(Long statementId, Long queryId) { throw new UnsupportedOperationException(); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set<String> getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index a4a28efa5ec..e5e95d4f82c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp; import org.apache.iotdb.db.protocol.thrift.OperationType; import org.apache.iotdb.db.queryengine.common.ConnectionInfo; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.PreparedStatementMemoryManager; import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager; import org.apache.iotdb.db.utils.DataNodeAuthUtils; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -276,6 +277,7 @@ public class SessionManager implements SessionManagerMBean { } private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) { + // Release query resources Iterable<Long> statementIds = session.getStatementIds(); if (statementIds != null) { for (Long statementId : statementIds) { @@ -287,6 +289,17 @@ public class SessionManager implements SessionManagerMBean { } } } + + // Release PreparedStatement memory resources + try { + PreparedStatementMemoryManager.getInstance().releaseAllForSession(session); + } catch (Exception e) { + LOGGER.warn( + "Failed to release PreparedStatement resources for session {}: {}", + session, + e.getMessage(), + e); + } } public TSStatus closeOperation( @@ -295,6 +308,7 @@ public class SessionManager implements SessionManagerMBean { long statementId, boolean haveStatementId, boolean haveSetQueryId, + String preparedStatementName, LongConsumer releaseByQueryId) { if (!checkLogin(session)) { return RpcUtils.getStatus( @@ -307,7 +321,7 @@ public class SessionManager implements SessionManagerMBean { if (haveSetQueryId) { this.closeDataset(session, statementId, queryId, releaseByQueryId); } else { - this.closeStatement(session, statementId, releaseByQueryId); + this.closeStatement(session, statementId, preparedStatementName, releaseByQueryId); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } else { @@ -342,14 +356,35 @@ public class SessionManager implements SessionManagerMBean { } public void closeStatement( - IClientSession session, long statementId, LongConsumer releaseByQueryId) { + IClientSession session, + long statementId, + String preparedStatementName, + LongConsumer releaseByQueryId) { Set<Long> queryIdSet = session.removeStatementId(statementId); if (queryIdSet != null) { for (Long queryId : queryIdSet) { releaseByQueryId.accept(queryId); } } - session.removeStatementId(statementId); + + // If preparedStatementName is provided, release the prepared statement resources + if (preparedStatementName != null && !preparedStatementName.isEmpty()) { + try { + PreparedStatementInfo removedInfo = session.removePreparedStatement(preparedStatementName); + if (removedInfo != null) { + // Release the memory allocated for this PreparedStatement + PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemorySizeInBytes()); + } + } catch (Exception e) { + LOGGER.warn( + "Failed to release PreparedStatement '{}' resources when closing statement {} for session {}: {}", + preparedStatementName, + statementId, + session, + e.getMessage(), + e); + } + } } public long requestQueryId(IClientSession session, Long statementId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java index e633caa45f6..9b7efd82780 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java @@ -71,6 +71,7 @@ public class BaseServerContextHandler { public void deleteContext(ServerContext context, TProtocol in, TProtocol out) { getSessionManager().removeCurrSession(); + if (context != null && factory != null) { ((JudgableServerContext) context).whenDisconnect(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index ea7646efb2b..ad16535bf57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1445,6 +1445,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { req.statementId, req.isSetStatementId(), req.isSetQueryId(), + req.isSetPreparedStatementName() ? req.getPreparedStatementName() : null, COORDINATOR::cleanupQueryExecution); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index f10600cbda9..7708f6c18cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -29,10 +29,14 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -49,6 +53,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisi import org.apache.iotdb.db.queryengine.plan.execution.config.TreeConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.TableModelPlanner; @@ -56,6 +61,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Dat import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ParameterExtractor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; @@ -64,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTraining; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; @@ -71,13 +78,19 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Execute; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExecuteImmediate; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExtendRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.KillQuery; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadConfiguration; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; @@ -132,7 +145,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -184,6 +199,8 @@ public class Coordinator { private static final Coordinator INSTANCE = new Coordinator(); + private static final IMemoryBlock coordinatorMemoryBlock; + private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap; private final StatementRewrite statementRewrite; @@ -192,6 +209,20 @@ public class Coordinator { private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier; private final TypeManager typeManager; + static { + coordinatorMemoryBlock = + IoTDBDescriptor.getInstance() + .getMemoryConfig() + .getCoordinatorMemoryManager() + .exactAllocate("Coordinator", MemoryBlockType.DYNAMIC); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Initialized shared MemoryBlock 'Coordinator' with all available memory: {} bytes", + coordinatorMemoryBlock.getTotalMemorySizeInBytes()); + } + } + private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.typeManager = new InternalTypeManager(); @@ -401,6 +432,8 @@ public class Coordinator { distributionPlanOptimizers, AuthorityChecker.getAccessControl(), dataNodeLocationSupplier, + Collections.emptyList(), + Collections.emptyMap(), typeManager); return new QueryExecution(tableModelPlanner, queryContext, executor); } @@ -475,7 +508,9 @@ public class Coordinator { || statement instanceof LoadModel || statement instanceof UnloadModel || statement instanceof ShowLoadedModels - || statement instanceof RemoveRegion) { + || statement instanceof RemoveRegion + || statement instanceof Prepare + || statement instanceof Deallocate) { return new ConfigExecution( queryContext, null, @@ -485,12 +520,54 @@ public class Coordinator { clientSession, metadata, AuthorityChecker.getAccessControl(), typeManager), queryContext)); } + // Initialize variables for TableModelPlanner + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statementToUse = statement; + List<Expression> parameters = Collections.emptyList(); + Map<NodeRef<Parameter>, Expression> parameterLookup = Collections.emptyMap(); + + if (statement instanceof Execute) { + Execute executeStatement = (Execute) statement; + String statementName = executeStatement.getStatementName().getValue(); + + // Get prepared statement from session (contains cached AST) + PreparedStatementInfo preparedInfo = clientSession.getPreparedStatement(statementName); + if (preparedInfo == null) { + throw new SemanticException( + String.format("Prepared statement '%s' does not exist", statementName)); + } + + // Use cached AST + statementToUse = preparedInfo.getSql(); + + // Bind parameters: create parameterLookup map + // Note: bindParameters() internally validates parameter count + parameterLookup = + ParameterExtractor.bindParameters(statementToUse, executeStatement.getParameters()); + parameters = new ArrayList<>(executeStatement.getParameters()); + + } else if (statement instanceof ExecuteImmediate) { + ExecuteImmediate executeImmediateStatement = (ExecuteImmediate) statement; + + // EXECUTE IMMEDIATE needs to parse SQL first + String sql = executeImmediateStatement.getSqlString(); + List<Literal> literalParameters = executeImmediateStatement.getParameters(); + + statementToUse = sqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (!literalParameters.isEmpty()) { + parameterLookup = ParameterExtractor.bindParameters(statementToUse, literalParameters); + parameters = new ArrayList<>(literalParameters); + } + } + if (statement instanceof WrappedInsertStatement) { ((WrappedInsertStatement) statement).setContext(queryContext); } - final TableModelPlanner tableModelPlanner = + + // Create QueryExecution with TableModelPlanner + TableModelPlanner tableModelPlanner = new TableModelPlanner( - statement, + statementToUse, sqlParser, metadata, scheduledExecutor, @@ -501,6 +578,8 @@ public class Coordinator { distributionPlanOptimizers, AuthorityChecker.getAccessControl(), dataNodeLocationSupplier, + parameters, + parameterLookup, typeManager); return new QueryExecution(tableModelPlanner, queryContext, executor); } @@ -609,6 +688,10 @@ public class Coordinator { return INSTANCE; } + public static IMemoryBlock getCoordinatorMemoryBlock() { + return coordinatorMemoryBlock; + } + public void recordExecutionTime(long queryId, long executionTime) { IQueryExecution queryExecution = getQueryExecution(queryId); if (queryExecution != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 3356ed2b584..f9d668944e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -101,6 +101,8 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesDetailsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.DeallocateTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.PrepareTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.SetSqlDialectTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentDatabaseTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentSqlDialectTask; @@ -150,6 +152,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTraining; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateView; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DatabaseStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; @@ -171,6 +174,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; @@ -1375,6 +1379,18 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont return new SetSqlDialectTask(node.getSqlDialect()); } + @Override + protected IConfigTask visitPrepare(Prepare node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new PrepareTask(node.getStatementName().getValue(), node.getSql()); + } + + @Override + protected IConfigTask visitDeallocate(Deallocate node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new DeallocateTask(node.getStatementName().getValue()); + } + @Override protected IConfigTask visitShowCurrentDatabase( ShowCurrentDatabase node, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java new file mode 100644 index 00000000000..6f5f3f48461 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Task for executing DEALLOCATE PREPARE statement. Removes the prepared statement from the session + * and releases its allocated memory. + */ +public class DeallocateTask implements IConfigTask { + + private final String statementName; + + public DeallocateTask(String statementName) { + this.statementName = statementName; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + IClientSession session = SessionManager.getInstance().getCurrSession(); + if (session == null) { + future.setException( + new IllegalStateException("No current session available for DEALLOCATE statement")); + return future; + } + + // Remove the prepared statement + PreparedStatementInfo removedInfo = session.removePreparedStatement(statementName); + if (removedInfo == null) { + future.setException( + new SemanticException( + String.format("Prepared statement '%s' does not exist", statementName))); + return future; + } + + // Release the memory allocated for this PreparedStatement from the shared MemoryBlock + PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemorySizeInBytes()); + + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + return future; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java new file mode 100644 index 00000000000..bf61e702c72 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.AstMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Task for executing PREPARE statement. Stores the prepared statement AST in the session. The AST + * is cached to avoid reparsing on EXECUTE (skipping Parser phase). Memory is allocated from + * CoordinatorMemoryManager and shared across all sessions. + */ +public class PrepareTask implements IConfigTask { + + private final String statementName; + private final Statement sql; // AST containing Parameter nodes + + public PrepareTask(String statementName, Statement sql) { + this.statementName = statementName; + this.sql = sql; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + SettableFuture<ConfigTaskResult> future = SettableFuture.create(); + IClientSession session = SessionManager.getInstance().getCurrSession(); + if (session == null) { + future.setException( + new IllegalStateException("No current session available for PREPARE statement")); + return future; + } + + // Check if prepared statement with the same name already exists + PreparedStatementInfo existingInfo = session.getPreparedStatement(statementName); + if (existingInfo != null) { + future.setException( + new SemanticException( + String.format("Prepared statement '%s' already exists.", statementName))); + return future; + } + + // Estimate memory size of the AST + long memorySizeInBytes = AstMemoryEstimator.estimateMemorySize(sql); + + // Allocate memory from CoordinatorMemoryManager + // This memory is shared across all sessions using a single MemoryBlock + PreparedStatementMemoryManager.getInstance().allocate(statementName, memorySizeInBytes); + + // Create and store the prepared statement info (AST is cached) + PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memorySizeInBytes); + session.addPreparedStatement(statementName, info); + + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + return future; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java new file mode 100644 index 00000000000..9d5a3fb098e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.queryengine.plan.Coordinator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +/** + * Memory manager for PreparedStatement. All PreparedStatements from all sessions share a single + * MemoryBlock named "Coordinator" allocated from CoordinatorMemoryManager. The MemoryBlock is + * initialized in Coordinator with all available memory. + */ +public class PreparedStatementMemoryManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(PreparedStatementMemoryManager.class); + + private static final PreparedStatementMemoryManager INSTANCE = + new PreparedStatementMemoryManager(); + + private static final String SHARED_MEMORY_BLOCK_NAME = "Coordinator"; + + private PreparedStatementMemoryManager() { + // singleton + } + + public static PreparedStatementMemoryManager getInstance() { + return INSTANCE; + } + + private IMemoryBlock getSharedMemoryBlock() { + return Coordinator.getCoordinatorMemoryBlock(); + } + + /** + * Allocate memory for a PreparedStatement. + * + * @param statementName the name of the prepared statement + * @param memorySizeInBytes the memory size in bytes to allocate + * @throws SemanticException if memory allocation fails + */ + public void allocate(String statementName, long memorySizeInBytes) { + IMemoryBlock sharedMemoryBlock = getSharedMemoryBlock(); + // Allocate memory from the shared block + boolean allocated = sharedMemoryBlock.allocate(memorySizeInBytes); + if (!allocated) { + LOGGER.warn( + "Failed to allocate {} bytes from shared MemoryBlock '{}' for PreparedStatement '{}'", + memorySizeInBytes, + SHARED_MEMORY_BLOCK_NAME, + statementName); + throw new SemanticException( + String.format( + "Insufficient memory for PreparedStatement '%s'. " + + "Please deallocate some PreparedStatements and try again.", + statementName)); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Allocated {} bytes for PreparedStatement '{}' from shared MemoryBlock '{}'. ", + memorySizeInBytes, + statementName, + SHARED_MEMORY_BLOCK_NAME); + } + } + + /** + * Release memory for a PreparedStatement. + * + * @param memorySizeInBytes the memory size in bytes to release + */ + public void release(long memorySizeInBytes) { + if (memorySizeInBytes <= 0) { + return; + } + + IMemoryBlock sharedMemoryBlock = getSharedMemoryBlock(); + if (!sharedMemoryBlock.isReleased()) { + long releasedSize = sharedMemoryBlock.release(memorySizeInBytes); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Released {} bytes from shared MemoryBlock '{}' for PreparedStatement. ", + releasedSize, + SHARED_MEMORY_BLOCK_NAME); + } + } else { + LOGGER.error( + "Attempted to release memory from shared MemoryBlock '{}' but it is released", + SHARED_MEMORY_BLOCK_NAME); + } + } + + /** + * Release all PreparedStatements for a session. This method should be called when a session is + * closed or connection is lost. + * + * @param session the session whose PreparedStatements should be released + */ + public void releaseAllForSession(IClientSession session) { + if (session == null) { + return; + } + + Set<String> preparedStatementNames = session.getPreparedStatementNames(); + if (preparedStatementNames == null || preparedStatementNames.isEmpty()) { + return; + } + + int releasedCount = 0; + long totalReleasedBytes = 0; + + for (String statementName : preparedStatementNames) { + PreparedStatementInfo info = session.getPreparedStatement(statementName); + if (info != null) { + long memorySize = info.getMemorySizeInBytes(); + if (memorySize > 0) { + release(memorySize); + releasedCount++; + totalReleasedBytes += memorySize; + } + } + } + + if (releasedCount > 0 && LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Released {} PreparedStatement(s) ({} bytes total) for session {}", + releasedCount, + totalReleasedBytes, + session); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 5acf353e32e..73c57157f72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -46,7 +46,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; -import org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.ScopeAware; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -133,6 +132,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Offset; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OrderBy; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; @@ -211,6 +211,7 @@ import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSp import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import com.google.common.base.Joiner; +import com.google.common.base.VerifyException; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -281,6 +282,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope.Bas import static org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ForecastTableFunction.TIMECOL_PARAMETER_NAME; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.MetadataUtil.createQualifiedObjectName; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isTimestampType; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter.evaluateConstantExpression; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression.getQualifiedName; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.FULL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.INNER; @@ -3981,15 +3983,12 @@ public class StatementAnalyzer { if (node.getRowCount() instanceof LongLiteral) { rowCount = ((LongLiteral) node.getRowCount()).getParsedValue(); } else { - // checkState( - // node.getRowCount() instanceof Parameter, - // "unexpected OFFSET rowCount: " + - // node.getRowCount().getClass().getSimpleName()); - throw new SemanticException( + checkState( + node.getRowCount() instanceof Parameter, "unexpected OFFSET rowCount: " + node.getRowCount().getClass().getSimpleName()); - // OptionalLong providedValue = - // analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "OFFSET"); - // rowCount = providedValue.orElse(0); + OptionalLong providedValue = + analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "OFFSET"); + rowCount = providedValue.orElse(0); } if (rowCount < 0) { throw new SemanticException( @@ -4050,14 +4049,10 @@ public class StatementAnalyzer { } else if (node.getRowCount() instanceof LongLiteral) { rowCount = OptionalLong.of(((LongLiteral) node.getRowCount()).getParsedValue()); } else { - // checkState( - // node.getRowCount() instanceof Parameter, - // "unexpected LIMIT rowCount: " + - // node.getRowCount().getClass().getSimpleName()); - throw new SemanticException( + checkState( + node.getRowCount() instanceof Parameter, "unexpected LIMIT rowCount: " + node.getRowCount().getClass().getSimpleName()); - // rowCount = analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, - // "LIMIT"); + rowCount = analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "LIMIT"); } rowCount.ifPresent( count -> { @@ -4073,32 +4068,27 @@ public class StatementAnalyzer { return false; } - // private OptionalLong analyzeParameterAsRowCount( - // Parameter parameter, Scope scope, String context) { - // // validate parameter index - // analyzeExpression(parameter, scope); - // Expression providedValue = analysis.getParameters().get(NodeRef.of(parameter)); - // Object value; - // try { - // value = - // evaluateConstantExpression( - // providedValue, - // BIGINT, - // plannerContext, - // session, - // accessControl, - // analysis.getParameters()); - // } catch (VerifyException e) { - // throw new SemanticException( - // String.format("Non constant parameter value for %s: %s", context, providedValue)); - // } - // if (value == null) { - // throw new SemanticException( - // String.format("Parameter value provided for %s is NULL: %s", context, - // providedValue)); - // } - // return OptionalLong.of((long) value); - // } + private OptionalLong analyzeParameterAsRowCount( + Parameter parameter, Scope scope, String context) { + // validate parameter index + analyzeExpression(parameter, scope); + Expression providedValue = analysis.getParameters().get(NodeRef.of(parameter)); + Object value; + try { + value = + evaluateConstantExpression( + providedValue, new PlannerContext(metadata, typeManager), sessionContext); + + } catch (VerifyException e) { + throw new SemanticException( + String.format("Non constant parameter value for %s: %s", context, providedValue)); + } + if (value == null) { + throw new SemanticException( + String.format("Parameter value provided for %s is NULL: %s", context, providedValue)); + } + return OptionalLong.of((long) value); + } private void analyzeAggregations( QuerySpecification node, @@ -5191,7 +5181,7 @@ public class StatementAnalyzer { Expression expression, ScalarParameterSpecification argumentSpecification) { // currently, only constant arguments are supported Object constantValue = - IrExpressionInterpreter.evaluateConstantExpression( + evaluateConstantExpression( expression, new PlannerContext(metadata, typeManager), sessionContext); if (!argumentSpecification.getType().checkObjectType(constantValue)) { if ((argumentSpecification.getType().equals(org.apache.iotdb.udf.api.type.Type.STRING) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index fddff825f0d..78f9729ece3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -35,13 +35,16 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; @@ -57,8 +60,8 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER; @@ -88,6 +91,9 @@ public class TableModelPlanner implements IPlanner { private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier; + // Parameters for prepared statements (optional) + private final List<Expression> parameters; + private final Map<NodeRef<Parameter>, Expression> parameterLookup; private final TypeManager typeManager; public TableModelPlanner( @@ -104,6 +110,8 @@ public class TableModelPlanner implements IPlanner { final List<PlanOptimizer> distributionPlanOptimizers, final AccessControl accessControl, final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier, + final List<Expression> parameters, + final Map<NodeRef<Parameter>, Expression> parameterLookup, final TypeManager typeManager) { this.statement = statement; this.sqlParser = sqlParser; @@ -116,6 +124,8 @@ public class TableModelPlanner implements IPlanner { this.distributionPlanOptimizers = distributionPlanOptimizers; this.accessControl = accessControl; this.dataNodeLocationSupplier = dataNodeLocationSupplier; + this.parameters = parameters; + this.parameterLookup = parameterLookup; this.typeManager = typeManager; } @@ -125,8 +135,8 @@ public class TableModelPlanner implements IPlanner { context, context.getSession(), new StatementAnalyzerFactory(metadata, sqlParser, accessControl, typeManager), - Collections.emptyList(), - Collections.emptyMap(), + parameters, + parameterLookup, statementRewrite, warningCollector) .analyze(statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java new file mode 100644 index 00000000000..d45f6546e0f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql; + +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import org.apache.tsfile.utils.RamUsageEstimator; + +/** + * Utility class for estimating memory usage of AST nodes. Uses RamUsageEstimator to calculate + * approximate memory size. + */ +public final class AstMemoryEstimator { + private AstMemoryEstimator() {} + + /** + * Estimate the memory size of a Statement AST node in bytes. + * + * @param statement the statement AST to estimate + * @return estimated memory size in bytes + */ + public static long estimateMemorySize(Statement statement) { + if (statement == null) { + return 0L; + } + MemoryEstimatingVisitor visitor = new MemoryEstimatingVisitor(); + visitor.process(statement, null); + return visitor.getTotalMemorySize(); + } + + private static class MemoryEstimatingVisitor extends DefaultTraversalVisitor<Void> { + private long totalMemorySize = 0L; + + public long getTotalMemorySize() { + return totalMemorySize; + } + + @Override + protected Void visitNode(Node node, Void context) { + // Estimate shallow size of the node object + long nodeSize = RamUsageEstimator.shallowSizeOfInstance(node.getClass()); + totalMemorySize += nodeSize; + + // Traverse children (DefaultTraversalVisitor handles this) + return super.visitNode(node, context); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java new file mode 100644 index 00000000000..d727acdc35e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.sql; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +/** Utility class for extracting and binding parameters in prepared statements. */ +public final class ParameterExtractor { + private ParameterExtractor() {} + + /** + * Get the number of parameters in a statement. + * + * @param statement the statement to analyze + * @return the number of parameters + */ + public static int getParameterCount(Statement statement) { + return extractParameters(statement).size(); + } + + /** + * Extract all Parameter nodes from a statement in order of appearance. + * + * @param statement the statement to analyze + * @return list of Parameter nodes in order of appearance + */ + public static List<Parameter> extractParameters(Statement statement) { + ParameterExtractingVisitor visitor = new ParameterExtractingVisitor(); + visitor.process(statement, null); + return visitor.getParameters().stream() + .sorted( + Comparator.comparing( + parameter -> + parameter + .getLocation() + .orElseThrow( + () -> new SemanticException("Parameter node must have a location")), + Comparator.comparing( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NodeLocation + ::getLineNumber) + .thenComparing( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NodeLocation + ::getColumnNumber))) + .collect(toImmutableList()); + } + + /** + * Bind parameter values to Parameter nodes in a statement. Creates a map from Parameter node + * references to their corresponding Expression values. + * + * @param statement the statement containing Parameter nodes + * @param values the parameter values (in order) + * @return map from Parameter node references to Expression values + * @throws SemanticException if the number of parameters doesn't match + */ + public static Map<NodeRef<Parameter>, Expression> bindParameters( + Statement statement, List<Literal> values) { + List<Parameter> parametersList = extractParameters(statement); + + // Validate parameter count + if (parametersList.size() != values.size()) { + throw new SemanticException( + String.format( + "Invalid number of parameters: expected %d, got %d", + parametersList.size(), values.size())); + } + + ImmutableMap.Builder<NodeRef<Parameter>, Expression> builder = ImmutableMap.builder(); + Iterator<Literal> iterator = values.iterator(); + for (Parameter parameter : parametersList) { + builder.put(NodeRef.of(parameter), iterator.next()); + } + return builder.buildOrThrow(); + } + + private static class ParameterExtractingVisitor extends DefaultTraversalVisitor<Void> { + private final List<Parameter> parameters = new ArrayList<>(); + + public List<Parameter> getParameters() { + return parameters; + } + + @Override + protected Void visitParameter(Parameter node, Void context) { + parameters.add(node); + return null; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 54802a1d2f6..2728750418f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -93,6 +93,22 @@ public abstract class AstVisitor<R, C> { return visitStatement(node, context); } + protected R visitPrepare(Prepare node, C context) { + return visitStatement(node, context); + } + + protected R visitExecute(Execute node, C context) { + return visitStatement(node, context); + } + + protected R visitExecuteImmediate(ExecuteImmediate node, C context) { + return visitStatement(node, context); + } + + protected R visitDeallocate(Deallocate node, C context) { + return visitStatement(node, context); + } + protected R visitGenericLiteral(GenericLiteral node, C context) { return visitLiteral(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java new file mode 100644 index 00000000000..fd579bb64b5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** DEALLOCATE PREPARE statement AST node. Example: DEALLOCATE PREPARE stmt1 */ +public final class Deallocate extends Statement { + + private final Identifier statementName; + + public Deallocate(Identifier statementName) { + this(null, statementName); + } + + public Deallocate(NodeLocation location, Identifier statementName) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + } + + public Identifier getStatementName() { + return statementName; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitDeallocate(this, context); + } + + @Override + public List<Node> getChildren() { + return ImmutableList.of(statementName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Deallocate that = (Deallocate) o; + return Objects.equals(statementName, that.statementName); + } + + @Override + public int hashCode() { + return Objects.hash(statementName); + } + + @Override + public String toString() { + return toStringHelper(this).add("statementName", statementName).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java new file mode 100644 index 00000000000..d7e219faf1b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** EXECUTE statement AST node. Example: EXECUTE stmt1 USING 100, 'test' */ +public final class Execute extends Statement { + + private final Identifier statementName; + private final List<Literal> parameters; + + public Execute(Identifier statementName) { + this(null, statementName, ImmutableList.of()); + } + + public Execute(Identifier statementName, List<Literal> parameters) { + this(null, statementName, parameters); + } + + public Execute(NodeLocation location, Identifier statementName, List<Literal> parameters) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null")); + } + + public Identifier getStatementName() { + return statementName; + } + + public List<Literal> getParameters() { + return parameters; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitExecute(this, context); + } + + @Override + public List<Node> getChildren() { + ImmutableList.Builder<Node> children = ImmutableList.builder(); + children.add(statementName); + children.addAll(parameters); + return children.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Execute that = (Execute) o; + return Objects.equals(statementName, that.statementName) + && Objects.equals(parameters, that.parameters); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, parameters); + } + + @Override + public String toString() { + return toStringHelper(this) + .add("statementName", statementName) + .add("parameters", parameters) + .toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java new file mode 100644 index 00000000000..955ac54e4fb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * EXECUTE IMMEDIATE statement AST node. Example: EXECUTE IMMEDIATE 'SELECT * FROM table WHERE id = + * 100' + */ +public final class ExecuteImmediate extends Statement { + + private final StringLiteral sql; + private final List<Literal> parameters; + + public ExecuteImmediate(StringLiteral sql) { + this(null, sql, ImmutableList.of()); + } + + public ExecuteImmediate(StringLiteral sql, List<Literal> parameters) { + this(null, sql, parameters); + } + + public ExecuteImmediate(NodeLocation location, StringLiteral sql, List<Literal> parameters) { + super(location); + this.sql = requireNonNull(sql, "sql is null"); + this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null")); + } + + public StringLiteral getSql() { + return sql; + } + + public String getSqlString() { + return sql.getValue(); + } + + public List<Literal> getParameters() { + return parameters; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitExecuteImmediate(this, context); + } + + @Override + public List<Node> getChildren() { + ImmutableList.Builder<Node> children = ImmutableList.builder(); + children.add(sql); + children.addAll(parameters); + return children.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExecuteImmediate that = (ExecuteImmediate) o; + return Objects.equals(sql, that.sql) && Objects.equals(parameters, that.parameters); + } + + @Override + public int hashCode() { + return Objects.hash(sql, parameters); + } + + @Override + public String toString() { + return toStringHelper(this).add("sql", sql).add("parameters", parameters).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java new file mode 100644 index 00000000000..f413b8a1926 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** PREPARE statement AST node. Example: PREPARE stmt1 FROM SELECT * FROM table WHERE id = ? */ +public final class Prepare extends Statement { + + private final Identifier statementName; + private final Statement sql; + + public Prepare(Identifier statementName, Statement sql) { + super(null); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + } + + public Prepare(NodeLocation location, Identifier statementName, Statement sql) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + } + + public Identifier getStatementName() { + return statementName; + } + + public Statement getSql() { + return sql; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitPrepare(this, context); + } + + @Override + public List<Node> getChildren() { + return ImmutableList.of(statementName, sql); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Prepare that = (Prepare) o; + return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, sql); + } + + @Override + public String toString() { + return toStringHelper(this).add("statementName", statementName).add("sql", sql).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index f62b45d8b8a..b041c56b2bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -71,6 +71,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentTime; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataTypeParameter; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression; @@ -89,6 +90,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.EmptyPattern; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExcludedPattern; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Execute; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExecuteImmediate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze; @@ -145,6 +148,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternQuantifier import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation.RowsPerMatch; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternVariable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ProcessingMode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; @@ -3786,6 +3790,40 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { return new DropModel(modelId); } + @Override + public Node visitPrepareStatement(RelationalSqlParser.PrepareStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + Statement sql = (Statement) visit(ctx.sql); + return new Prepare(getLocation(ctx), statementName, sql); + } + + @Override + public Node visitExecuteStatement(RelationalSqlParser.ExecuteStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + List<Literal> parameters = + ctx.literalExpression() != null && !ctx.literalExpression().isEmpty() + ? visit(ctx.literalExpression(), Literal.class) + : ImmutableList.of(); + return new Execute(getLocation(ctx), statementName, parameters); + } + + @Override + public Node visitExecuteImmediateStatement( + RelationalSqlParser.ExecuteImmediateStatementContext ctx) { + StringLiteral sql = (StringLiteral) visit(ctx.sql); + List<Literal> parameters = + ctx.literalExpression() != null && !ctx.literalExpression().isEmpty() + ? visit(ctx.literalExpression(), Literal.class) + : ImmutableList.of(); + return new ExecuteImmediate(getLocation(ctx), sql, parameters); + } + + @Override + public Node visitDeallocateStatement(RelationalSqlParser.DeallocateStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + return new Deallocate(getLocation(ctx), statementName); + } + // ***************** arguments ***************** @Override public Node visitGenericType(RelationalSqlParser.GenericTypeContext ctx) { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 4bd8a2b0bd7..1690ea4c855 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -174,6 +174,12 @@ statement | loadModelStatement | unloadModelStatement + // Prepared Statement + | prepareStatement + | executeStatement + | executeImmediateStatement + | deallocateStatement + // View, Trigger, CQ, Quota are not supported yet ; @@ -857,6 +863,23 @@ unloadModelStatement : UNLOAD MODEL existingModelId=identifier FROM DEVICES deviceIdList=string ; +// ------------------------------------------- Prepared Statement --------------------------------------------------------- +prepareStatement + : PREPARE statementName=identifier FROM sql=statement + ; + +executeStatement + : EXECUTE statementName=identifier (USING literalExpression (',' literalExpression)*)? + ; + +executeImmediateStatement + : EXECUTE IMMEDIATE sql=string (USING literalExpression (',' literalExpression)*)? + ; + +deallocateStatement + : DEALLOCATE PREPARE statementName=identifier + ; + // ------------------------------------------- Query Statement --------------------------------------------------------- queryStatement : query #statementDefault diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 7d334059fff..48afb89d336 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -164,6 +164,7 @@ struct TSCloseOperationReq { 1: required i64 sessionId 2: optional i64 queryId 3: optional i64 statementId + 4: optional string preparedStatementName } struct TSFetchResultsReq{
