This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 1f5c6c9d1ae Support Specify database while contructing JDBC, Session
and SessionPool
1f5c6c9d1ae is described below
commit 1f5c6c9d1ae165e53b2bde07546610fa28457982
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jul 9 14:14:28 2024 +0800
Support Specify database while contructing JDBC, Session and SessionPool
---
.../org/apache/iotdb/TableModelJDBCExample.java | 108 ++++++++++++
.../org/apache/iotdb/TableModelSessionExample.java | 136 ++++++++++++++
.../apache/iotdb/TableModelSessionPoolExample.java | 147 ++++++++++++++++
.../iotdb/it/env/cluster/env/AbstractEnv.java | 31 ++++
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 36 ++++
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 5 +
.../iotdb/itbase/runtime/ClusterTestStatement.java | 4 +-
.../relational/it/schema/IoTDBDatabaseIT.java | 21 ++-
.../iotdb/relational/it/schema/IoTDBTableIT.java | 14 +-
.../it/session/IoTDBTableModelSessionIT.java | 114 ++++++++++++
.../session/pool/IoTDBTableModelSessionPoolIT.java | 196 +++++++++++++++++++++
.../org/apache/iotdb/isession/IPooledSession.java | 64 +++++++
.../apache/iotdb/isession/pool/ISessionPool.java | 3 +
.../main/java/org/apache/iotdb/jdbc/Config.java | 2 +
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 5 +
.../apache/iotdb/jdbc/IoTDBConnectionParams.java | 11 ++
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 4 +
.../src/main/java/org/apache/iotdb/jdbc/Utils.java | 55 ++++--
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 4 +
.../java/org/apache/iotdb/session/Session.java | 48 ++++-
.../apache/iotdb/session/SessionConnection.java | 20 ++-
.../org/apache/iotdb/session/pool/SessionPool.java | 35 +++-
.../apache/iotdb/session/pool/SessionWrapper.java | 195 ++++++++++++++++++++
.../session/subscription/SubscriptionSession.java | 9 +-
.../SubscriptionSessionConnection.java | 13 +-
.../iotdb/session/SessionConnectionTest.java | 6 +-
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++
.../config/executor/ClusterConfigTaskExecutor.java | 46 +++--
.../thrift-datanode/src/main/thrift/client.thrift | 2 +
29 files changed, 1309 insertions(+), 52 deletions(-)
diff --git
a/example/jdbc/src/main/java/org/apache/iotdb/TableModelJDBCExample.java
b/example/jdbc/src/main/java/org/apache/iotdb/TableModelJDBCExample.java
new file mode 100644
index 00000000000..ff326d97340
--- /dev/null
+++ b/example/jdbc/src/main/java/org/apache/iotdb/TableModelJDBCExample.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TableModelJDBCExample {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TableModelJDBCExample.class);
+
+ public static void main(String[] args) throws ClassNotFoundException,
SQLException {
+ Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+
+ // don't specify database in url
+ try (Connection connection =
+ DriverManager.getConnection(
+ "jdbc:iotdb://127.0.0.1:6667?sql_dialect=table", "root",
"root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("CREATE DATABASE test1");
+ statement.execute("CREATE DATABASE test2");
+
+ statement.execute("use test2");
+
+ // or use full qualified table name
+ statement.execute(
+ "create table test1.table1(region_id STRING ID, plant_id STRING ID,
device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT,
humidity DOUBLE MEASUREMENT) with (TTL=3600000)");
+
+ statement.execute(
+ "create table table2(region_id STRING ID, plant_id STRING ID, color
STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with
(TTL=6600000)");
+
+ // show tables from current database
+ try (ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ System.out.println(metaData.getColumnCount());
+ while (resultSet.next()) {
+ System.out.println(resultSet.getString(1) + ", " +
resultSet.getInt(2));
+ }
+ }
+
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ try (ResultSet resultSet = statement.executeQuery("SHOW TABLES FROM
test1")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ System.out.println(metaData.getColumnCount());
+ while (resultSet.next()) {
+ System.out.println(resultSet.getString(1) + ", " +
resultSet.getInt(2));
+ }
+ }
+
+ } catch (IoTDBSQLException e) {
+ LOGGER.error("IoTDB Jdbc example error", e);
+ }
+
+ // specify database in url
+ try (Connection connection =
+ DriverManager.getConnection(
+ "jdbc:iotdb://127.0.0.1:6667/test1?sql_dialect=table", "root",
"root");
+ Statement statement = connection.createStatement()) {
+ // show tables from current database test1
+ try (ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ System.out.println(metaData.getColumnCount());
+ while (resultSet.next()) {
+ System.out.println(resultSet.getString(1) + ", " +
resultSet.getInt(2));
+ }
+ }
+
+ // change database to test2
+ statement.execute("use test2");
+
+ try (ResultSet resultSet = statement.executeQuery("SHOW TABLES")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ System.out.println(metaData.getColumnCount());
+ while (resultSet.next()) {
+ System.out.println(resultSet.getString(1) + ", " +
resultSet.getInt(2));
+ }
+ }
+ }
+ }
+}
diff --git
a/example/session/src/main/java/org/apache/iotdb/TableModelSessionExample.java
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionExample.java
new file mode 100644
index 00000000000..b57b5903e07
--- /dev/null
+++
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionExample.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+public class TableModelSessionExample {
+
+ private static final String LOCAL_HOST = "127.0.0.1";
+
+ public static void main(String[] args) {
+
+ // don't specify database in constructor
+ Session session =
+ new Session.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .username("root")
+ .password("root")
+ .version(Version.V_1_0)
+ .sqlDialect("table")
+ .build();
+
+ try {
+ session.open(false);
+
+ session.executeNonQueryStatement("CREATE DATABASE test1");
+ session.executeNonQueryStatement("CREATE DATABASE test2");
+
+ session.executeNonQueryStatement("use test2");
+
+ // or use full qualified table name
+ session.executeNonQueryStatement(
+ "create table test1.table1(region_id STRING ID, plant_id STRING ID,
device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT,
humidity DOUBLE MEASUREMENT) with (TTL=3600000)");
+
+ session.executeNonQueryStatement(
+ "create table table2(region_id STRING ID, plant_id STRING ID, color
STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with
(TTL=6600000)");
+
+ // show tables from current database
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES
FROM test1")) {
+ System.out.println(dataSet.getColumnNames());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // specify database in constructor
+ session =
+ new Session.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .username("root")
+ .password("root")
+ .version(Version.V_1_0)
+ .sqlDialect("table")
+ .database("test1")
+ .build();
+
+ try {
+ session.open(false);
+
+ // show tables from current database
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ // change database to test2
+ session.executeNonQueryStatement("use test2");
+
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git
a/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
new file mode 100644
index 00000000000..78af9efdd6d
--- /dev/null
+++
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+public class TableModelSessionPoolExample {
+
+ private static final String LOCAL_HOST = "127.0.0.1";
+
+ public static void main(String[] args) {
+
+ // don't specify database in constructor
+ SessionPool sessionPool =
+ new SessionPool.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .user("root")
+ .password("root")
+ .maxSize(1)
+ .sqlDialect("table")
+ .build();
+
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ session.executeNonQueryStatement("CREATE DATABASE test1");
+ session.executeNonQueryStatement("CREATE DATABASE test2");
+
+ session.executeNonQueryStatement("use test2");
+
+ // or use full qualified table name
+ session.executeNonQueryStatement(
+ "create table test1.table1(region_id STRING ID, plant_id STRING ID,
device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT,
humidity DOUBLE MEASUREMENT) with (TTL=3600000)");
+
+ session.executeNonQueryStatement(
+ "create table table2(region_id STRING ID, plant_id STRING ID, color
STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with
(TTL=6600000)");
+
+ // show tables from current database
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ System.out.println(dataSet.getColumnTypes());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES
FROM test1")) {
+ System.out.println(dataSet.getColumnNames());
+ System.out.println(dataSet.getColumnTypes());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ } finally {
+ sessionPool.close();
+ }
+
+ // specify database in constructor
+ sessionPool =
+ new SessionPool.Builder()
+ .host(LOCAL_HOST)
+ .port(6667)
+ .user("root")
+ .password("root")
+ .maxSize(1)
+ .sqlDialect("table")
+ .database("test1")
+ .build();
+
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ // show tables from current database
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ System.out.println(dataSet.getColumnTypes());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ // change database to test2
+ session.executeNonQueryStatement("use test2");
+
+ // show tables by specifying another database
+ // using SHOW tables FROM
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ System.out.println(dataSet.getColumnTypes());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ }
+
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ // show tables from default database test1
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ System.out.println(dataSet.getColumnNames());
+ System.out.println(dataSet.getColumnTypes());
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+ }
+
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ } catch (StatementExecutionException e) {
+ e.printStackTrace();
+ } finally {
+ sessionPool.close();
+ }
+ }
+}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 2fd68b5bbf6..b0e483d61bb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -429,6 +429,22 @@ public abstract class AbstractEnv implements BaseEnv {
return session;
}
+ @Override
+ public ISession getSessionConnectionWithDB(String sqlDialect, String
database)
+ throws IoTDBConnectionException {
+ DataNodeWrapper dataNode =
+
this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size()));
+ Session session =
+ new Session.Builder()
+ .host(dataNode.getIp())
+ .port(dataNode.getPort())
+ .sqlDialect(sqlDialect)
+ .database(database)
+ .build();
+ session.open();
+ return session;
+ }
+
@Override
public ISession getSessionConnection(String userName, String password,
String sqlDialect)
throws IoTDBConnectionException {
@@ -480,6 +496,21 @@ public abstract class AbstractEnv implements BaseEnv {
.build();
}
+ @Override
+ public ISessionPool getSessionPool(int maxSize, String sqlDialect, String
database) {
+ DataNodeWrapper dataNode =
+
this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size()));
+ return new SessionPool.Builder()
+ .host(dataNode.getIp())
+ .port(dataNode.getPort())
+ .user(SessionConfig.DEFAULT_USER)
+ .password(SessionConfig.DEFAULT_PASSWORD)
+ .maxSize(maxSize)
+ .sqlDialect(sqlDialect)
+ .database(database)
+ .build();
+ }
+
protected NodeConnection getWriteConnection(
Constant.Version version, String username, String password, String
sqlDialect)
throws SQLException {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 665d6248ad0..6c75c22ad96 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -216,6 +216,28 @@ public class RemoteServerEnv implements BaseEnv {
.build();
}
+ @Override
+ public ISessionPool getSessionPool(int maxSize, String sqlDialect, String
database) {
+ return new SessionPool.Builder()
+ .host(SessionConfig.DEFAULT_HOST)
+ .port(SessionConfig.DEFAULT_PORT)
+ .user(SessionConfig.DEFAULT_USER)
+ .password(SessionConfig.DEFAULT_PASSWORD)
+ .maxSize(maxSize)
+ .fetchSize(SessionConfig.DEFAULT_FETCH_SIZE)
+ .waitToGetSessionTimeoutInMs(60_000)
+ .enableCompression(false)
+ .zoneId(null)
+ .enableRedirection(SessionConfig.DEFAULT_REDIRECTION_MODE)
+ .connectionTimeoutInMs(SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS)
+ .version(SessionConfig.DEFAULT_VERSION)
+ .thriftDefaultBufferSize(SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY)
+ .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
+ .sqlDialect(sqlDialect)
+ .database(database)
+ .build();
+ }
+
@Override
public ISession getSessionConnection(String sqlDialect) throws
IoTDBConnectionException {
Session session =
@@ -228,6 +250,20 @@ public class RemoteServerEnv implements BaseEnv {
return session;
}
+ @Override
+ public ISession getSessionConnectionWithDB(String sqlDialect, String
database)
+ throws IoTDBConnectionException {
+ Session session =
+ new Session.Builder()
+ .host(ip_addr)
+ .port(Integer.parseInt(port))
+ .sqlDialect(sqlDialect)
+ .database(database)
+ .build();
+ session.open();
+ return session;
+ }
+
@Override
public ISession getSessionConnection(String userName, String password,
String sqlDialect)
throws IoTDBConnectionException {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 472df6c6f63..b5af08af1cf 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -174,12 +174,17 @@ public interface BaseEnv {
ISessionPool getSessionPool(int maxSize, String sqlDialect);
+ ISessionPool getSessionPool(int maxSize, String sqlDialect, String database);
+
default ISession getSessionConnection() throws IoTDBConnectionException {
return getSessionConnection(TREE_SQL_DIALECT);
}
ISession getSessionConnection(String sqlDialect) throws
IoTDBConnectionException;
+ ISession getSessionConnectionWithDB(String sqlDialect, String database)
+ throws IoTDBConnectionException;
+
default ISession getSessionConnection(String userName, String password)
throws IoTDBConnectionException {
return getSessionConnection(userName, password, TREE_SQL_DIALECT);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
index f8c3e38ea57..d99ea24717e 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -31,6 +31,8 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase;
+
/** The implementation of {@link ClusterTestStatement} in cluster test. */
public class ClusterTestStatement implements Statement {
@@ -190,7 +192,7 @@ public class ClusterTestStatement implements Statement {
sql = sql.trim();
boolean result = writeStatement.execute(sql);
// if use XXXX, sendRequest to all statements
- if (sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4))) {
+ if (isUseDatabase(sql)) {
for (Statement readStatement : readStatements) {
boolean tmp = readStatement.execute(sql);
result = result && tmp;
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index 3260d4b27f6..56246f1d790 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -65,6 +65,17 @@ public class IoTDBDatabaseIT {
// create
statement.execute("create database test");
+ // create duplicated database without IF NOT EXISTS
+ try {
+ statement.execute("create database test");
+ fail("create database test shouldn't succeed because test already
exists");
+ } catch (SQLException e) {
+ assertEquals("501: Database test already exists", e.getMessage());
+ }
+
+ // create duplicated database with IF NOT EXISTS
+ statement.execute("create database IF NOT EXISTS test");
+
String[] databaseNames = new String[] {"test"};
int[] schemaReplicaFactors = new int[] {1};
int[] dataReplicaFactors = new int[] {1};
@@ -105,15 +116,17 @@ public class IoTDBDatabaseIT {
assertFalse(resultSet.next());
}
- // drop nonexistent database
+ // drop nonexistent database without IF EXISTS
try {
statement.execute("drop database test");
- fail("drop database test shouldn't succeed because test1 doesn't
exist");
+ fail("drop database test shouldn't succeed because test doesn't
exist");
} catch (SQLException e) {
- // TODO error msg should be changed to 500: Database test1 doesn't
exists
- assertEquals("508: Path [root.test] does not exist", e.getMessage());
+ assertEquals("500: Database test doesn't exist", e.getMessage());
}
+ // drop nonexistent database with IF EXISTS
+ statement.execute("drop database IF EXISTS test");
+
// create with strange name
try {
statement.execute("create database 1test");
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
index 80ae31d9c54..72519a38c45 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java
@@ -129,7 +129,7 @@ public class IoTDBTableIT {
}
while (resultSet.next()) {
assertEquals(tableNames[cnt], resultSet.getString(1));
- assertEquals(ttls[cnt], resultSet.getInt(2));
+ assertEquals(ttls[cnt], resultSet.getLong(2));
cnt++;
}
assertEquals(tableNames.length, cnt);
@@ -150,7 +150,7 @@ public class IoTDBTableIT {
}
while (resultSet.next()) {
assertEquals(tableNames[cnt], resultSet.getString(1));
- assertEquals(ttls[cnt], resultSet.getInt(2));
+ assertEquals(ttls[cnt], resultSet.getLong(2));
cnt++;
}
assertEquals(tableNames.length, cnt);
@@ -167,7 +167,7 @@ public class IoTDBTableIT {
}
while (resultSet.next()) {
assertEquals(tableNames[cnt], resultSet.getString(1));
- assertEquals(ttls[cnt], resultSet.getInt(2));
+ assertEquals(ttls[cnt], resultSet.getLong(2));
cnt++;
}
assertEquals(tableNames.length, cnt);
@@ -240,6 +240,14 @@ public class IoTDBTableIT {
assertEquals("550: Table test3.table3 not exists.", e.getMessage());
}
+ statement.execute("drop database test1");
+
+ try {
+ statement.executeQuery("SHOW tables from test1");
+ } catch (SQLException e) {
+ assertEquals("500: Database test1 doesn't exists.", e.getMessage());
+ }
+
} catch (SQLException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java
new file mode 100644
index 00000000000..24abb4c6b87
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.session;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.read.common.RowRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showTablesColumnHeaders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBTableModelSessionIT {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testUseDatabase() {
+ String[] table1Names = new String[] {"table1"};
+ long[] table1ttls = new long[] {3600000L};
+
+ String[] table2Names = new String[] {"table2"};
+ long[] table2ttls = new long[] {6600000L};
+
+ try (ISession session =
EnvFactory.getEnv().getSessionConnectionWithDB("table", "test2")) {
+
+ session.executeNonQueryStatement("CREATE DATABASE test1");
+ session.executeNonQueryStatement("CREATE DATABASE test2");
+
+ // or use full qualified table name
+ session.executeNonQueryStatement(
+ "create table test1.table1(region_id STRING ID, plant_id STRING ID,
device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT,
humidity DOUBLE MEASUREMENT) with (TTL=3600000)");
+
+ session.executeNonQueryStatement(
+ "create table table2(region_id STRING ID, plant_id STRING ID, color
STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with
(TTL=6600000)");
+
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table2Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table2ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table2Names.length, cnt);
+ }
+
+ session.executeNonQueryStatement("use test1");
+
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table1Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table1ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table1Names.length, cnt);
+ }
+
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail(e.getMessage());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java
new file mode 100644
index 00000000000..db87a401de9
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.session.pool;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.pool.ISessionPool;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.read.common.RowRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showTablesColumnHeaders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBTableModelSessionPoolIT {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testUseDatabase() {
+
+ String[] table1Names = new String[] {"table1"};
+ long[] table1ttls = new long[] {3600000L};
+
+ String[] table2Names = new String[] {"table2"};
+ long[] table2ttls = new long[] {6600000L};
+
+ ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table");
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ session.executeNonQueryStatement("CREATE DATABASE test1");
+ session.executeNonQueryStatement("CREATE DATABASE test2");
+
+ session.executeNonQueryStatement("use test2");
+
+ // or use full qualified table name
+ session.executeNonQueryStatement(
+ "create table test1.table1(region_id STRING ID, plant_id STRING ID,
device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT,
humidity DOUBLE MEASUREMENT) with (TTL=3600000)");
+
+ session.executeNonQueryStatement(
+ "create table table2(region_id STRING ID, plant_id STRING ID, color
STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with
(TTL=6600000)");
+
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table2Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table2ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table2Names.length, cnt);
+ }
+
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail(e.getMessage());
+ }
+
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ // current session's database is still test2
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table2Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table2ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table2Names.length, cnt);
+ }
+
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail(e.getMessage());
+ } finally {
+ sessionPool.close();
+ }
+
+ // specify database in constructor
+ sessionPool = EnvFactory.getEnv().getSessionPool(1, "table", "test1");
+
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ // current session's database is test1
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table1Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table1ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table1Names.length, cnt);
+ }
+
+ // change database to test2
+ session.executeNonQueryStatement("use test2");
+
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table2Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table2ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table2Names.length, cnt);
+ }
+
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail(e.getMessage());
+ }
+
+ // after putting back, the session's database should be changed back to
default test1
+ try (IPooledSession session = sessionPool.getPooledSession()) {
+
+ try (SessionDataSet dataSet = session.executeQueryStatement("SHOW
TABLES")) {
+ int cnt = 0;
+ assertEquals(showTablesColumnHeaders.size(),
dataSet.getColumnNames().size());
+ for (int i = 0; i < showTablesColumnHeaders.size(); i++) {
+ assertEquals(
+ showTablesColumnHeaders.get(i).getColumnName(),
dataSet.getColumnNames().get(i));
+ }
+ while (dataSet.hasNext()) {
+ RowRecord rowRecord = dataSet.next();
+ assertEquals(table1Names[cnt],
rowRecord.getFields().get(0).getStringValue());
+ assertEquals(table1ttls[cnt],
rowRecord.getFields().get(1).getLongV());
+ cnt++;
+ }
+ assertEquals(table1Names.length, cnt);
+ }
+
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail(e.getMessage());
+ } finally {
+ sessionPool.close();
+ }
+ }
+}
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
new file mode 100644
index 00000000000..04c25944a97
--- /dev/null
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.isession;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.write.record.Tablet;
+
+public interface IPooledSession extends AutoCloseable {
+
+ Version getVersion();
+
+ int getFetchSize();
+
+ void close() throws IoTDBConnectionException;
+
+ String getTimeZone();
+
+ SessionDataSet executeQueryStatement(String sql)
+ throws StatementExecutionException, IoTDBConnectionException;
+
+ SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
+ throws StatementExecutionException, IoTDBConnectionException;
+
+ void executeNonQueryStatement(String sql)
+ throws IoTDBConnectionException, StatementExecutionException;
+
+ String getTimestampPrecision() throws TException;
+
+ void insertTablet(Tablet tablet) throws StatementExecutionException,
IoTDBConnectionException;
+
+ boolean isEnableQueryRedirection();
+
+ boolean isEnableRedirection();
+
+ TSBackupConfigurationResp getBackupConfiguration()
+ throws IoTDBConnectionException, StatementExecutionException;
+
+ TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
+
+ long getQueryTimeout();
+}
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
index eeb6509e89d..768a4ceb136 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.isession.pool;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.isession.IPooledSession;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
@@ -558,6 +559,8 @@ public interface ISessionPool {
long getQueryTimeout();
+ IPooledSession getPooledSession() throws IoTDBConnectionException;
+
/**
* @deprecated
*/
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index eb347224d95..0b1049330ea 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -83,4 +83,6 @@ public class Config {
public static final String TRUST_STORE_PWD = "trust_store_pwd";
public static final String SQL_DIALECT = "sql_dialect";
+
+ public static final String DATABASE = "db";
}
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index b4bdda425e3..721a1217ca2 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -499,6 +499,7 @@ public class IoTDBConnection implements Connection {
openReq.setZoneId(getTimeZone());
openReq.putToConfiguration(Config.VERSION, params.getVersion().toString());
openReq.putToConfiguration(Config.SQL_DIALECT, params.getSqlDialect());
+ params.getDb().ifPresent(db -> openReq.putToConfiguration(Config.DATABASE,
db));
TSOpenSessionResp openResp = null;
try {
@@ -593,4 +594,8 @@ public class IoTDBConnection implements Connection {
public ServerProperties getServerProperties() throws TException {
return getClient().getProperties();
}
+
+ protected void changeDefaultDatabase(String database) {
+ params.setDb(database);
+ }
}
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index e70b5e5a559..a31d2198d27 100644
---
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.common.conf.TSFileConfig;
import java.nio.charset.Charset;
import java.time.ZoneId;
+import java.util.Optional;
public class IoTDBConnectionParams {
@@ -51,6 +52,8 @@ public class IoTDBConnectionParams {
private String sqlDialect = "tree";
+ private String db;
+
public IoTDBConnectionParams(String url) {
this.jdbcUriString = url;
}
@@ -186,4 +189,12 @@ public class IoTDBConnectionParams {
public void setSqlDialect(String sqlDialect) {
this.sqlDialect = sqlDialect;
}
+
+ public Optional<String> getDb() {
+ return Optional.ofNullable(db);
+ }
+
+ public void setDb(String db) {
+ this.db = db;
+ }
}
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index eaa080aaefe..4dd1d505587 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -321,6 +321,10 @@ public class IoTDBStatement implements Statement {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
+ if (execResp.isSetDatabase()) {
+ connection.changeDefaultDatabase(execResp.getDatabase());
+ }
+
if (execResp.isSetColumns()) {
queryId = execResp.getQueryId();
if (execResp.queryResult == null) {
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
index bdea45fb2d9..00e46cc340d 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
@@ -33,10 +33,10 @@ public class Utils {
"squid:S5843",
"squid:S5998"
}) // Regular expressions should not be too complicated
- static final Pattern SUFFIX_URL_PATTERN =
Pattern.compile("([0-9]{1,5})(/|\\\\?.*=.*(&.*=.*)*)?");
+ static final Pattern SUFFIX_URL_PATTERN =
Pattern.compile("(/|\\\\?.*=.*(&.*=.*)*)?");
static final String COLON = ":";
- static final String SLASH = "/";
+ static final char SLASH = '/';
static final String PARAMETER_SEPARATOR = "?";
static final String RPC_COMPRESS = "rpc_compress";
@@ -59,29 +59,48 @@ public class Utils {
String subURL = url.substring(Config.IOTDB_URL_PREFIX.length());
int i = subURL.lastIndexOf(COLON);
host = subURL.substring(0, i);
- suffixURL = subURL.substring(i + 1);
+ params.setHost(host);
+ i++;
+ // parse port
+ int port = 0;
+ for (; i < subURL.length() && Character.isDigit(subURL.charAt(i)); i++) {
+ port = port * 10 + (subURL.charAt(i) - '0');
+ }
+ suffixURL = i < subURL.length() ? subURL.substring(i) : "";
+ // legal port
+ if (port >= 1 && port <= 65535) {
+ params.setPort(port);
+
+ // parse database
+ if (i < subURL.length() && subURL.charAt(i) == SLASH) {
+ int endIndex = subURL.indexOf(PARAMETER_SEPARATOR, i + 1);
+ String database;
+ if (endIndex <= i + 1) {
+ if (i + 1 == subURL.length()) {
+ database = null;
+ } else {
+ database = subURL.substring(i + 1);
+ }
+ suffixURL = "";
+ } else {
+ database = subURL.substring(i + 1, endIndex);
+ suffixURL = subURL.substring(endIndex);
+ }
+ params.setDb(database);
+ }
- matcher = SUFFIX_URL_PATTERN.matcher(suffixURL);
- if (matcher.matches() && parseUrlParam(subURL, info)) {
- isUrlLegal = true;
+ matcher = SUFFIX_URL_PATTERN.matcher(suffixURL);
+ if (matcher.matches() && parseUrlParam(subURL, info)) {
+ isUrlLegal = true;
+ }
}
}
if (!isUrlLegal) {
throw new IoTDBURLException(
- "Error url format, url should be jdbc:iotdb://anything:port/ or
jdbc:iotdb://anything:port?property1=value1&property2=value2");
+ "Error url format, url should be
jdbc:iotdb://anything:port/[database] or
jdbc:iotdb://anything:port[/database]?property1=value1&property2=value2,
current url is "
+ + url);
}
- params.setHost(host);
-
- // parse port
- String port = suffixURL;
- if (suffixURL.contains(PARAMETER_SEPARATOR)) {
- port = suffixURL.split("\\" + PARAMETER_SEPARATOR)[0];
- } else if (suffixURL.contains(SLASH)) {
- port = suffixURL.substring(0, suffixURL.length() - 1);
- }
- params.setPort(Integer.parseInt(port));
-
if (info.containsKey(Config.AUTH_USER)) {
params.setUsername(info.getProperty(Config.AUTH_USER));
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index e48054abb19..06aac75aa66 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -342,4 +342,8 @@ public class RpcUtils {
: new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(failedStatus.toString());
}
+
+ public static boolean isUseDatabase(String sql) {
+ return sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4));
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 533e0c9df47..087b38b73b8 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -89,6 +89,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -176,6 +177,9 @@ public class Session implements ISession {
protected String sqlDialect = SessionConfig.SQL_DIALECT;
+ // may be null
+ protected volatile String database;
+
private static final String REDIRECT_TWICE = "redirect twice";
private static final String REDIRECT_TWICE_RETRY = "redirect twice, please
try again.";
@@ -440,6 +444,7 @@ public class Session implements ISession {
this.retryIntervalInMs = builder.retryIntervalInMs;
this.sqlDialect = builder.sqlDialect;
this.queryTimeoutInMs = builder.timeOut;
+ this.database = builder.database;
}
@Override
@@ -600,10 +605,17 @@ public class Session implements ISession {
Session session, TEndPoint endpoint, ZoneId zoneId) throws
IoTDBConnectionException {
if (endpoint == null) {
return new SessionConnection(
- session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs,
sqlDialect);
+ session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs,
sqlDialect, database);
}
return new SessionConnection(
- session, endpoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs, sqlDialect);
+ session,
+ endpoint,
+ zoneId,
+ availableNodes,
+ maxRetryCount,
+ retryIntervalInMs,
+ sqlDialect,
+ database);
}
@Override
@@ -936,7 +948,24 @@ public class Session implements ISession {
@Override
public void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
+ String previousDB = database;
defaultSessionConnection.executeNonQueryStatement(sql);
+ if (!Objects.equals(previousDB, database) && endPointToSessionConnection
!= null) {
+ Iterator<Map.Entry<TEndPoint, SessionConnection>> iterator =
+ endPointToSessionConnection.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TEndPoint, SessionConnection> entry = iterator.next();
+ SessionConnection sessionConnection = entry.getValue();
+ if (sessionConnection != defaultSessionConnection) {
+ try {
+ sessionConnection.executeNonQueryStatement(sql);
+ } catch (Throwable t) {
+ logger.warn("failed to change database for {}", entry.getKey());
+ iterator.remove();
+ }
+ }
+ }
+ }
}
/**
@@ -3826,6 +3855,14 @@ public class Session implements ISession {
return defaultSessionConnection.fetchAllConnections();
}
+ protected void changeDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
public static class Builder {
private String host = SessionConfig.DEFAULT_HOST;
private int rpcPort = SessionConfig.DEFAULT_PORT;
@@ -3852,6 +3889,8 @@ public class Session implements ISession {
private String sqlDialect = SessionConfig.SQL_DIALECT;
+ private String database;
+
public Builder useSSL(boolean useSSL) {
this.useSSL = useSSL;
return this;
@@ -3954,6 +3993,11 @@ public class Session implements ISession {
return this;
}
+ public Builder database(String database) {
+ this.database = database;
+ return this;
+ }
+
public Session build() {
if (nodeUrls != null
&& (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort !=
SessionConfig.DEFAULT_PORT)) {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 10b4295db20..37f9f72f39d 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -107,12 +107,15 @@ public class SessionConnection {
private final String sqlDialect;
+ private final String database;
+
// TestOnly
public SessionConnection() {
availableNodes = Collections::emptyList;
this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT);
this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS);
this.sqlDialect = "tree";
+ database = null;
}
public SessionConnection(
@@ -122,7 +125,8 @@ public class SessionConnection {
Supplier<List<TEndPoint>> availableNodes,
int maxRetryCount,
long retryIntervalInMs,
- String sqlDialect)
+ String sqlDialect,
+ String database)
throws IoTDBConnectionException {
this.session = session;
this.endPoint = endPoint;
@@ -132,6 +136,7 @@ public class SessionConnection {
this.maxRetryCount = Math.max(0, maxRetryCount);
this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
this.sqlDialect = sqlDialect;
+ this.database = database;
try {
init(endPoint, session.useSSL, session.trustStore,
session.trustStorePwd);
} catch (StatementExecutionException e) {
@@ -147,7 +152,8 @@ public class SessionConnection {
Supplier<List<TEndPoint>> availableNodes,
int maxRetryCount,
long retryIntervalInMs,
- String sqlDialect)
+ String sqlDialect,
+ String database)
throws IoTDBConnectionException {
this.session = session;
this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
@@ -156,6 +162,7 @@ public class SessionConnection {
this.maxRetryCount = Math.max(0, maxRetryCount);
this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
this.sqlDialect = sqlDialect;
+ this.database = database;
initClusterConn();
}
@@ -198,6 +205,9 @@ public class SessionConnection {
openReq.setZoneId(zoneId.toString());
openReq.putToConfiguration("version", session.version.toString());
openReq.putToConfiguration("sql_dialect", sqlDialect);
+ if (database != null) {
+ openReq.putToConfiguration("db", database);
+ }
try {
TSOpenSessionResp openResp = client.openSession(openReq);
@@ -501,7 +511,11 @@ public class SessionConnection {
throws TException {
request.setSessionId(sessionId);
request.setStatementId(statementId);
- return client.executeUpdateStatementV2(request).status;
+ TSExecuteStatementResp resp = client.executeUpdateStatementV2(request);
+ if (resp.isSetDatabase()) {
+ session.changeDatabase(resp.getDatabase());
+ }
+ return resp.status;
}
protected SessionDataSet executeRawDataQuery(
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 43f1817eb7e..da9256dd683 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session.pool;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.INodeSupplier;
+import org.apache.iotdb.isession.IPooledSession;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
@@ -57,6 +58,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase;
+
/**
* SessionPool is a wrapper of a Session Set. Using SessionPool, the user do
not need to consider
* how to reuse a session connection. Even if the session is disconnected, the
session pool can
@@ -155,6 +158,9 @@ public class SessionPool implements ISessionPool {
protected String sqlDialect = SessionConfig.SQL_DIALECT;
+ // may be null
+ protected String database;
+
private static final String INSERT_RECORD_FAIL = "insertRecord failed";
private static final String INSERT_RECORD_ERROR_MSG = "unexpected error in
insertRecord";
@@ -490,6 +496,7 @@ public class SessionPool implements ISessionPool {
this.maxRetryCount = builder.maxRetryCount;
this.retryIntervalInMs = builder.retryIntervalInMs;
this.sqlDialect = builder.sqlDialect;
+ this.database = builder.defaultDatabase;
this.queryTimeoutInMs = builder.queryTimeoutInMs;
if (enableAutoFetch) {
@@ -546,6 +553,7 @@ public class SessionPool implements ISessionPool {
.maxRetryCount(maxRetryCount)
.retryIntervalInMs(retryIntervalInMs)
.sqlDialect(sqlDialect)
+ .database(database)
.timeOut(queryTimeoutInMs)
.build();
} else {
@@ -568,6 +576,7 @@ public class SessionPool implements ISessionPool {
.maxRetryCount(maxRetryCount)
.retryIntervalInMs(retryIntervalInMs)
.sqlDialect(sqlDialect)
+ .database(database)
.timeOut(queryTimeoutInMs)
.build();
}
@@ -704,6 +713,11 @@ public class SessionPool implements ISessionPool {
return session;
}
+ @Override
+ public IPooledSession getPooledSession() throws IoTDBConnectionException {
+ return new SessionWrapper((Session) getSession(), this);
+ }
+
@Override
public int currentAvailableSize() {
return queue.size();
@@ -715,7 +729,7 @@ public class SessionPool implements ISessionPool {
}
@SuppressWarnings({"squid:S2446"})
- private void putBack(ISession session) {
+ protected void putBack(ISession session) {
queue.push(session);
synchronized (this) {
// we do not need to notifyAll as any waited thread can continue to work
after waked up.
@@ -829,6 +843,11 @@ public class SessionPool implements ISessionPool {
}
}
+ protected void cleanSessionAndMayThrowConnectionException(ISession session) {
+ closeSession(session);
+ tryConstructNewSession();
+ }
+
/**
* insert the data of a device. For each timestamp, the number of
measurements is the same.
*
@@ -3031,6 +3050,13 @@ public class SessionPool implements ISessionPool {
@Override
public void executeNonQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
+
+ // use XXX is forbidden in SessionPool.executeNonQueryStatement
+ if (isUseDatabase(sql)) {
+ throw new IllegalArgumentException(
+ String.format("SessionPool doesn't support executing %s directly",
sql));
+ }
+
ISession session = getSession();
try {
session.executeNonQueryStatement(sql);
@@ -3541,6 +3567,8 @@ public class SessionPool implements ISessionPool {
private String sqlDialect = SessionConfig.SQL_DIALECT;
private long queryTimeoutInMs = SessionConfig.DEFAULT_QUERY_TIME_OUT;
+ private String defaultDatabase;
+
public Builder useSSL(boolean useSSL) {
this.useSSL = useSSL;
return this;
@@ -3661,6 +3689,11 @@ public class SessionPool implements ISessionPool {
return this;
}
+ public Builder database(String database) {
+ this.defaultDatabase = database;
+ return this;
+ }
+
public SessionPool build() {
return new SessionPool(this);
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
new file mode 100644
index 00000000000..deb43f56e80
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.pool;
+
+import org.apache.iotdb.isession.IPooledSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.session.Session;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * used for SessionPool.getSession need to do some other things like calling
+ * cleanSessionAndMayThrowConnectionException in SessionPool while
encountering connection exception
+ * only need to putBack to SessionPool while closing
+ */
+public class SessionWrapper implements IPooledSession {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SessionWrapper.class);
+
+ private Session session;
+
+ private final SessionPool sessionPool;
+
+ private final AtomicBoolean closed;
+
+ public SessionWrapper(Session session, SessionPool sessionPool) {
+ this.session = session;
+ this.sessionPool = sessionPool;
+ this.closed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public Version getVersion() {
+ return session.getVersion();
+ }
+
+ @Override
+ public int getFetchSize() {
+ return session.getFetchSize();
+ }
+
+ @Override
+ public void close() throws IoTDBConnectionException {
+ if (closed.compareAndSet(false, true)) {
+ if (!Objects.equals(session.getDatabase(), sessionPool.database)
+ && sessionPool.database != null) {
+ try {
+ session.executeNonQueryStatement("use " + sessionPool.database);
+ } catch (StatementExecutionException e) {
+ LOGGER.warn(
+ "Failed to change back database by executing: use " +
sessionPool.database, e);
+ }
+ }
+ sessionPool.putBack(session);
+ session = null;
+ }
+ }
+
+ @Override
+ public String getTimeZone() {
+ return session.getTimeZone();
+ }
+
+ @Override
+ public SessionDataSet executeQueryStatement(String sql)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return session.executeQueryStatement(sql);
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ return session.executeQueryStatement(sql, timeoutInMs);
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public void executeNonQueryStatement(String sql)
+ throws IoTDBConnectionException, StatementExecutionException {
+ try {
+ session.executeNonQueryStatement(sql);
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public String getTimestampPrecision() throws TException {
+ try {
+ return session.getTimestampPrecision();
+ } catch (TException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public void insertTablet(Tablet tablet)
+ throws StatementExecutionException, IoTDBConnectionException {
+ try {
+ session.insertTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean isEnableQueryRedirection() {
+ return session.isEnableQueryRedirection();
+ }
+
+ @Override
+ public boolean isEnableRedirection() {
+ return session.isEnableRedirection();
+ }
+
+ @Override
+ public TSBackupConfigurationResp getBackupConfiguration()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try {
+ return session.getBackupConfiguration();
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public TSConnectionInfoResp fetchAllConnections() throws
IoTDBConnectionException {
+ try {
+ return session.fetchAllConnections();
+ } catch (IoTDBConnectionException e) {
+ sessionPool.cleanSessionAndMayThrowConnectionException(session);
+ closed.set(true);
+ session = null;
+ throw e;
+ }
+ }
+
+ @Override
+ public long getQueryTimeout() {
+ return session.getQueryTimeout();
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 307629bd868..b70506ead24 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -75,7 +75,14 @@ public class SubscriptionSession extends Session {
"Subscription session must be configured with an endpoint.");
}
return new SubscriptionSessionConnection(
- session, endpoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs, sqlDialect);
+ session,
+ endpoint,
+ zoneId,
+ availableNodes,
+ maxRetryCount,
+ retryIntervalInMs,
+ sqlDialect,
+ database);
}
/////////////////////////////// topic ///////////////////////////////
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 16f4959fffb..cc900abb7ba 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -52,9 +52,18 @@ public class SubscriptionSessionConnection extends
SessionConnection {
Supplier<List<TEndPoint>> availableNodes,
int maxRetryCount,
long retryIntervalInMs,
- String sqlDialect)
+ String sqlDialect,
+ String database)
throws IoTDBConnectionException {
- super(session, endPoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs, sqlDialect);
+ super(
+ session,
+ endPoint,
+ zoneId,
+ availableNodes,
+ maxRetryCount,
+ retryIntervalInMs,
+ sqlDialect,
+ database);
}
// from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
index f4be6898d2f..0785bc800d9 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
@@ -173,7 +173,8 @@ public class SessionConnectionTest {
() -> Collections.singletonList(new TEndPoint("local", 12)),
SessionConfig.MAX_RETRY_COUNT,
SessionConfig.RETRY_INTERVAL_IN_MS,
- "tree");
+ "tree",
+ null);
}
@Test(expected = IoTDBConnectionException.class)
@@ -194,7 +195,8 @@ public class SessionConnectionTest {
() -> Collections.singletonList(new TEndPoint("local", 12)),
SessionConfig.MAX_RETRY_COUNT,
SessionConfig.RETRY_INTERVAL_IN_MS,
- "tree");
+ "tree",
+ null);
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fd4d23ba535..524e918a141 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -92,6 +92,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -299,6 +301,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long startTime = System.nanoTime();
StatementType statementType = null;
Throwable t = null;
+ boolean useDatabase = false;
try {
// create and cache dataset
ExecutionResult result;
@@ -339,6 +342,10 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s =
relationSqlParser.createStatement(statement,
clientSession.getZoneId());
+ if (s instanceof Use) {
+ useDatabase = true;
+ }
+
if (s == null) {
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(
@@ -381,9 +388,18 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} else {
finished = true;
resp = RpcUtils.getTSExecuteStatementResp(result.status);
+ // set for use XX
+ if (useDatabase) {
+ resp.setDatabase(clientSession.getDatabaseName());
+ }
}
return resp;
}
+ } catch (ParsingException e) {
+ finished = true;
+ t = e;
+ return RpcUtils.getTSExecuteStatementResp(
+ RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()));
} catch (Exception e) {
finished = true;
t = e;
@@ -1193,6 +1209,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
CURRENT_RPC_VERSION);
return resp.setSessionId(-1);
}
+ Optional<String> database = parseDatabase(req);
+ IClientSession clientSession = SESSION_MANAGER.getCurrSession();
BasicOpenSessionResp openSessionResp =
SESSION_MANAGER.login(
SESSION_MANAGER.getCurrSession(),
@@ -1203,6 +1221,10 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
clientVersion,
sqlDialect);
TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(),
openSessionResp.getMessage());
+
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() &&
database.isPresent()) {
+ clientSession.setDatabaseName(database.get());
+ }
TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
CURRENT_RPC_VERSION);
return resp.setSessionId(openSessionResp.getSessionId());
}
@@ -1231,6 +1253,11 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
}
+ private Optional<String> parseDatabase(TSOpenSessionReq req) {
+ Map<String, String> configuration = req.configuration;
+ return configuration == null ? Optional.empty() :
Optional.ofNullable(configuration.get("db"));
+ }
+
@Override
public TSStatus closeSession(TSCloseSessionReq req) {
return new TSStatus(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index aea0ab14e5e..1cc847a41d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2828,14 +2828,26 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus = client.deleteDatabases(req);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else if (TSStatusCode.PATH_NOT_EXIST.getStatusCode() ==
tsStatus.getCode()) {
+ if (dropDB.isExists()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ LOGGER.info(
+ "Failed to DROP DATABASE {}, because it doesn't exist",
+ dropDB.getDbName().getValue());
+ future.setException(
+ new IoTDBException(
+ String.format("Database %s doesn't exist",
dropDB.getDbName().getValue()),
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()));
+ }
+ } else {
LOGGER.warn(
"Failed to execute delete database {} in config node, status is
{}.",
dropDB.getDbName().getValue(),
tsStatus);
future.setException(new IoTDBException(tsStatus.message,
tsStatus.getCode()));
- } else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
@@ -2853,22 +2865,26 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
// Send request to some API server
TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema);
- // Get response or throw exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- // If database already exists when loading, we do not throw exceptions
to avoid printing too
- // many logs
- if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()
- && createDB.isSetIfNotExists()) {
+
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()) {
+ if (createDB.isSetIfNotExists()) {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} else {
- LOGGER.warn(
- "Failed to execute create database {} in config node, status is
{}.",
- createDB.getDbName(),
- tsStatus);
- future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
+ LOGGER.info(
+ "Failed to CREATE DATABASE {}, because it already exists",
createDB.getDbName());
+ future.setException(
+ new IoTDBException(
+ String.format("Database %s already exists",
createDB.getDbName()),
+ TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()));
}
} else {
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ LOGGER.warn(
+ "Failed to execute create database {} in config node, status is
{}.",
+ createDB.getDbName(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
}
} catch (ClientManagerException | TException e) {
future.setException(e);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 917a9a57900..9f5c5e7a8ce 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -70,6 +70,8 @@ struct TSExecuteStatementResp {
12: optional TSTracingInfo tracingInfo
13: optional list<binary> queryResult
14: optional bool moreData
+ // only be set while executing use XXX successfully
+ 15: optional string database
}
enum TSProtocolVersion {