This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2f21431 overload SessionPool.executeQueryStatement and make it
support querying with timeout (#4150)
2f21431 is described below
commit 2f21431a2f04da6be1b9f058860e794775f2d7ba
Author: 刘威 <[email protected]>
AuthorDate: Thu Oct 14 15:32:17 2021 +0800
overload SessionPool.executeQueryStatement and make it support querying
with timeout (#4150)
overload SessionPool.executeQueryStatement and make it support querying
with timeout (#4150)
---
.../org/apache/iotdb/session/pool/SessionPool.java | 33 ++++++++++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 27 ++++++++++++++----
2 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0cd6140..fd8f8e7 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1335,6 +1335,39 @@ public class SessionPool {
}
/**
+ * execure query sql users must call closeResultSet(SessionDataSetWrapper)
if they do not use the
+ * SessionDataSet any more. users do not need to call
sessionDataSet.closeOpeationHandler() any
+ * more.
+ *
+ * @param sql query statement
+ * @param timeoutInMs the timeout of this query, in milliseconds
+ * @return result set Notice that you must get the result instance.
Otherwise a data leakage will
+ * happen
+ */
+ @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
+ public SessionDataSetWrapper executeQueryStatement(String sql, long
timeoutInMs)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ SessionDataSet resp = session.executeQueryStatement(sql, timeoutInMs);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp,
session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ logger.warn("executeQueryStatement failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
+ return null;
+ }
+
+ /**
* execute non query statement
*
* @param sql non query statement
diff --git
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index dc39d47..a48cb3e 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -51,6 +51,7 @@ import static org.junit.Assert.fail;
public class SessionPoolTest {
private static final Logger logger =
LoggerFactory.getLogger(SessionPoolTest.class);
+ private static final long DEFAULT_QUERY_TIMEOUT = -1;
private final CompactionStrategy defaultCompaction =
IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy();
@@ -158,11 +159,18 @@ public class SessionPoolTest {
@Test
public void executeQueryStatement() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
- correctQuery(pool);
+ correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
pool.close();
}
- private void correctQuery(SessionPool pool) {
+ @Test
+ public void executeQueryStatementWithTimeout() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ correctQuery(pool, 2000);
+ pool.close();
+ }
+
+ private void correctQuery(SessionPool pool, long timeoutInMs) {
ExecutorService service = Executors.newFixedThreadPool(10);
write10Data(pool, true);
// now let's query
@@ -171,8 +179,15 @@ public class SessionPoolTest {
service.submit(
() -> {
try {
- SessionDataSetWrapper wrapper =
- pool.executeQueryStatement("select * from root.sg1.d1 where
time = " + no);
+ SessionDataSetWrapper wrapper;
+ if (timeoutInMs == DEFAULT_QUERY_TIMEOUT) {
+ wrapper =
+ pool.executeQueryStatement("select * from root.sg1.d1
where time = " + no);
+ } else {
+ wrapper =
+ pool.executeQueryStatement(
+ "select * from root.sg1.d1 where time = " + no,
timeoutInMs);
+ }
pool.closeResultSet(wrapper);
} catch (Exception e) {
logger.error("correctQuery failed", e);
@@ -269,7 +284,7 @@ public class SessionPoolTest {
null,
false,
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
- correctQuery(pool);
+ correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
pool.close();
return;
} catch (StatementExecutionException e) {
@@ -301,7 +316,7 @@ public class SessionPoolTest {
null,
false,
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
- correctQuery(pool);
+ correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
pool.close();
} catch (StatementExecutionException es) {
fail("should be TTransportException but get an exception: " +
e.getMessage());