This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f21431  overload SessionPool.executeQueryStatement and make it 
support querying with timeout (#4150)
2f21431 is described below

commit 2f21431a2f04da6be1b9f058860e794775f2d7ba
Author: 刘威 <[email protected]>
AuthorDate: Thu Oct 14 15:32:17 2021 +0800

    overload SessionPool.executeQueryStatement and make it support querying 
with timeout (#4150)
    
    overload SessionPool.executeQueryStatement and make it support querying 
with timeout (#4150)
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 33 ++++++++++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 27 ++++++++++++++----
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git 
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java 
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0cd6140..fd8f8e7 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1335,6 +1335,39 @@ public class SessionPool {
   }
 
   /**
+   * execure query sql users must call closeResultSet(SessionDataSetWrapper) 
if they do not use the
+   * SessionDataSet any more. users do not need to call 
sessionDataSet.closeOpeationHandler() any
+   * more.
+   *
+   * @param sql query statement
+   * @param timeoutInMs the timeout of this query, in milliseconds
+   * @return result set Notice that you must get the result instance. 
Otherwise a data leakage will
+   *     happen
+   */
+  @SuppressWarnings("squid:S2095") // Suppress wrapper not closed warning
+  public SessionDataSetWrapper executeQueryStatement(String sql, long 
timeoutInMs)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeQueryStatement(sql, timeoutInMs);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, 
session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new 
one.
+        logger.warn("executeQueryStatement failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    // never go here
+    return null;
+  }
+
+  /**
    * execute non query statement
    *
    * @param sql non query statement
diff --git 
a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java 
b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index dc39d47..a48cb3e 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -51,6 +51,7 @@ import static org.junit.Assert.fail;
 public class SessionPoolTest {
 
   private static final Logger logger = 
LoggerFactory.getLogger(SessionPoolTest.class);
+  private static final long DEFAULT_QUERY_TIMEOUT = -1;
   private final CompactionStrategy defaultCompaction =
       IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy();
 
@@ -158,11 +159,18 @@ public class SessionPoolTest {
   @Test
   public void executeQueryStatement() {
     SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
-    correctQuery(pool);
+    correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
     pool.close();
   }
 
-  private void correctQuery(SessionPool pool) {
+  @Test
+  public void executeQueryStatementWithTimeout() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    correctQuery(pool, 2000);
+    pool.close();
+  }
+
+  private void correctQuery(SessionPool pool, long timeoutInMs) {
     ExecutorService service = Executors.newFixedThreadPool(10);
     write10Data(pool, true);
     // now let's query
@@ -171,8 +179,15 @@ public class SessionPoolTest {
       service.submit(
           () -> {
             try {
-              SessionDataSetWrapper wrapper =
-                  pool.executeQueryStatement("select * from root.sg1.d1 where 
time = " + no);
+              SessionDataSetWrapper wrapper;
+              if (timeoutInMs == DEFAULT_QUERY_TIMEOUT) {
+                wrapper =
+                    pool.executeQueryStatement("select * from root.sg1.d1 
where time = " + no);
+              } else {
+                wrapper =
+                    pool.executeQueryStatement(
+                        "select * from root.sg1.d1 where time = " + no, 
timeoutInMs);
+              }
               pool.closeResultSet(wrapper);
             } catch (Exception e) {
               logger.error("correctQuery failed", e);
@@ -269,7 +284,7 @@ public class SessionPoolTest {
               null,
               false,
               Config.DEFAULT_CONNECTION_TIMEOUT_MS);
-      correctQuery(pool);
+      correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
       pool.close();
       return;
     } catch (StatementExecutionException e) {
@@ -301,7 +316,7 @@ public class SessionPoolTest {
                 null,
                 false,
                 Config.DEFAULT_CONNECTION_TIMEOUT_MS);
-        correctQuery(pool);
+        correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
         pool.close();
       } catch (StatementExecutionException es) {
         fail("should be TTransportException but get an exception: " + 
e.getMessage());

Reply via email to