This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/useXX in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 707b3fc8fdb3ead6d8b27579d06e1afd3d5e4722 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 9 10:57:28 2024 +0800 add TableModelSessionPoolExample and IT --- .../apache/iotdb/TableModelSessionPoolExample.java | 147 ++++++++++++++++ .../iotdb/it/env/cluster/env/AbstractEnv.java | 31 ++++ .../iotdb/it/env/remote/env/RemoteServerEnv.java | 36 ++++ .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 5 + .../relational/it/schema/IoTDBDatabaseIT.java | 21 ++- .../iotdb/relational/it/schema/IoTDBTableIT.java | 6 +- .../it/session/IoTDBTableModelSessionIT.java | 114 ++++++++++++ .../session/pool/IoTDBTableModelSessionPoolIT.java | 196 +++++++++++++++++++++ .../apache/iotdb/isession/pool/ISessionPool.java | 3 + .../org/apache/iotdb/session/pool/SessionPool.java | 1 + .../protocol/thrift/impl/ClientRPCServiceImpl.java | 6 + .../config/executor/ClusterConfigTaskExecutor.java | 46 +++-- 12 files changed, 590 insertions(+), 22 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java new file mode 100644 index 00000000000..78af9efdd6d --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb; + +import org.apache.iotdb.isession.IPooledSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; + +public class TableModelSessionPoolExample { + + private static final String LOCAL_HOST = "127.0.0.1"; + + public static void main(String[] args) { + + // don't specify database in constructor + SessionPool sessionPool = + new SessionPool.Builder() + .host(LOCAL_HOST) + .port(6667) + .user("root") + .password("root") + .maxSize(1) + .sqlDialect("table") + .build(); + + try (IPooledSession session = sessionPool.getPooledSession()) { + + session.executeNonQueryStatement("CREATE DATABASE test1"); + session.executeNonQueryStatement("CREATE DATABASE test2"); + + session.executeNonQueryStatement("use test2"); + + // or use full qualified table name + session.executeNonQueryStatement( + "create table test1.table1(region_id STRING ID, plant_id STRING ID, device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, humidity DOUBLE MEASUREMENT) with (TTL=3600000)"); + + session.executeNonQueryStatement( + "create table table2(region_id STRING ID, plant_id STRING ID, color STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with (TTL=6600000)"); + + // show tables from current database + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + + // show tables by specifying another database + // using SHOW tables FROM + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES FROM test1")) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } catch (StatementExecutionException e) { + e.printStackTrace(); + } finally { + sessionPool.close(); + } + + // specify database in constructor + sessionPool = + new SessionPool.Builder() + .host(LOCAL_HOST) + .port(6667) + .user("root") + .password("root") + .maxSize(1) + .sqlDialect("table") + .database("test1") + .build(); + + try (IPooledSession session = sessionPool.getPooledSession()) { + + // show tables from current database + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + + // change database to test2 + session.executeNonQueryStatement("use test2"); + + // show tables by specifying another database + // using SHOW tables FROM + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } catch (StatementExecutionException e) { + e.printStackTrace(); + } + + try (IPooledSession session = sessionPool.getPooledSession()) { + + // show tables from default database test1 + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + System.out.println(dataSet.getColumnNames()); + System.out.println(dataSet.getColumnTypes()); + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + } + + } catch (IoTDBConnectionException e) { + e.printStackTrace(); + } catch (StatementExecutionException e) { + e.printStackTrace(); + } finally { + sessionPool.close(); + } + } +} diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 2fd68b5bbf6..b0e483d61bb 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -429,6 +429,22 @@ public abstract class AbstractEnv implements BaseEnv { return session; } + @Override + public ISession getSessionConnectionWithDB(String sqlDialect, String database) + throws IoTDBConnectionException { + DataNodeWrapper dataNode = + this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); + Session session = + new Session.Builder() + .host(dataNode.getIp()) + .port(dataNode.getPort()) + .sqlDialect(sqlDialect) + .database(database) + .build(); + session.open(); + return session; + } + @Override public ISession getSessionConnection(String userName, String password, String sqlDialect) throws IoTDBConnectionException { @@ -480,6 +496,21 @@ public abstract class AbstractEnv implements BaseEnv { .build(); } + @Override + public ISessionPool getSessionPool(int maxSize, String sqlDialect, String database) { + DataNodeWrapper dataNode = + this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size())); + return new SessionPool.Builder() + .host(dataNode.getIp()) + .port(dataNode.getPort()) + .user(SessionConfig.DEFAULT_USER) + .password(SessionConfig.DEFAULT_PASSWORD) + .maxSize(maxSize) + .sqlDialect(sqlDialect) + .database(database) + .build(); + } + protected NodeConnection getWriteConnection( Constant.Version version, String username, String password, String sqlDialect) throws SQLException { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 665d6248ad0..6c75c22ad96 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -216,6 +216,28 @@ public class RemoteServerEnv implements BaseEnv { .build(); } + @Override + public ISessionPool getSessionPool(int maxSize, String sqlDialect, String database) { + return new SessionPool.Builder() + .host(SessionConfig.DEFAULT_HOST) + .port(SessionConfig.DEFAULT_PORT) + .user(SessionConfig.DEFAULT_USER) + .password(SessionConfig.DEFAULT_PASSWORD) + .maxSize(maxSize) + .fetchSize(SessionConfig.DEFAULT_FETCH_SIZE) + .waitToGetSessionTimeoutInMs(60_000) + .enableCompression(false) + .zoneId(null) + .enableRedirection(SessionConfig.DEFAULT_REDIRECTION_MODE) + .connectionTimeoutInMs(SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS) + .version(SessionConfig.DEFAULT_VERSION) + .thriftDefaultBufferSize(SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY) + .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE) + .sqlDialect(sqlDialect) + .database(database) + .build(); + } + @Override public ISession getSessionConnection(String sqlDialect) throws IoTDBConnectionException { Session session = @@ -228,6 +250,20 @@ public class RemoteServerEnv implements BaseEnv { return session; } + @Override + public ISession getSessionConnectionWithDB(String sqlDialect, String database) + throws IoTDBConnectionException { + Session session = + new Session.Builder() + .host(ip_addr) + .port(Integer.parseInt(port)) + .sqlDialect(sqlDialect) + .database(database) + .build(); + session.open(); + return session; + } + @Override public ISession getSessionConnection(String userName, String password, String sqlDialect) throws IoTDBConnectionException { diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 472df6c6f63..b5af08af1cf 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -174,12 +174,17 @@ public interface BaseEnv { ISessionPool getSessionPool(int maxSize, String sqlDialect); + ISessionPool getSessionPool(int maxSize, String sqlDialect, String database); + default ISession getSessionConnection() throws IoTDBConnectionException { return getSessionConnection(TREE_SQL_DIALECT); } ISession getSessionConnection(String sqlDialect) throws IoTDBConnectionException; + ISession getSessionConnectionWithDB(String sqlDialect, String database) + throws IoTDBConnectionException; + default ISession getSessionConnection(String userName, String password) throws IoTDBConnectionException { return getSessionConnection(userName, password, TREE_SQL_DIALECT); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index 3260d4b27f6..56246f1d790 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -65,6 +65,17 @@ public class IoTDBDatabaseIT { // create statement.execute("create database test"); + // create duplicated database without IF NOT EXISTS + try { + statement.execute("create database test"); + fail("create database test shouldn't succeed because test already exists"); + } catch (SQLException e) { + assertEquals("501: Database test already exists", e.getMessage()); + } + + // create duplicated database with IF NOT EXISTS + statement.execute("create database IF NOT EXISTS test"); + String[] databaseNames = new String[] {"test"}; int[] schemaReplicaFactors = new int[] {1}; int[] dataReplicaFactors = new int[] {1}; @@ -105,15 +116,17 @@ public class IoTDBDatabaseIT { assertFalse(resultSet.next()); } - // drop nonexistent database + // drop nonexistent database without IF EXISTS try { statement.execute("drop database test"); - fail("drop database test shouldn't succeed because test1 doesn't exist"); + fail("drop database test shouldn't succeed because test doesn't exist"); } catch (SQLException e) { - // TODO error msg should be changed to 500: Database test1 doesn't exists - assertEquals("508: Path [root.test] does not exist", e.getMessage()); + assertEquals("500: Database test doesn't exist", e.getMessage()); } + // drop nonexistent database with IF EXISTS + statement.execute("drop database IF EXISTS test"); + // create with strange name try { statement.execute("create database 1test"); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java index 8e74b7ced8a..72519a38c45 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java @@ -129,7 +129,7 @@ public class IoTDBTableIT { } while (resultSet.next()) { assertEquals(tableNames[cnt], resultSet.getString(1)); - assertEquals(ttls[cnt], resultSet.getInt(2)); + assertEquals(ttls[cnt], resultSet.getLong(2)); cnt++; } assertEquals(tableNames.length, cnt); @@ -150,7 +150,7 @@ public class IoTDBTableIT { } while (resultSet.next()) { assertEquals(tableNames[cnt], resultSet.getString(1)); - assertEquals(ttls[cnt], resultSet.getInt(2)); + assertEquals(ttls[cnt], resultSet.getLong(2)); cnt++; } assertEquals(tableNames.length, cnt); @@ -167,7 +167,7 @@ public class IoTDBTableIT { } while (resultSet.next()) { assertEquals(tableNames[cnt], resultSet.getString(1)); - assertEquals(ttls[cnt], resultSet.getInt(2)); + assertEquals(ttls[cnt], resultSet.getLong(2)); cnt++; } assertEquals(tableNames.length, cnt); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java new file mode 100644 index 00000000000..24abb4c6b87 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBTableModelSessionIT.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.read.common.RowRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showTablesColumnHeaders; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBTableModelSessionIT { + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testUseDatabase() { + String[] table1Names = new String[] {"table1"}; + long[] table1ttls = new long[] {3600000L}; + + String[] table2Names = new String[] {"table2"}; + long[] table2ttls = new long[] {6600000L}; + + try (ISession session = EnvFactory.getEnv().getSessionConnectionWithDB("table", "test2")) { + + session.executeNonQueryStatement("CREATE DATABASE test1"); + session.executeNonQueryStatement("CREATE DATABASE test2"); + + // or use full qualified table name + session.executeNonQueryStatement( + "create table test1.table1(region_id STRING ID, plant_id STRING ID, device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, humidity DOUBLE MEASUREMENT) with (TTL=3600000)"); + + session.executeNonQueryStatement( + "create table table2(region_id STRING ID, plant_id STRING ID, color STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with (TTL=6600000)"); + + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table2Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table2ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table2Names.length, cnt); + } + + session.executeNonQueryStatement("use test1"); + + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table1Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table1ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table1Names.length, cnt); + } + + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java new file mode 100644 index 00000000000..db87a401de9 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/pool/IoTDBTableModelSessionPoolIT.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.session.pool; + +import org.apache.iotdb.isession.IPooledSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.pool.ISessionPool; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.read.common.RowRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showTablesColumnHeaders; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBTableModelSessionPoolIT { + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testUseDatabase() { + + String[] table1Names = new String[] {"table1"}; + long[] table1ttls = new long[] {3600000L}; + + String[] table2Names = new String[] {"table2"}; + long[] table2ttls = new long[] {6600000L}; + + ISessionPool sessionPool = EnvFactory.getEnv().getSessionPool(1, "table"); + try (IPooledSession session = sessionPool.getPooledSession()) { + + session.executeNonQueryStatement("CREATE DATABASE test1"); + session.executeNonQueryStatement("CREATE DATABASE test2"); + + session.executeNonQueryStatement("use test2"); + + // or use full qualified table name + session.executeNonQueryStatement( + "create table test1.table1(region_id STRING ID, plant_id STRING ID, device_id STRING ID, model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, humidity DOUBLE MEASUREMENT) with (TTL=3600000)"); + + session.executeNonQueryStatement( + "create table table2(region_id STRING ID, plant_id STRING ID, color STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, speed DOUBLE MEASUREMENT) with (TTL=6600000)"); + + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table2Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table2ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table2Names.length, cnt); + } + + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } + + try (IPooledSession session = sessionPool.getPooledSession()) { + + // current session's database is still test2 + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table2Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table2ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table2Names.length, cnt); + } + + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } finally { + sessionPool.close(); + } + + // specify database in constructor + sessionPool = EnvFactory.getEnv().getSessionPool(1, "table", "test1"); + + try (IPooledSession session = sessionPool.getPooledSession()) { + + // current session's database is test1 + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table1Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table1ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table1Names.length, cnt); + } + + // change database to test2 + session.executeNonQueryStatement("use test2"); + + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table2Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table2ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table2Names.length, cnt); + } + + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } + + // after putting back, the session's database should be changed back to default test1 + try (IPooledSession session = sessionPool.getPooledSession()) { + + try (SessionDataSet dataSet = session.executeQueryStatement("SHOW TABLES")) { + int cnt = 0; + assertEquals(showTablesColumnHeaders.size(), dataSet.getColumnNames().size()); + for (int i = 0; i < showTablesColumnHeaders.size(); i++) { + assertEquals( + showTablesColumnHeaders.get(i).getColumnName(), dataSet.getColumnNames().get(i)); + } + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals(table1Names[cnt], rowRecord.getFields().get(0).getStringValue()); + assertEquals(table1ttls[cnt], rowRecord.getFields().get(1).getLongV()); + cnt++; + } + assertEquals(table1Names.length, cnt); + } + + } catch (IoTDBConnectionException | StatementExecutionException e) { + fail(e.getMessage()); + } finally { + sessionPool.close(); + } + } +} diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java index eeb6509e89d..768a4ceb136 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java @@ -20,6 +20,7 @@ package org.apache.iotdb.isession.pool; import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.isession.IPooledSession; import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.SystemStatus; import org.apache.iotdb.isession.util.Version; @@ -558,6 +559,8 @@ public interface ISessionPool { long getQueryTimeout(); + IPooledSession getPooledSession() throws IoTDBConnectionException; + /** * @deprecated */ diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index aae9db1377f..da9256dd683 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -713,6 +713,7 @@ public class SessionPool implements ISessionPool { return session; } + @Override public IPooledSession getPooledSession() throws IoTDBConnectionException { return new SessionWrapper((Session) getSession(), this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index c5f485b680d..524e918a141 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -93,6 +93,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; @@ -394,6 +395,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } return resp; } + } catch (ParsingException e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage())); } catch (Exception e) { finished = true; t = e; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index aea0ab14e5e..1cc847a41d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2828,14 +2828,26 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus = client.deleteDatabases(req); - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else if (TSStatusCode.PATH_NOT_EXIST.getStatusCode() == tsStatus.getCode()) { + if (dropDB.isExists()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else { + LOGGER.info( + "Failed to DROP DATABASE {}, because it doesn't exist", + dropDB.getDbName().getValue()); + future.setException( + new IoTDBException( + String.format("Database %s doesn't exist", dropDB.getDbName().getValue()), + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())); + } + } else { LOGGER.warn( "Failed to execute delete database {} in config node, status is {}.", dropDB.getDbName().getValue(), tsStatus); future.setException(new IoTDBException(tsStatus.message, tsStatus.getCode())); - } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } } catch (ClientManagerException | TException e) { future.setException(e); @@ -2853,22 +2865,26 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { // Send request to some API server TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema); - // Get response or throw exception - if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - // If database already exists when loading, we do not throw exceptions to avoid printing too - // many logs - if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode() - && createDB.isSetIfNotExists()) { + + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } else if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode()) { + if (createDB.isSetIfNotExists()) { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } else { - LOGGER.warn( - "Failed to execute create database {} in config node, status is {}.", - createDB.getDbName(), - tsStatus); - future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); + LOGGER.info( + "Failed to CREATE DATABASE {}, because it already exists", createDB.getDbName()); + future.setException( + new IoTDBException( + String.format("Database %s already exists", createDB.getDbName()), + TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode())); } } else { - future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + LOGGER.warn( + "Failed to execute create database {} in config node, status is {}.", + createDB.getDbName(), + tsStatus); + future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); } } catch (ClientManagerException | TException e) { future.setException(e);
