This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch codex/jdbc-driver-info
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/codex/jdbc-driver-info by this
push:
new 164931f42f5 Fix JDBC resource lifecycle cleanup
164931f42f5 is described below
commit 164931f42f5c9284623c11cda8af56c03c1d0227
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 12:21:37 2026 +0800
Fix JDBC resource lifecycle cleanup
---
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 79 ++++++--
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 50 +++--
.../apache/iotdb/jdbc/IoTDBPreparedStatement.java | 26 ++-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 207 ++++++++++++++++-----
.../iotdb/jdbc/IoTDBTablePreparedStatement.java | 95 ++++++++--
.../org/apache/iotdb/jdbc/IoTDBConnectionTest.java | 121 ++++++++++++
.../apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java | 164 ++++++++++++++++
.../iotdb/jdbc/IoTDBPreparedStatementTest.java | 29 +++
.../org/apache/iotdb/jdbc/IoTDBStatementTest.java | 125 +++++++++++++
.../jdbc/IoTDBTablePreparedStatementTest.java | 204 ++++++++++++++++++++
10 files changed, 999 insertions(+), 101 deletions(-)
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index f514719fced..8af5bcd245a 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -64,9 +64,13 @@ import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
public class IoTDBConnection implements Connection {
@@ -83,6 +87,8 @@ public class IoTDBConnection implements Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TTransport transport;
+ private final Set<IoTDBStatement> openStatements =
+ Collections.newSetFromMap(new ConcurrentHashMap<IoTDBStatement, Boolean>());
/**
* Timeout of query can be set by users. Unit: s If not set, default value 0 will be used, which
@@ -181,18 +187,27 @@ public class IoTDBConnection implements Connection {
if (isClosed) {
return;
}
+ SQLException statementCloseException = closeOpenStatements();
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
try {
getClient().closeSession(req);
} catch (TException e) {
- throw new SQLException(
- "Error occurs when closing session at server. Maybe server is down.", e);
+ SQLException closeSessionException =
+ new SQLException("Error occurs when closing session at server. Maybe server is down.", e);
+ if (statementCloseException != null) {
+ closeSessionException.addSuppressed(statementCloseException);
+ }
+ throw closeSessionException;
} finally {
isClosed = true;
+ openStatements.clear();
if (transport != null) {
transport.close();
}
}
+ if (statementCloseException != null) {
+ throw statementCloseException;
+ }
}
@Override
@@ -233,7 +248,10 @@ public class IoTDBConnection implements Connection {
@Override
public Statement createStatement() throws SQLException {
checkOpen("createStatement");
- return new IoTDBStatement(this, getClient(), sessionId, zoneId, charset, queryTimeout);
+ IoTDBStatement statement =
+ new IoTDBStatement(this, getClient(), sessionId, zoneId, charset, queryTimeout);
+ registerStatement(statement);
+ return statement;
}
@Override
@@ -249,7 +267,10 @@ public class IoTDBConnection implements Connection {
throw new SQLException(
String.format("Statements with ResultSet type %d are not supported", resultSetType));
}
- return new IoTDBStatement(this, getClient(), sessionId, zoneId, charset, queryTimeout);
+ IoTDBStatement statement =
+ new IoTDBStatement(this, getClient(), sessionId, zoneId, charset, queryTimeout);
+ registerStatement(statement);
+ return statement;
}
@Override
@@ -298,12 +319,10 @@ public class IoTDBConnection implements Connection {
}
}
- PreparedStatement stmt = this.prepareStatement("USE ?");
- stmt.setString(1, arg0);
- try {
+ try (PreparedStatement stmt = this.prepareStatement("USE ?")) {
+ stmt.setString(1, arg0);
stmt.execute();
} catch (SQLException e) {
- stmt.close();
logger.error(JdbcMessages.USE_DATABASE_ERROR, e.getMessage());
throw e;
}
@@ -378,12 +397,10 @@ public class IoTDBConnection implements Connection {
}
}
- PreparedStatement stmt = this.prepareStatement("USE ?");
- stmt.setString(1, arg0);
- try {
+ try (PreparedStatement stmt = this.prepareStatement("USE ?")) {
+ stmt.setString(1, arg0);
stmt.execute();
} catch (SQLException e) {
- stmt.close();
logger.error(JdbcMessages.USE_DATABASE_ERROR, e.getMessage());
throw e;
}
@@ -475,11 +492,18 @@ public class IoTDBConnection implements Connection {
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
checkOpen("prepareStatement");
+ if (sql == null) {
+ throw new SQLException("SQL statement cannot be null");
+ }
+ IoTDBStatement statement;
if (getSqlDialect().equals(Constant.TABLE_DIALECT)) {
- return new IoTDBTablePreparedStatement(this, getClient(), sessionId, sql, zoneId, charset);
+ statement =
+ new IoTDBTablePreparedStatement(this, getClient(), sessionId, sql, zoneId, charset);
} else {
- return new IoTDBPreparedStatement(this, getClient(), sessionId, sql, zoneId, charset);
+ statement = new IoTDBPreparedStatement(this, getClient(), sessionId, sql, zoneId, charset);
}
+ registerStatement(statement);
+ return (PreparedStatement) statement;
}
@Override
@@ -555,7 +579,8 @@ public class IoTDBConnection implements Connection {
throw new SQLException(JdbcMessages.NOT_SUPPORT_SET_NETWORK_TIMEOUT);
}
- public int getQueryTimeout() {
+ public int getQueryTimeout() throws SQLException {
+ checkOpen("getQueryTimeout");
return this.queryTimeout;
}
@@ -779,6 +804,30 @@ public class IoTDBConnection implements Connection {
}
}
+ private void registerStatement(IoTDBStatement statement) {
+ openStatements.add(statement);
+ }
+
+ void unregisterStatement(IoTDBStatement statement) {
+ openStatements.remove(statement);
+ }
+
+ private SQLException closeOpenStatements() {
+ SQLException statementCloseException = null;
+ for (IoTDBStatement statement : new ArrayList<>(openStatements)) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ if (statementCloseException == null) {
+ statementCloseException = e;
+ } else {
+ statementCloseException.addSuppressed(e);
+ }
+ }
+ }
+ return statementCloseException;
+ }
+
private void checkOpenForClientInfo(String action) throws SQLClientInfoException {
if (isClosed) {
throw new SQLClientInfoException(
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index 8ec783e05f9..d9f3e94b3f9 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -83,6 +83,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
private List<String> sgColumns = null;
private Charset charset = TSFileConfig.STRING_CHARSET;
private String timeFormat = RpcUtils.DEFAULT_TIME_FORMAT;
+ private final long queryId;
private boolean explicitlyClosed = false;
@SuppressWarnings("squid:S107") // ignore Methods should not have too many parameters
@@ -105,6 +106,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
boolean tableModel,
List<Integer> columnIndex2TsBlockColumnIndexList)
throws SQLException {
+ this.queryId = queryId;
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -115,7 +117,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
moreData,
queryId,
statement.getStmtId(),
- client,
+ getResultSetClient(client, queryId),
sessionId,
dataSet,
statement.getFetchSizeInternal(),
@@ -151,6 +153,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
boolean moreData,
ZoneId zoneId)
throws SQLException {
+ this.queryId = queryId;
this.ioTDBRpcDataSet =
new IoTDBRpcDataSet(
sql,
@@ -161,7 +164,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
moreData,
queryId,
((IoTDBStatement) statement).getStmtId(),
- client,
+ getResultSetClient(client, queryId),
sessionId,
dataSet,
((IoTDBStatement) statement).getFetchSizeInternal(),
@@ -179,6 +182,11 @@ public class IoTDBJDBCResultSet implements ResultSet {
}
}
+ private static IClientRPCService.Iface getResultSetClient(
+ IClientRPCService.Iface client, long queryId) {
+ return queryId == -1 ? null : client;
+ }
+
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
checkOpen();
@@ -224,11 +232,13 @@ public class IoTDBJDBCResultSet implements ResultSet {
}
try {
ioTDBRpcDataSet.close();
- explicitlyClosed = true;
+ statement.clearQueryId(queryId);
} catch (StatementExecutionException e) {
throw new SQLException(JdbcMessages.CLOSE_SERVER_SIDE_ERROR, e);
} catch (TException e) {
throw new SQLException(JdbcMessages.CLOSE_CONNECTING_ERROR, e);
+ } finally {
+ explicitlyClosed = true;
}
}
@@ -871,7 +881,7 @@ public class IoTDBJDBCResultSet implements ResultSet {
@Override
public boolean isClosed() {
- return ioTDBRpcDataSet.isClosed();
+ return explicitlyClosed;
}
private void checkOpen() throws SQLException {
@@ -1387,46 +1397,60 @@ public class IoTDBJDBCResultSet implements ResultSet {
}
}
- public boolean isSetTracingInfo() {
+ public boolean isSetTracingInfo() throws SQLException {
+ checkOpen();
if (ioTDBRpcTracingInfo == null) {
return false;
}
return ioTDBRpcTracingInfo.isSetTracingInfo();
}
- public List<String> getActivityList() {
+ public List<String> getActivityList() throws SQLException {
+ checkOpen();
return ioTDBRpcTracingInfo.getActivityList();
}
- public List<Long> getElapsedTimeList() {
+ public List<Long> getElapsedTimeList() throws SQLException {
+ checkOpen();
return ioTDBRpcTracingInfo.getElapsedTimeList();
}
public long getStatisticsByName(String name) throws Exception {
+ checkOpen();
return ioTDBRpcTracingInfo.getStatisticsByName(name);
}
public String getStatisticsInfoByName(String name) throws Exception {
+ checkOpen();
return ioTDBRpcTracingInfo.getStatisticsInfoByName(name);
}
- public boolean isIgnoreTimeStamp() {
+ public boolean isIgnoreTimeStamp() throws SQLException {
+ checkOpen();
return ioTDBRpcDataSet.isIgnoreTimeStamp();
}
- public String getOperationType() {
+ public String getOperationType() throws SQLException {
+ checkOpen();
return this.operationType;
}
- public List<String> getColumns() {
+ public List<String> getColumns() throws SQLException {
+ checkOpen();
return this.columns;
}
- public List<String> getSgColumns() {
+ public List<String> getSgColumns() throws SQLException {
+ checkOpen();
return sgColumns;
}
- public TSDataType getColumnTypeByIndex(int columnIndex) {
- return ioTDBRpcDataSet.getDataType(columnIndex);
+ public TSDataType getColumnTypeByIndex(int columnIndex) throws SQLException {
+ checkOpen();
+ try {
+ return ioTDBRpcDataSet.getDataType(columnIndex);
+ } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
+ throw new SQLException(e.getMessage());
+ }
}
}
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
index 46ef522a6e8..2f6b85adbdc 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedStatement.java
@@ -84,6 +84,18 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
ZoneId zoneId,
Charset charset)
throws SQLException {
+ this(connection, client, sessionId, requireNonNullSql(sql), zoneId, charset, true);
+ }
+
+ private IoTDBPreparedStatement(
+ IoTDBConnection connection,
+ Iface client,
+ Long sessionId,
+ String sql,
+ ZoneId zoneId,
+ Charset charset,
+ boolean validated)
+ throws SQLException {
super(connection, client, sessionId, zoneId, charset);
this.sql = sql;
this.parameterCount = splitSqlStatement(sql).size() - 1;
@@ -93,9 +105,14 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
IoTDBPreparedStatement(
IoTDBConnection connection, Iface client, Long sessionId, String sql, ZoneId zoneId)
throws SQLException {
- super(connection, client, sessionId, zoneId, TSFileConfig.STRING_CHARSET);
- this.sql = sql;
- this.parameterCount = splitSqlStatement(sql).size() - 1;
+ this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET);
+ }
+
+ private static String requireNonNullSql(String sql) throws SQLException {
+ if (sql == null) {
+ throw new SQLException("SQL statement cannot be null");
+ }
+ return sql;
}
@Override
@@ -113,18 +130,21 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
@Override
public boolean execute() throws SQLException {
checkConnection("execute");
+ closeCurrentResultSet();
return super.execute(createCompleteSql(sql, parameters));
}
@Override
public ResultSet executeQuery() throws SQLException {
checkConnection("executeQuery");
+ closeCurrentResultSet();
return super.executeQuery(createCompleteSql(sql, parameters));
}
@Override
public int executeUpdate() throws SQLException {
checkConnection("executeUpdate");
+ closeCurrentResultSet();
return super.executeUpdate(createCompleteSql(sql, parameters));
}
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index e2fa67aa2d8..d2818d60b58 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -240,7 +240,7 @@ public class IoTDBStatement implements Statement {
warningChain = null;
}
- private void closeClientOperation() throws SQLException {
+ protected void closeClientOperation() throws SQLException {
try {
if (stmtId != -1) {
TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
@@ -254,14 +254,65 @@ public class IoTDBStatement implements Statement {
}
}
+ protected void closeQueryOperation(long queryIdToClose) throws SQLException {
+ try {
+ if (queryIdToClose != -1) {
+ TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
+ closeReq.setStatementId(stmtId);
+ closeReq.setQueryId(queryIdToClose);
+ TSStatus closeResp = client.closeOperation(closeReq);
+ RpcUtils.verifySuccess(closeResp);
+ if (queryId == queryIdToClose) {
+ queryId = -1;
+ }
+ }
+ } catch (Exception e) {
+ throw new SQLException(JdbcMessages.CLOSE_STATEMENT_ERROR, e);
+ }
+ }
+
+ protected void clearQueryId(long closedQueryId) {
+ if (queryId == closedQueryId) {
+ queryId = -1;
+ }
+ }
+
+ protected void setQueryId(long queryId) {
+ this.queryId = queryId;
+ }
+
@Override
public void close() throws SQLException {
- if (isClosed) {
+ if (isClosed()) {
+ unregisterStatement();
return;
}
- closeClientOperation();
- isClosed = true;
+ SQLException closeException = null;
+ try {
+ closeCurrentResultSet();
+ } catch (SQLException e) {
+ closeException = mergeSQLException(closeException, e);
+ }
+
+ boolean clientOperationClosed = false;
+ try {
+ closeClientOperation();
+ clientOperationClosed = true;
+ } catch (SQLException e) {
+ closeException = mergeSQLException(closeException, e);
+ }
+
+ if (clientOperationClosed) {
+ isClosed = true;
+ }
+ if (isClosed) {
+ unregisterStatement();
+ }
+
+ if (closeException != null) {
+ throw closeException;
+ }
}
@Override
@@ -284,6 +335,7 @@ public class IoTDBStatement implements Statement {
@Override
public boolean execute(String sql) throws SQLException {
checkConnection("execute");
+ closeCurrentResultSet();
try {
return executeSQL(sql);
} catch (TException e) {
@@ -393,38 +445,48 @@ public class IoTDBStatement implements Statement {
}
if (execResp.isSetColumns()) {
- queryId = execResp.getQueryId();
+ queryId = execResp.isSetQueryId() ? execResp.getQueryId() : -1;
if (execResp.queryResult == null) {
- throw new SQLException(JdbcMessages.QUERY_RESULT_SHOULD_NOT_BE_NULL);
+ SQLException exception = new SQLException(JdbcMessages.QUERY_RESULT_SHOULD_NOT_BE_NULL);
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, exception);
} else {
- this.resultSet =
- new IoTDBJDBCResultSet(
- this,
- execResp.getColumns(),
- execResp.getDataTypeList(),
- execResp.columnNameIndexMap,
- execResp.isIgnoreTimeStamp(),
- client,
- sql,
- queryId,
- sessionId,
- execResp.queryResult,
- execResp.tracingInfo,
- execReq.timeout,
- execResp.moreData,
- zoneId,
- charset,
- execResp.isSetTableModel() && execResp.isTableModel(),
- execResp.getColumnIndex2TsBlockColumnIndexList());
+ try {
+ this.resultSet =
+ new IoTDBJDBCResultSet(
+ this,
+ execResp.getColumns(),
+ execResp.getDataTypeList(),
+ execResp.columnNameIndexMap,
+ execResp.isIgnoreTimeStamp(),
+ client,
+ sql,
+ queryId,
+ sessionId,
+ execResp.queryResult,
+ execResp.tracingInfo,
+ execReq.timeout,
+ execResp.moreData,
+ zoneId,
+ charset,
+ execResp.isSetTableModel() && execResp.isTableModel(),
+ execResp.getColumnIndex2TsBlockColumnIndexList());
+ } catch (SQLException | RuntimeException e) {
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, e);
+ }
}
return true;
}
+ if (execResp.isSetQueryId()) {
+ queryId = execResp.getQueryId();
+ closeQueryOperation(queryId);
+ }
return false;
}
@Override
public int[] executeBatch() throws SQLException {
checkConnection("executeBatch");
+ closeCurrentResultSet();
try {
return executeBatchSQL();
} catch (TException e) {
@@ -487,6 +549,7 @@ public class IoTDBStatement implements Statement {
public ResultSet executeQuery(String sql, long timeoutInMS) throws SQLException {
checkConnection("execute query");
+ closeCurrentResultSet();
try {
return executeQuerySQL(sql, timeoutInMS);
} catch (TException e) {
@@ -509,7 +572,7 @@ public class IoTDBStatement implements Statement {
TSExecuteStatementResp execResp =
callWithRetryAndReconnect(
() -> client.executeQueryStatementV2(execReq),
TSExecuteStatementResp::getStatus);
- queryId = execResp.getQueryId();
+ queryId = execResp.isSetQueryId() ? execResp.getQueryId() : -1;
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (StatementExecutionException e) {
@@ -517,27 +580,32 @@ public class IoTDBStatement implements Statement {
}
if (!execResp.isSetQueryResult()) {
- throw new SQLException(JdbcMessages.QUERY_RESULT_SHOULD_NOT_BE_NULL);
+ SQLException exception = new SQLException(JdbcMessages.QUERY_RESULT_SHOULD_NOT_BE_NULL);
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, exception);
} else {
- this.resultSet =
- new IoTDBJDBCResultSet(
- this,
- execResp.getColumns(),
- execResp.getDataTypeList(),
- execResp.columnNameIndexMap,
- execResp.isIgnoreTimeStamp(),
- client,
- sql,
- queryId,
- sessionId,
- execResp.getQueryResult(),
- execResp.tracingInfo,
- execReq.timeout,
- execResp.moreData,
- zoneId,
- charset,
- execResp.isSetTableModel() && execResp.isTableModel(),
- execResp.getColumnIndex2TsBlockColumnIndexList());
+ try {
+ this.resultSet =
+ new IoTDBJDBCResultSet(
+ this,
+ execResp.getColumns(),
+ execResp.getDataTypeList(),
+ execResp.columnNameIndexMap,
+ execResp.isIgnoreTimeStamp(),
+ client,
+ sql,
+ queryId,
+ sessionId,
+ execResp.getQueryResult(),
+ execResp.tracingInfo,
+ execReq.timeout,
+ execResp.moreData,
+ zoneId,
+ charset,
+ execResp.isSetTableModel() && execResp.isTableModel(),
+ execResp.getColumnIndex2TsBlockColumnIndexList());
+ } catch (SQLException | RuntimeException e) {
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, e);
+ }
}
return resultSet;
}
@@ -553,6 +621,7 @@ public class IoTDBStatement implements Statement {
@Override
public int executeUpdate(String sql) throws SQLException {
checkConnection("execute update");
+ closeCurrentResultSet();
try {
return executeUpdateSQL(sql);
} catch (TException e) {
@@ -583,14 +652,15 @@ public class IoTDBStatement implements Statement {
final TSExecuteStatementResp execResp =
callWithRetryAndReconnect(
() -> client.executeUpdateStatement(execReq),
TSExecuteStatementResp::getStatus);
- if (execResp.isSetQueryId()) {
- queryId = execResp.getQueryId();
- }
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (final StatementExecutionException e) {
throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
}
+ if (execResp.isSetQueryId()) {
+ queryId = execResp.getQueryId();
+ closeQueryOperation(queryId);
+ }
return 0;
}
@@ -667,6 +737,7 @@ public class IoTDBStatement implements Statement {
@Override
public boolean getMoreResults() throws SQLException {
checkConnection("getMoreResults");
+ closeCurrentResultSet();
return false;
}
@@ -735,7 +806,7 @@ public class IoTDBStatement implements Statement {
@Override
public boolean isClosed() {
- return isClosed;
+ return isClosed || connection == null || connection.isClosed();
}
@Override
@@ -768,6 +839,42 @@ public class IoTDBStatement implements Statement {
}
}
+ protected void closeCurrentResultSet() throws SQLException {
+ if (resultSet != null) {
+ ResultSet currentResultSet = resultSet;
+ resultSet = null;
+ currentResultSet.close();
+ }
+ }
+
+ protected static SQLException mergeSQLException(SQLException current, SQLException next) {
+ if (current == null) {
+ return next;
+ }
+ current.addSuppressed(next);
+ return current;
+ }
+
+ protected SQLException closeQueryOperationOnResultSetCreationFailure(
+ long queryIdToClose, Exception failure) {
+ SQLException resultSetCreationException =
+ failure instanceof SQLException
+ ? (SQLException) failure
+ : new SQLException(failure.getMessage(), failure);
+ try {
+ closeQueryOperation(queryIdToClose);
+ } catch (SQLException closeException) {
+ resultSetCreationException.addSuppressed(closeException);
+ }
+ return resultSetCreationException;
+ }
+
+ private void unregisterStatement() {
+ if (connection != null) {
+ connection.unregisterStatement(this);
+ }
+ }
+
private SQLException unsupportedOperation(String action, String message) throws SQLException {
checkConnection(action);
return new SQLException(message);
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java
index 8003952ca59..59a88fbedc6 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatement.java
@@ -91,6 +91,18 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
ZoneId zoneId,
Charset charset)
throws SQLException {
+ this(connection, client, sessionId, requireNonNullSql(sql), zoneId, charset, true);
+ }
+
+ private IoTDBTablePreparedStatement(
+ IoTDBConnection connection,
+ Iface client,
+ Long sessionId,
+ String sql,
+ ZoneId zoneId,
+ Charset charset,
+ boolean validated)
+ throws SQLException {
super(connection, client, sessionId, zoneId, charset);
this.sql = sql;
this.preparedStatementName = generateStatementName();
@@ -115,7 +127,14 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
parameterTypes[i] = Types.NULL;
}
} catch (TException | StatementExecutionException e) {
- throw new SQLException(JdbcMessages.FAILED_TO_PREPARE_STATEMENT + e.getMessage(), e);
+ SQLException prepareException =
+ new SQLException(JdbcMessages.FAILED_TO_PREPARE_STATEMENT + e.getMessage(), e);
+ try {
+ closeClientOperation();
+ } catch (SQLException closeException) {
+ prepareException.addSuppressed(closeException);
+ }
+ throw prepareException;
}
} else {
// For non-query statements, only keep text parameters for client-side substitution.
@@ -133,6 +152,13 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
this(connection, client, sessionId, sql, zoneId, TSFileConfig.STRING_CHARSET);
}
+ private static String requireNonNullSql(String sql) throws SQLException {
+ if (sql == null) {
+ throw new SQLException("SQL statement cannot be null");
+ }
+ return sql;
+ }
+
private String generateStatementName() {
// StatementId is unique in directly connected DataNode
return "jdbc_ps_" + getStmtId();
@@ -160,9 +186,9 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
public boolean execute() throws SQLException {
checkConnection("execute");
if (isQueryStatement(sql)) {
- TSExecuteStatementResp resp = executeInternal();
- return resp.isSetQueryDataSet() || resp.isSetQueryResult();
+ return processQueryResult(executeInternal()) != null;
} else {
+ closeCurrentResultSet();
return super.execute(createCompleteSql(sql, parameters));
}
}
@@ -185,10 +211,12 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
@Override
public int executeUpdate() throws SQLException {
checkConnection("executeUpdate");
+ closeCurrentResultSet();
return super.executeUpdate(createCompleteSql(sql, parameters));
}
private TSExecuteStatementResp executeInternal() throws SQLException {
+ closeCurrentResultSet();
// Validate all parameters are set
for (int i = 0; i < parameterCount; i++) {
if (parameterTypes[i] == Types.NULL
@@ -218,31 +246,51 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
}
private ResultSet processQueryResult(TSExecuteStatementResp resp) throws SQLException {
+ long queryId = resp.isSetQueryId() ? resp.getQueryId() : -1;
if (resp.isSetQueryDataSet() || resp.isSetQueryResult()) {
- this.resultSet =
- new IoTDBJDBCResultSet(
- this,
- resp.getColumns(),
- resp.getDataTypeList(),
- resp.columnNameIndexMap,
- resp.ignoreTimeStamp,
- client,
- sql,
- resp.queryId,
- sessionId,
- resp.queryResult,
- resp.tracingInfo,
- (long) queryTimeout * 1000,
- resp.isSetMoreData() && resp.isMoreData(),
- zoneId);
+ setQueryId(queryId);
+ if (resp.queryResult == null) {
+ SQLException exception = new SQLException(JdbcMessages.QUERY_RESULT_SHOULD_NOT_BE_NULL);
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, exception);
+ }
+ try {
+ this.resultSet =
+ new IoTDBJDBCResultSet(
+ this,
+ resp.getColumns(),
+ resp.getDataTypeList(),
+ resp.columnNameIndexMap,
+ resp.ignoreTimeStamp,
+ client,
+ sql,
+ queryId,
+ sessionId,
+ resp.queryResult,
+ resp.tracingInfo,
+ (long) queryTimeout * 1000,
+ resp.isSetMoreData() && resp.isMoreData(),
+ zoneId);
+ } catch (SQLException | RuntimeException e) {
+ throw closeQueryOperationOnResultSetCreationFailure(queryId, e);
+ }
return resultSet;
}
+ if (queryId != -1) {
+ setQueryId(queryId);
+ closeQueryOperation(queryId);
+ }
return null;
}
@Override
public void close() throws SQLException {
+ SQLException closeException = null;
if (!isClosed() && serverSidePrepared) {
+ try {
+ closeCurrentResultSet();
+ } catch (SQLException e) {
+ closeException = mergeSQLException(closeException, e);
+ }
// Deallocate prepared statement on server only if it was prepared server-side
TSDeallocatePreparedReq req = new TSDeallocatePreparedReq();
req.setSessionId(sessionId);
@@ -257,7 +305,14 @@ public class IoTDBTablePreparedStatement extends IoTDBStatement implements Prepa
logger.warn(JdbcMessages.ERROR_DEALLOCATING_PREPARED_STATEMENT, e);
}
}
- super.close();
+ try {
+ super.close();
+ } catch (SQLException e) {
+ closeException = mergeSQLException(closeException, e);
+ }
+ if (closeException != null) {
+ throw closeException;
+ }
}
@Override
diff --git a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
index 6ec7f2c27c5..078b5432f46 100644
--- a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
+++ b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
@@ -23,7 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.thrift.TException;
@@ -37,10 +40,12 @@ import org.mockito.MockitoAnnotations;
import java.lang.reflect.Field;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.Savepoint;
+import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -53,7 +58,10 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -144,6 +152,60 @@ public class IoTDBConnectionTest {
assertThrows(SQLException.class, () -> tableConnection.setSchema(null));
}
+ @Test
+ public void testTableCatalogAndSchemaCloseUseStatements() throws Exception {
+ IoTDBConnection tableConnection =
+ new IoTDBConnection() {
+ @Override
+ public String getSqlDialect() {
+ return Constant.TABLE_DIALECT;
+ }
+ };
+ openConnection(tableConnection);
+ tableConnection.setClient(client);
+ TSExecuteStatementResp resp = mock(TSExecuteStatementResp.class);
+ when(client.requestStatementId(anyLong())).thenReturn(1L, 2L);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(resp);
+ when(resp.getStatus()).thenReturn(successStatus);
+ when(client.closeOperation(any())).thenReturn(successStatus);
+
+ tableConnection.setSchema("root");
+ tableConnection.setCatalog("root2");
+
+ verify(client, times(2)).closeOperation(any());
+ }
+
+ @Test
+ public void testPrepareStatementRejectsNullSqlBeforeRequestingStatementId() throws Exception {
+ openConnection(connection);
+ connection.setClient(client);
+
+ assertThrows(SQLException.class, () -> connection.prepareStatement(null));
+
+ verify(client, never()).requestStatementId(anyLong());
+ verify(client, never()).closeOperation(any());
+ }
+
+ @Test
+ public void testTablePrepareStatementRejectsNullSqlBeforeRequestingStatementId()
+ throws Exception {
+ IoTDBConnection tableConnection =
+ new IoTDBConnection() {
+ @Override
+ public String getSqlDialect() {
+ return Constant.TABLE_DIALECT;
+ }
+ };
+ openConnection(tableConnection);
+ tableConnection.setClient(client);
+
+ assertThrows(SQLException.class, () -> tableConnection.prepareStatement(null));
+
+ verify(client, never()).requestStatementId(anyLong());
+ verify(client, never()).prepareStatement(any(TSPrepareReq.class));
+ verify(client, never()).closeOperation(any());
+ }
+
@Test
public void testGetServerProperties() throws TException {
openConnection(connection);
@@ -308,6 +370,7 @@ public class IoTDBConnectionTest {
assertThrows(SQLException.class, () -> connection.rollback());
assertThrows(SQLException.class, () -> connection.rollback((Savepoint) null));
assertThrows(SQLException.class, () -> connection.setNetworkTimeout(null, 0));
+ assertThrows(SQLException.class, () -> connection.getQueryTimeout());
assertThrows(SQLException.class, () -> connection.setQueryTimeout(60));
assertThrows(SQLException.class, () -> connection.setSavepoint());
assertThrows(SQLException.class, () -> connection.setSavepoint("s"));
@@ -324,6 +387,64 @@ public class IoTDBConnectionTest {
verify(transport, never()).close();
}
+ @Test
+ public void testCloseClosesCreatedStatements() throws Exception {
+ openConnection(connection);
+ connection.setClient(client);
+ when(client.requestStatementId(anyLong())).thenReturn(1L, 2L);
+ when(client.closeOperation(any())).thenReturn(successStatus);
+ when(client.closeSession(any())).thenReturn(successStatus);
+
+ Statement statement = connection.createStatement();
+ PreparedStatement preparedStatement = connection.prepareStatement("SELECT ?");
+
+ connection.close();
+
+ assertTrue(connection.isClosed());
+ assertTrue(statement.isClosed());
+ assertTrue(preparedStatement.isClosed());
+ verify(client, times(2)).closeOperation(any());
+ verify(client).closeSession(any());
+ }
+
+ @Test
+ public void testCloseClosesCurrentResultSetFromCreatedStatement() throws Exception {
+ openConnection(connection);
+ connection.setClient(client);
+ when(client.requestStatementId(anyLong())).thenReturn(1L);
+ when(client.closeOperation(any())).thenReturn(successStatus);
+ when(client.closeSession(any())).thenReturn(successStatus);
+
+ IoTDBStatement statement = (IoTDBStatement) connection.createStatement();
+ ResultSet resultSet = mock(ResultSet.class);
+ statement.resultSet = resultSet;
+
+ connection.close();
+
+ assertTrue(statement.isClosed());
+ verify(resultSet).close();
+ verify(client).closeOperation(any());
+ verify(client).closeSession(any());
+ }
+
+ @Test
+ public void testClosedStatementIsUnregisteredFromConnection() throws Exception {
+ openConnection(connection);
+ connection.setClient(client);
+ when(client.requestStatementId(anyLong())).thenReturn(1L);
+ when(client.closeOperation(any())).thenReturn(successStatus);
+ when(client.closeSession(any())).thenReturn(successStatus);
+
+ Statement statement = connection.createStatement();
+ statement.close();
+
+ connection.close();
+
+ assertTrue(statement.isClosed());
+ verify(client, times(1)).closeOperation(any());
+ verify(client).closeSession(any());
+ }
+
private void openConnection(IoTDBConnection target) {
try {
Field isClosedField = IoTDBConnection.class.getDeclaredField("isClosed");
diff --git a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
index 46e620d2734..c091695d8b7 100644
--- a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
+++ b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSetTest.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
@@ -198,6 +200,8 @@ public class IoTDBJDBCResultSetTest {
Assert.assertThrows(SQLException.class, () -> resultSet.getString(0));
Assert.assertThrows(SQLException.class, () -> resultSet.getObject("missing"));
Assert.assertThrows(SQLException.class, () -> resultSet.getTimestamp("missing"));
+ Assert.assertThrows(
+ SQLException.class, () -> ((IoTDBJDBCResultSet) resultSet).getColumnTypeByIndex(0));
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
// check columnInfoList
@@ -316,6 +320,14 @@ public class IoTDBJDBCResultSetTest {
Assert.assertThrows(SQLException.class, () -> resultSet.getType());
Assert.assertThrows(SQLException.class, () -> resultSet.getWarnings());
Assert.assertThrows(SQLException.class, () -> resultSet.wasNull());
+ Assert.assertThrows(
+ SQLException.class, () -> ((IoTDBJDBCResultSet) resultSet).isSetTracingInfo());
+ Assert.assertThrows(
+ SQLException.class, () -> ((IoTDBJDBCResultSet) resultSet).isIgnoreTimeStamp());
+ Assert.assertThrows(
+ SQLException.class, () -> ((IoTDBJDBCResultSet) resultSet).getColumnTypeByIndex(1));
+ Assert.assertThrows(
+ SQLException.class, () -> ((IoTDBJDBCResultSet) resultSet).getOperationType());
SQLException unsupportedException =
Assert.assertThrows(SQLException.class, () -> resultSet.absolute(1));
@@ -325,6 +337,158 @@ public class IoTDBJDBCResultSetTest {
Assert.assertEquals("ResultSet has been closed", unsupportedException.getMessage());
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testLocalResultSetCloseDoesNotCloseServerOperation() throws Exception {
+ ResultSet resultSet =
+ new IoTDBJDBCResultSet(
+ statement,
+ Collections.singletonList("s1"),
+ Collections.singletonList("INT32"),
+ Collections.singletonMap("s1", 0),
+ true,
+ client,
+ null,
+ -1,
+ sessionId,
+ Collections.<ByteBuffer>emptyList(),
+ null,
+ (long) 60 * 1000,
+ false,
+ zoneID);
+
+ resultSet.close();
+
+ Assert.assertTrue(resultSet.isClosed());
+ verify(client, times(0)).closeOperation(any(TSCloseOperationReq.class));
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testResultSetCloseClearsStatementQueryId() throws Exception {
+ mockVehicleQueryResponse();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ ResultSet resultSet = statement.getResultSet();
+ resultSet.close();
+ statement.cancel();
+
+ Assert.assertTrue(resultSet.isClosed());
+ verify(client, times(0)).cancelOperation(any(TSCancelOperationReq.class));
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testStatementCloseClosesCurrentResultSet() throws Exception {
+ mockVehicleQueryResponse();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ ResultSet resultSet = statement.getResultSet();
+ Assert.assertFalse(resultSet.isClosed());
+
+ statement.close();
+
+ Assert.assertTrue(statement.isClosed());
+ Assert.assertTrue(resultSet.isClosed());
+ Assert.assertThrows(SQLException.class, () -> resultSet.next());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testFailedCloseStillMarksResultSetClosed() throws Exception {
+ mockVehicleQueryResponse();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ TSStatus closeFailure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ closeFailure.setMessage("close failed");
+ when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(closeFailure);
+
+ ResultSet resultSet = statement.getResultSet();
+ Assert.assertThrows(SQLException.class, resultSet::close);
+
+ Assert.assertTrue(resultSet.isClosed());
+ Assert.assertThrows(SQLException.class, resultSet::next);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testStatementExecutionClosesPreviousResultSet() throws Exception {
+ mockVehicleQueryResponse();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ ResultSet previousResultSet = statement.getResultSet();
+ Assert.assertFalse(previousResultSet.isClosed());
+
+ mockVehicleQueryResponse();
+ execResp.queryResult = FakedFirstFetchTsBlockResult();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ Assert.assertTrue(previousResultSet.isClosed());
+ Assert.assertThrows(SQLException.class, () -> previousResultSet.next());
+
+ ResultSet currentResultSet = statement.getResultSet();
+ Assert.assertNotSame(previousResultSet, currentResultSet);
+ Assert.assertFalse(currentResultSet.isClosed());
+
+ currentResultSet.close();
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testExecuteUpdateClosesCurrentResultSet() throws Exception {
+ mockVehicleQueryResponse();
+ when(client.executeUpdateStatement(any(TSExecuteStatementReq.class))).thenReturn(execResp);
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ ResultSet resultSet = statement.getResultSet();
+ Assert.assertFalse(resultSet.isClosed());
+
+ Assert.assertEquals(0, statement.executeUpdate("insert into root.sg.d(time,s) values(1,1)"));
+
+ Assert.assertTrue(resultSet.isClosed());
+ Assert.assertNull(statement.getResultSet());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testGetMoreResultsClosesCurrentResultSet() throws Exception {
+ mockVehicleQueryResponse();
+
+ Assert.assertTrue(statement.execute("select * from root.vehicle.d0"));
+
+ ResultSet resultSet = statement.getResultSet();
+ Assert.assertFalse(resultSet.isClosed());
+
+ Assert.assertFalse(statement.getMoreResults());
+
+ Assert.assertTrue(resultSet.isClosed());
+ Assert.assertNull(statement.getResultSet());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testExhaustedResultSetIsNotReportedClosed() throws Exception {
+ mockTextQueryResponse();
+
+ Assert.assertTrue(statement.execute("select s3 from root.vehicle.d0"));
+
+ ResultSet resultSet = statement.getResultSet();
+ Assert.assertTrue(resultSet.next());
+ Assert.assertFalse(resultSet.next());
+ Assert.assertFalse(resultSet.isClosed());
+ Assert.assertEquals(ResultSet.TYPE_FORWARD_ONLY, resultSet.getType());
+
+ resultSet.close();
+
+ Assert.assertTrue(resultSet.isClosed());
+ }
+
@SuppressWarnings("resource")
@Test
public void testInvalidBigDecimalConversionThrowsSQLException() throws Exception {
diff --git a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
index 9e94cefeaab..349692274ba 100644
--- a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
+++ b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBPreparedStatementTest.java
@@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Date;
import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
@@ -48,6 +49,9 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -73,6 +77,17 @@ public class IoTDBPreparedStatementTest {
when(client.closeOperation(any())).thenReturn(Status_SUCCESS);
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testConstructorRejectsNullSqlBeforeRequestingStatementId() throws Exception {
+ assertThrows(
+ SQLException.class,
+ () -> new IoTDBPreparedStatement(connection, client, sessionId, null, zoneId));
+
+ verify(client, never()).requestStatementId(anyLong());
+ verify(client, never()).closeOperation(any());
+ }
+
@SuppressWarnings("resource")
@Test
public void testNonParameterized() throws Exception {
@@ -187,6 +202,20 @@ public class IoTDBPreparedStatementTest {
assertThrows(SQLException.class, () -> ps.execute());
}
+ @SuppressWarnings("resource")
+ @Test
+ public void executeWithUnsetParameterClosesPreviousResultSet() throws SQLException {
+ IoTDBPreparedStatement ps =
+ new IoTDBPreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ ResultSet previousResultSet = mock(ResultSet.class);
+ ps.resultSet = previousResultSet;
+
+ assertThrows(SQLException.class, ps::execute);
+
+ verify(previousResultSet).close();
+ assertNull(ps.resultSet);
+ }
+
@SuppressWarnings("resource")
@Test
public void oneIntArgument() throws Exception {
diff --git a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
index 892a3365157..0b4fbbb191d 100644
--- a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
+++ b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
@@ -29,20 +33,28 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
+import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class IoTDBStatementTest {
@@ -169,8 +181,121 @@ public class IoTDBStatementTest {
assertTrue(unsupportedException.getMessage().contains("statement has been closed"));
}
+ @Test
+ public void testCloseStillClosesOperationWhenResultSetCloseFails() throws Exception {
+ when(client.closeOperation(any())).thenReturn(RpcUtils.SUCCESS_STATUS);
+ IoTDBStatement statement = new IoTDBStatement(connection, client, sessionId, zoneID, 0, 1L);
+ ResultSet resultSet = mock(ResultSet.class);
+ doThrow(new SQLException("result set close failed")).when(resultSet).close();
+ statement.resultSet = resultSet;
+
+ SQLException closeException = assertThrows(SQLException.class, statement::close);
+
+ assertEquals("result set close failed", closeException.getMessage());
+ assertTrue(statement.isClosed());
+ assertNull(statement.resultSet);
+ verify(client).closeOperation(any());
+ }
+
+ @Test
+ public void testExecuteClosesQueryOperationWhenResultSetCreationFails() throws Exception {
+ long queryId = 10L;
+ IoTDBStatement statement = new IoTDBStatement(connection, client, sessionId, zoneID, 0, 1L);
+ TSExecuteStatementResp resp = malformedQueryResponse(queryId);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(resp);
+ when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ assertThrows(SQLException.class, () -> statement.execute("select s1 from root.sg.d"));
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(1L, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+ assertNull(statement.resultSet);
+ }
+
+ @Test
+ public void testExecuteQueryClosesQueryOperationWhenResultSetCreationFails() throws Exception {
+ long queryId = 11L;
+ IoTDBStatement statement = new IoTDBStatement(connection, client, sessionId, zoneID, 0, 2L);
+ TSExecuteStatementResp resp = malformedQueryResponse(queryId);
+ when(client.executeQueryStatementV2(any(TSExecuteStatementReq.class))).thenReturn(resp);
+ when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ assertThrows(SQLException.class, () -> statement.executeQuery("select s1 from root.sg.d"));
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(2L, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+ assertNull(statement.resultSet);
+ }
+
+ @Test
+ public void testExecuteClosesUnexpectedQueryOperationForNonResultResponse() throws Exception {
+ long statementId = 3L;
+ long queryId = 12L;
+ IoTDBStatement statement =
+ new IoTDBStatement(connection, client, sessionId, zoneID, 0, statementId);
+ TSExecuteStatementResp resp = new TSExecuteStatementResp();
+ resp.setStatus(RpcUtils.SUCCESS_STATUS);
+ resp.setQueryId(queryId);
+ when(client.executeStatementV2(any(TSExecuteStatementReq.class))).thenReturn(resp);
+ when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ assertFalse(statement.execute("insert into root.sg.d(time,s) values(1,1)"));
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(statementId, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+
+ statement.cancel();
+
+ verify(client, never()).cancelOperation(any(TSCancelOperationReq.class));
+ }
+
+ @Test
+ public void testExecuteUpdateClosesUnexpectedQueryOperation() throws Exception {
+ long statementId = 4L;
+ long queryId = 13L;
+ IoTDBStatement statement =
+ new IoTDBStatement(connection, client, sessionId, zoneID, 0, statementId);
+ TSExecuteStatementResp resp = new TSExecuteStatementResp();
+ resp.setStatus(RpcUtils.SUCCESS_STATUS);
+ resp.setQueryId(queryId);
+ when(client.executeUpdateStatement(any(TSExecuteStatementReq.class))).thenReturn(resp);
+ when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(RpcUtils.SUCCESS_STATUS);
+
+ assertEquals(0, statement.executeUpdate("insert into root.sg.d(time,s) values(1,1)"));
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(statementId, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+
+ statement.cancel();
+
+ verify(client, never()).cancelOperation(any(TSCancelOperationReq.class));
+ }
+
@Test(expected = SQLException.class)
public void testUnwrapRejectsUnsupportedClass() throws SQLException {
new IoTDBStatement(connection, client, sessionId, zoneID).unwrap(String.class);
}
+
+ private TSExecuteStatementResp malformedQueryResponse(long queryId) {
+ TSExecuteStatementResp resp = new TSExecuteStatementResp();
+ resp.setStatus(RpcUtils.SUCCESS_STATUS);
+ resp.setQueryId(queryId);
+ resp.setColumns(Collections.singletonList("s1"));
+ resp.setDataTypeList(Collections.emptyList());
+ resp.setQueryResult(Collections.<ByteBuffer>emptyList());
+ resp.setIgnoreTimeStamp(true);
+ return resp;
+ }
}
diff --git a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatementTest.java b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatementTest.java
index c8c620b0a61..3e3f54e1521 100644
--- a/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatementTest.java
+++ b/iotdb-client/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBTablePreparedStatementTest.java
@@ -24,15 +24,19 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.stmt.PreparedParameterSerde;
import org.apache.iotdb.rpc.stmt.PreparedParameterSerde.DeserializedParam;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService.Iface;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecutePreparedReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSPrepareReq;
import org.apache.iotdb.service.rpc.thrift.TSPrepareResp;
+import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
@@ -43,19 +47,28 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneId;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -75,6 +88,7 @@ public class IoTDBTablePreparedStatementTest {
public void before() throws Exception {
MockitoAnnotations.initMocks(this);
when(connection.getSqlDialect()).thenReturn("table");
+ when(connection.getTimeFactor()).thenReturn(1_000);
when(execStatementResp.getStatus()).thenReturn(Status_SUCCESS);
when(execStatementResp.getQueryId()).thenReturn(queryId);
@@ -131,6 +145,18 @@ public class IoTDBTablePreparedStatementTest {
// ========== Table Model SQL Injection Prevention Tests ==========
+ @SuppressWarnings("resource")
+ @Test
+ public void testConstructorRejectsNullSqlBeforeRequestingStatementId() throws Exception {
+ assertThrows(
+ SQLException.class,
+ () -> new IoTDBTablePreparedStatement(connection, client, sessionId, null, zoneId));
+
+ verify(client, never()).requestStatementId(anyLong());
+ verify(client, never()).prepareStatement(any(TSPrepareReq.class));
+ verify(client, never()).closeOperation(any());
+ }
+
@SuppressWarnings("resource")
@Test
public void testParameterMetadataWrapperMethods() throws Exception {
@@ -196,6 +222,184 @@ public class IoTDBTablePreparedStatementTest {
assertTrue(unsupportedException.getMessage().contains("statement has been closed"));
}
+ @SuppressWarnings("resource")
+ @Test
+ public void testFailedServerSidePrepareClosesStatementOperation() throws Exception {
+ when(client.prepareStatement(any(TSPrepareReq.class)))
+ .thenThrow(new TException("prepare failed"));
+
+ assertThrows(
+ SQLException.class,
+ () -> new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId));
+
+ verify(client).closeOperation(any());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedExecutionClosesPreviousResultSet() throws Exception {
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ ResultSet previousResultSet = mock(ResultSet.class);
+ ps.resultSet = previousResultSet;
+
+ ps.setInt(1, 1);
+ ps.execute();
+
+ verify(previousResultSet).close();
+ assertNull(ps.resultSet);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedExecuteStoresQueryResultSet() throws Exception {
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ when(execStatementResp.isSetQueryResult()).thenReturn(true);
+ when(execStatementResp.getColumns()).thenReturn(Collections.singletonList("s1"));
+ when(execStatementResp.getDataTypeList()).thenReturn(Collections.singletonList("INT32"));
+ when(execStatementResp.getColumnIndex2TsBlockColumnIndexList())
+ .thenReturn(Collections.singletonList(0));
+ execStatementResp.queryResult = Collections.<ByteBuffer>emptyList();
+
+ ps.setInt(1, 1);
+
+ assertTrue(ps.execute());
+
+ ResultSet resultSet = ps.getResultSet();
+ assertNotNull(resultSet);
+ assertSame(resultSet, ps.getResultSet());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedCancelCancelsActiveQueryResult() throws Exception {
+ long queryId = 12L;
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ when(execStatementResp.isSetQueryId()).thenReturn(true);
+ when(execStatementResp.getQueryId()).thenReturn(queryId);
+ when(execStatementResp.isSetQueryResult()).thenReturn(true);
+ when(execStatementResp.getColumns()).thenReturn(Collections.singletonList("s1"));
+ when(execStatementResp.getDataTypeList()).thenReturn(Collections.singletonList("INT32"));
+ when(execStatementResp.getColumnIndex2TsBlockColumnIndexList())
+ .thenReturn(Collections.singletonList(0));
+ when(client.cancelOperation(any(TSCancelOperationReq.class))).thenReturn(Status_SUCCESS);
+ execStatementResp.queryResult = Collections.<ByteBuffer>emptyList();
+
+ ps.setInt(1, 1);
+ assertTrue(ps.execute());
+
+ ps.cancel();
+
+ ArgumentCaptor<TSCancelOperationReq> cancelReq =
+ ArgumentCaptor.forClass(TSCancelOperationReq.class);
+ verify(client).cancelOperation(cancelReq.capture());
+ assertEquals(queryId, cancelReq.getValue().getQueryId());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedClosesUnexpectedQueryOperationWithoutResult() throws Exception {
+ long statementId = 3L;
+ long queryId = 13L;
+ when(client.requestStatementId(anyLong())).thenReturn(statementId);
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ TSExecuteStatementResp resp = new TSExecuteStatementResp();
+ resp.setStatus(Status_SUCCESS);
+ resp.setQueryId(queryId);
+ when(client.executePreparedStatement(any(TSExecutePreparedReq.class))).thenReturn(resp);
+
+ ps.setInt(1, 1);
+
+ assertFalse(ps.execute());
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(statementId, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+ assertNull(ps.resultSet);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedClosesQueryOperationWhenResultSetCreationFails() throws Exception {
+ long queryId = 14L;
+ when(client.requestStatementId(anyLong())).thenReturn(4L);
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ TSExecuteStatementResp malformedResp = new TSExecuteStatementResp();
+ malformedResp.setStatus(Status_SUCCESS);
+ malformedResp.setQueryId(queryId);
+ malformedResp.setColumns(Collections.singletonList("s1"));
+ malformedResp.setDataTypeList(Collections.emptyList());
+ malformedResp.setQueryResult(Collections.<ByteBuffer>emptyList());
+ when(client.executePreparedStatement(any(TSExecutePreparedReq.class)))
+ .thenReturn(malformedResp);
+
+ ps.setInt(1, 1);
+
+ assertThrows(SQLException.class, ps::execute);
+
+ ArgumentCaptor<TSCloseOperationReq> closeReq =
+ ArgumentCaptor.forClass(TSCloseOperationReq.class);
+ verify(client).closeOperation(closeReq.capture());
+ assertEquals(4L, closeReq.getValue().getStatementId());
+ assertEquals(queryId, closeReq.getValue().getQueryId());
+ assertNull(ps.resultSet);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testTablePreparedUnsetParameterClosesPreviousResultSet() throws Exception {
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ ResultSet previousResultSet = mock(ResultSet.class);
+ ps.resultSet = previousResultSet;
+
+ assertThrows(SQLException.class, ps::execute);
+
+ verify(previousResultSet).close();
+ assertNull(ps.resultSet);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testCloseClosesResultSetBeforeDeallocatingPreparedStatement() throws Exception {
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ ResultSet resultSet = mock(ResultSet.class);
+ ps.resultSet = resultSet;
+
+ ps.close();
+
+ InOrder inOrder = inOrder(resultSet, client);
+ inOrder.verify(resultSet).close();
+ inOrder.verify(client).deallocatePreparedStatement(any());
+ assertTrue(ps.isClosed());
+ assertNull(ps.resultSet);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testCloseDeallocatesPreparedStatementWhenResultSetCloseFails() throws Exception {
+ IoTDBTablePreparedStatement ps =
+ new IoTDBTablePreparedStatement(connection, client, sessionId, "SELECT ?", zoneId);
+ ResultSet resultSet = mock(ResultSet.class);
+ doThrow(new SQLException("result set close failed")).when(resultSet).close();
+ ps.resultSet = resultSet;
+
+ SQLException closeException = assertThrows(SQLException.class, ps::close);
+
+ assertEquals("result set close failed", closeException.getMessage());
+ verify(client).deallocatePreparedStatement(any());
+ verify(client).closeOperation(any());
+ assertTrue(ps.isClosed());
+ assertNull(ps.resultSet);
+ }
+
@SuppressWarnings("resource")
@Test
public void testTableModelLoginInjectionWithComment() throws Exception {