DRILL-3640: Support JDBC Statement.setQueryTimeout(int)

Allow for queries to be cancelled if they don't complete within the stipulated 
time.
This is done by having Drill[Prepared]StatementImpl create a Stopwatch timer to 
track elapsed time.
  * DrillCursor uses this to detect timeouts.
  * DrillResultSetImpl uses this to detech timeout from the client side (e.g. a 
slow client, when all batches have been processed by DrillCursor)
* Tests added to test these and other query timeout scenarios.
* Dependent on DRILL-5973 for enabling server-triggered timeout tests
NOTE: PreparedStatementTest.testServerTriggeredQueryTimeout is disabled

closes #1024


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a9ea4ec1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a9ea4ec1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a9ea4ec1

Branch: refs/heads/master
Commit: a9ea4ec1c5645ddab4b7aef9ac060ff5f109b696
Parents: f421152
Author: Kunal Khatua <[email protected]>
Authored: Wed Nov 29 12:12:53 2017 -0800
Committer: Arina Ielchiieva <[email protected]>
Committed: Fri Jan 26 13:43:05 2018 +0200

----------------------------------------------------------------------
 .../src/main/resources/drill-module.conf        |   2 +-
 .../org/apache/drill/jdbc/DrillStatement.java   |  24 +--
 .../apache/drill/jdbc/SqlTimeoutException.java  |  14 +-
 .../org/apache/drill/jdbc/impl/DrillCursor.java |  61 +++++-
 .../drill/jdbc/impl/DrillResultSetImpl.java     |  22 +++
 .../drill/jdbc/impl/DrillStatementImpl.java     |  21 +-
 .../drill/jdbc/PreparedStatementTest.java       | 173 ++++++++++++++++
 .../org/apache/drill/jdbc/StatementTest.java    | 197 +++++++++++++++----
 8 files changed, 436 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 8ac8d7b..28b7975 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -54,7 +54,7 @@ drill.client: {
 drill.tmp-dir: "/tmp"
 drill.tmp-dir: ${?DRILL_TMP_DIR}
 drill.exec: {
-  cluster-id: "drillbits1"
+  cluster-id: "drillbits1",
   rpc: {
     user: {
       timeout: 30,

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
index f2aea40..6db5f3a 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java
@@ -16,10 +16,9 @@
  */
 package org.apache.drill.jdbc;
 
-import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLException;
 import java.sql.Statement;
 
-
 /**
  * Drill-specific {@link Statement}.
  * @see #unwrap
@@ -27,34 +26,29 @@ import java.sql.Statement;
 public interface DrillStatement extends Statement {
 
   /**
-   * <strong>Drill</strong>:
-   * Returns zero, indicating that no timeout is set.
-   *
    * @throws  AlreadyClosedSqlException
    *            if connection is closed
+   * @throws  SQLException
+   *            Any other exception
    */
   @Override
-  int getQueryTimeout() throws AlreadyClosedSqlException;
+  int getQueryTimeout() throws AlreadyClosedSqlException, SQLException;
 
   /**
    * <strong>Drill</strong>:
-   * Not supported (for non-zero timeout value).
-   * <p>
-   *   Normally, just throws {@link SQLFeatureNotSupportedException} unless
-   *   request is trivially for no timeout (zero {@code milliseconds} value).
-   * </p>
+   * Supported (for non-zero timeout value).
    * @throws  AlreadyClosedSqlException
    *            if connection is closed
    * @throws  JdbcApiSqlException
    *            if an invalid parameter value is detected (and not above case)
-   * @throws  SQLFeatureNotSupportedException
-   *            if timeout is non-zero (and not above case)
+   * @throws  SQLException
+   *            Any other exception
    */
   @Override
-  void setQueryTimeout( int milliseconds )
+  void setQueryTimeout( int seconds )
       throws AlreadyClosedSqlException,
              JdbcApiSqlException,
-             SQLFeatureNotSupportedException;
+             SQLException;
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
index c24858e..d449916 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
@@ -17,18 +17,22 @@
  */
 package org.apache.drill.jdbc;
 
-import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 
 /**
  * Indicates that an operation timed out. This is not an error; you can
  * retry the operation.
  */
-public class SqlTimeoutException
-    extends SQLException
-{
+public class SqlTimeoutException extends SQLTimeoutException {
+  private static final long serialVersionUID = 2017_04_03L;
+
   SqlTimeoutException() {
     // SQLException(reason, SQLState, vendorCode)
     // REVIEW mb 19-Jul-05 Is there a standard SQLState?
     super("timeout", null, 0);
   }
-}
\ No newline at end of file
+
+  public SqlTimeoutException(long timeoutValueInSeconds) {
+    super("Query timed out in "+ timeoutValueInSeconds + " seconds");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 9b9a4c8..72c36dd 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -21,6 +21,7 @@ import static org.slf4j.LoggerFactory.getLogger;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Stopwatch;
 import org.apache.calcite.avatica.AvaticaResultSet;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.ColumnMetaData;
@@ -52,6 +54,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
 import org.apache.drill.jdbc.SchemaChangeListener;
+import org.apache.drill.jdbc.SqlTimeoutException;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Queues;
@@ -100,13 +103,18 @@ class DrillCursor implements Cursor {
     final LinkedBlockingDeque<QueryDataBatch> batchQueue =
         Queues.newLinkedBlockingDeque();
 
+    private final DrillCursor parent;
+    Stopwatch elapsedTimer = null;
 
     /**
      * ...
-     * @param  batchQueueThrottlingThreshold
-     *         queue size threshold for throttling server
+     * @param parent
+     *        reference to DrillCursor
+     * @param batchQueueThrottlingThreshold
+     *        queue size threshold for throttling server
      */
-    ResultsListener( int batchQueueThrottlingThreshold ) {
+    ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold ) {
+      this.parent = parent;
       instanceId = nextInstanceId++;
       this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
       logger.debug( "[#{}] Query listener created.", instanceId );
@@ -139,8 +147,17 @@ class DrillCursor implements Cursor {
       return stopped;
     }
 
-    public void awaitFirstMessage() throws InterruptedException {
-      firstMessageReceived.await();
+    public void awaitFirstMessage() throws InterruptedException, 
SQLTimeoutException {
+      //Check if a non-zero timeout has been set
+      if ( parent.timeoutInMilliseconds > 0 ) {
+        //Identifying remaining in milliseconds to maintain a granularity 
close to integer value of timeout
+        long timeToTimeout = parent.timeoutInMilliseconds - 
parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS);
+        if ( timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, 
TimeUnit.MILLISECONDS)) {
+            throw new 
SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
+        }
+      } else {
+        firstMessageReceived.await();
+      }
     }
 
     private void releaseIfFirst() {
@@ -212,7 +229,7 @@ class DrillCursor implements Cursor {
      * @throws InterruptedException
      *         if waiting on the queue was interrupted
      */
-    QueryDataBatch getNext() throws UserException, InterruptedException {
+    QueryDataBatch getNext() throws UserException, InterruptedException, 
SQLTimeoutException {
       while (true) {
         if (executionFailureException != null) {
           logger.debug( "[#{}] Dequeued query failure exception: {}.",
@@ -239,6 +256,11 @@ class DrillCursor implements Cursor {
             }
             return qdb;
           }
+
+          // Check and throw SQLTimeoutException
+          if ( parent.timeoutInMilliseconds > 0 && 
parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= 
parent.timeoutInMilliseconds ) {
+            throw new 
SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
+          }
         }
       }
     }
@@ -251,6 +273,7 @@ class DrillCursor implements Cursor {
                       instanceId, batchQueue.size() );
       }
       while (!batchQueue.isEmpty()) {
+        // Don't bother with query timeout, we're closing the cursor
         QueryDataBatch qdb = batchQueue.poll();
         if (qdb != null && qdb.getData() != null) {
           qdb.getData().release();
@@ -318,13 +341,17 @@ class DrillCursor implements Cursor {
    * (Not <i>row</i> number.) */
   private int currentRecordNumber = -1;
 
+  //Track timeout period
+  private long timeoutInMilliseconds = 0L;
+  private Stopwatch elapsedTimer;
 
   /**
    *
    * @param statement
    * @param signature
+   * @throws SQLException
    */
-  DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, 
Signature signature) {
+  DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, 
Signature signature) throws SQLException {
     this.connection = connection;
     this.statement = statement;
     this.signature = signature;
@@ -333,8 +360,9 @@ class DrillCursor implements Cursor {
     final int batchQueueThrottlingThreshold =
         client.getConfig().getInt(
             ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
-    resultsListener = new ResultsListener(batchQueueThrottlingThreshold);
+    resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold);
     currentBatchHolder = new RecordBatchLoader(client.getAllocator());
+    setTimeout(this.statement.getQueryTimeout());
   }
 
   protected int getCurrentRecordNumber() {
@@ -376,6 +404,19 @@ class DrillCursor implements Cursor {
     currentBatchHolder.clear();
   }
 
+  long getTimeoutInMilliseconds() {
+    return timeoutInMilliseconds;
+  }
+
+  //Set the cursor's timeout in seconds
+  void setTimeout(int timeoutDurationInSeconds) {
+    this.timeoutInMilliseconds = 
TimeUnit.SECONDS.toMillis(timeoutDurationInSeconds);
+    //Starting the timer, since this is invoked via the ResultSet.execute() 
call
+    if (timeoutInMilliseconds > 0) {
+      elapsedTimer = Stopwatch.createStarted();
+    }
+  }
+
   /**
    * Updates column accessors and metadata from current record batch.
    */
@@ -615,4 +656,8 @@ class DrillCursor implements Cursor {
     return accessors.wasNull();
   }
 
+  public Stopwatch getElapsedTimer() {
+    return elapsedTimer;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index f4fc588..6ca8ee2 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -32,6 +32,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.RowId;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.SQLWarning;
 import java.sql.SQLXML;
 import java.sql.Time;
@@ -42,6 +43,7 @@ import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.avatica.AvaticaResultSet;
 import org.apache.calcite.avatica.AvaticaSite;
@@ -54,7 +56,9 @@ import org.apache.calcite.avatica.util.Cursor.Accessor;
 import org.apache.drill.jdbc.AlreadyClosedSqlException;
 import org.apache.drill.jdbc.DrillResultSet;
 import org.apache.drill.jdbc.ExecutionCanceledSqlException;
+import org.apache.drill.jdbc.SqlTimeoutException;
 
+import com.google.common.base.Stopwatch;
 
 /**
  * Drill's implementation of {@link ResultSet}.
@@ -67,6 +71,10 @@ class DrillResultSetImpl extends AvaticaResultSet implements 
DrillResultSet {
   private final DrillConnectionImpl connection;
   private volatile boolean hasPendingCancelationNotification = false;
 
+  //Timeout Support Variables
+  private Stopwatch elapsedTimer;
+  private long queryTimeoutInMilliseconds;
+
   DrillResultSetImpl(AvaticaStatement statement, QueryState state, 
Meta.Signature signature,
                      ResultSetMetaData resultSetMetaData, TimeZone timeZone,
                      Meta.Frame firstFrame) {
@@ -86,6 +94,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements 
DrillResultSet {
    */
   private void throwIfClosed() throws AlreadyClosedSqlException,
                                       ExecutionCanceledSqlException,
+                                      SQLTimeoutException,
                                       SQLException {
     if ( isClosed() ) {
       if (cursor instanceof DrillCursor && hasPendingCancelationNotification) {
@@ -97,6 +106,14 @@ class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultSet {
         throw new AlreadyClosedSqlException( "ResultSet is already closed." );
       }
     }
+
+    //Implicit check for whether timeout is set
+    if (elapsedTimer != null) {
+      //The timer has already been started by the DrillCursor at this point
+      if (elapsedTimer.elapsed(TimeUnit.MILLISECONDS) > 
this.queryTimeoutInMilliseconds) {
+        throw new 
SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(this.queryTimeoutInMilliseconds));
+      }
+    }
   }
 
 
@@ -127,6 +144,7 @@ class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultSet {
   @Override
   public boolean next() throws SQLException {
     throwIfClosed();
+
     // TODO:  Resolve following comments (possibly obsolete because of later
     // addition of preceding call to throwIfClosed.  Also, NOTE that the
     // following check, and maybe some throwIfClosed() calls, probably must
@@ -1889,6 +1907,10 @@ class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultSet {
     }
     else {
       DrillCursor drillCursor = new DrillCursor(connection, statement, 
signature);
+      //Getting handle to elapsed timer for timeout purposes
+      this.elapsedTimer = drillCursor.getElapsedTimer();
+      //Setting this to ensure future calls to change timeouts for an active 
Statement doesn't affect ResultSet
+      this.queryTimeoutInMilliseconds = drillCursor.getTimeoutInMilliseconds();
       super.execute2(drillCursor, this.signature.columns);
 
       // Read first (schema-only) batch to initialize result-set metadata from

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
index a01bcf3..f664b52 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java
@@ -156,29 +156,18 @@ class DrillStatementImpl extends AvaticaStatement 
implements DrillStatement,
   }
 
   @Override
-  public int getQueryTimeout() throws AlreadyClosedSqlException
+  public int getQueryTimeout() throws AlreadyClosedSqlException, SQLException
   {
     throwIfClosed();
-    return 0;  // (No no timeout.)
+    return super.getQueryTimeout();
   }
 
   @Override
-  public void setQueryTimeout( int milliseconds )
+  public void setQueryTimeout( int seconds )
       throws AlreadyClosedSqlException,
-             InvalidParameterSqlException,
-             SQLFeatureNotSupportedException {
+             SQLException {
     throwIfClosed();
-    if ( milliseconds < 0 ) {
-      throw new InvalidParameterSqlException(
-          "Invalid (negative) \"milliseconds\" parameter to 
setQueryTimeout(...)"
-          + " (" + milliseconds + ")" );
-    }
-    else {
-      if ( 0 != milliseconds ) {
-        throw new SQLFeatureNotSupportedException(
-            "Setting network timeout is not supported." );
-      }
-    }
+    super.setQueryTimeout(seconds);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java 
b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
index 45bd9b8..94155e4 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
@@ -42,18 +42,24 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.categories.JdbcTest;
 import org.hamcrest.Matcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -65,6 +71,14 @@ import org.junit.experimental.categories.Category;
 @Category(JdbcTest.class)
 public class PreparedStatementTest extends JdbcTestBase {
 
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PreparedStatementTest.class);
+
+  private static final String SYS_VERSION_SQL = "select * from sys.version";
+  private static final String SYS_RANDOM_SQL =
+      "SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " +
+      "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " +
+      "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) ";
+
   /** Fuzzy matcher for parameters-not-supported message assertions.  (Based on
    *  current "Prepared-statement dynamic parameters are not supported.") */
   private static final Matcher<String> PARAMETERS_NOT_SUPPORTED_MSG_MATCHER =
@@ -237,6 +251,165 @@ public class PreparedStatementTest extends JdbcTestBase {
     }
   }
 
+  /**
+   * Test for reading of default query timeout
+   */
+  @Test
+  public void testDefaultGetQueryTimeout() throws SQLException {
+    try (PreparedStatement stmt = 
connection.prepareStatement(SYS_VERSION_SQL)) {
+      int timeoutValue = stmt.getQueryTimeout();
+      assertEquals( 0L , timeoutValue );
+    }
+  }
+
+  /**
+   * Test Invalid parameter by giving negative timeout
+   */
+  @Test
+  public void testInvalidSetQueryTimeout() throws SQLException {
+    try (PreparedStatement stmt = 
connection.prepareStatement(SYS_VERSION_SQL)) {
+      //Setting negative value
+      int valueToSet = -10;
+      try {
+        stmt.setQueryTimeout(valueToSet);
+      } catch ( final SQLException e) {
+        assertThat( e.getMessage(), containsString( "illegal timeout value") );
+      }
+    }
+  }
+
+  /**
+   * Test setting a valid timeout
+   */
+  @Test
+  public void testValidSetQueryTimeout() throws SQLException {
+    try (PreparedStatement stmt = 
connection.prepareStatement(SYS_VERSION_SQL)) {
+      //Setting positive value
+      int valueToSet = new Random(20150304).nextInt(59)+1;
+      logger.info("Setting timeout as {} seconds", valueToSet);
+      stmt.setQueryTimeout(valueToSet);
+      assertEquals( valueToSet , stmt.getQueryTimeout() );
+    }
+  }
+
+  /**
+   * Test setting timeout as zero and executing
+   */
+  @Test
+  public void testSetQueryTimeoutAsZero() throws SQLException {
+    try (PreparedStatement stmt = connection.prepareStatement(SYS_RANDOM_SQL)) 
{
+      stmt.setQueryTimeout(0);
+      stmt.executeQuery();
+      ResultSet rs = stmt.getResultSet();
+      int rowCount = 0;
+      while (rs.next()) {
+        rs.getBytes(1);
+        rowCount++;
+      }
+      assertEquals( 3 , rowCount );
+    }
+  }
+
+  /**
+   * Test setting timeout for a query that actually times out
+   */
+  @Test
+  public void testClientTriggeredQueryTimeout() throws Exception {
+    //Setting to a very low value (3sec)
+    int timeoutDuration = 3;
+    int rowsCounted = 0;
+    try (PreparedStatement stmt = connection.prepareStatement(SYS_RANDOM_SQL)) 
{
+      stmt.setQueryTimeout(timeoutDuration);
+      logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout());
+      ResultSet rs = stmt.executeQuery();
+      //Fetch each row and pause (simulate a slow client)
+      try {
+        while (rs.next()) {
+          rs.getString(1);
+          rowsCounted++;
+          //Pause briefly (a second beyond the timeout) before attempting to 
fetch rows
+          try {
+            Thread.sleep( TimeUnit.SECONDS.toMillis(timeoutDuration + 1) );
+          } catch (InterruptedException e) {/*DoNothing*/}
+          logger.info("Paused for {} seconds", (timeoutDuration+1));
+        }
+      } catch (SQLTimeoutException sqlEx) {
+        logger.info("Counted "+rowsCounted+" rows before hitting timeout");
+        return; //Successfully return
+      }
+    }
+    //Throw an exception to indicate that we shouldn't have reached this point
+    throw new Exception("Failed to trigger timeout of "+ timeoutDuration + " 
sec");
+  }
+
+  /**
+   * Test setting timeout for a query that actually times out because of lack 
of timely server response
+   */
+  @Ignore ( "Pause Injection appears broken for PreparedStatement" )
+  @Test ( expected = SqlTimeoutException.class )
+  public void testServerTriggeredQueryTimeout() throws Exception {
+    //Setting to a very low value (2sec)
+    int timeoutDuration = 2;
+    //Server will be paused marginally longer than the test timeout
+    long serverPause = timeoutDuration + 2;
+    //Additional time for JDBC timeout and server pauses to complete
+    int cleanupPause = 3;
+
+    //Simulate a lack of timely server response by injecting a pause in the 
Screen operator's sending-data RPC
+    final String controls = Controls.newBuilder()
+        .addTimedPause(ScreenCreator.class, "sending-data", 0, 
TimeUnit.SECONDS.toMillis(serverPause))
+        .build();
+
+    //Fetching an exclusive connection since injected pause affects all 
sessions on the connection
+    try ( Connection exclusiveConnection = new Driver().connect( 
"jdbc:drill:zk=local", null )) {
+      try(Statement stmt = exclusiveConnection.createStatement()) {
+        assertThat(
+            stmt.execute(String.format(
+                "ALTER session SET `%s` = '%s'",
+                ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls)),
+            equalTo(true));
+      }
+
+      try (PreparedStatement pStmt = 
exclusiveConnection.prepareStatement(SYS_RANDOM_SQL)) {
+        pStmt.setQueryTimeout(timeoutDuration);
+        logger.info("Set a timeout of {} seconds", pStmt.getQueryTimeout());
+
+        //Executing a prepared statement with the paused server. Expecting 
timeout to occur here
+        ResultSet rs = pStmt.executeQuery();
+        //Fetch rows
+        while (rs.next()) {
+          rs.getBytes(1);
+        }
+      } catch (SQLTimeoutException sqlEx) {
+        logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
+        throw (SqlTimeoutException) sqlEx;
+      } finally {
+        //Pause briefly to wait for server to unblock
+        try {
+          Thread.sleep( TimeUnit.SECONDS.toMillis(cleanupPause) );
+        } catch (InterruptedException e) {/*DoNothing*/}
+      }
+    }
+  }
+
+  /**
+   * Test setting timeout that never gets triggered
+   */
+  @Test
+  public void testNonTriggeredQueryTimeout() throws SQLException {
+    try (PreparedStatement stmt = 
connection.prepareStatement(SYS_VERSION_SQL)) {
+      stmt.setQueryTimeout(60);
+      stmt.executeQuery();
+      ResultSet rs = stmt.getResultSet();
+      int rowCount = 0;
+      while (rs.next()) {
+        rs.getBytes(1);
+        rowCount++;
+      }
+      assertEquals( 1 , rowCount );
+    }
+  }
+
   //////////
   // Parameters-not-implemented tests:
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java 
b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
index b01ff2c..d91fc3d 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
@@ -19,16 +19,28 @@ package org.apache.drill.jdbc;
 
 import static org.hamcrest.CoreMatchers.*;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.drill.categories.JdbcTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.physical.impl.ScreenCreator;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.testing.Controls;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Date;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.SQLException;
 
 /**
@@ -37,6 +49,14 @@ import java.sql.SQLException;
 @Category(JdbcTest.class)
 public class StatementTest extends JdbcTestBase {
 
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StatementTest.class);
+
+  private static final String SYS_VERSION_SQL = "select * from sys.version";
+  private static final String SYS_RANDOM_SQL =
+      "SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " +
+      "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " +
+      "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) ";
+
   private static Connection connection;
   private static Statement statement;
 
@@ -46,7 +66,6 @@ public class StatementTest extends JdbcTestBase {
     // Connection--and other JDBC objects--on test method failure, but this 
test
     // class uses some objects across methods.)
     connection = new Driver().connect( "jdbc:drill:zk=local", null );
-    statement = connection.createStatement();
   }
 
   @AfterClass
@@ -61,54 +80,166 @@ public class StatementTest extends JdbcTestBase {
   //////////
   // getQueryTimeout():
 
-  /** Tests that getQueryTimeout() indicates no timeout set. */
+  /**
+   * Test for reading of default query timeout
+   */
   @Test
-  public void testGetQueryTimeoutSaysNoTimeout() throws SQLException {
-    assertThat( statement.getQueryTimeout(), equalTo( 0 ) );
+  public void testDefaultGetQueryTimeout() throws SQLException {
+    try(Statement stmt = connection.createStatement()) {
+      int timeoutValue = stmt.getQueryTimeout();
+      assertEquals( 0 , timeoutValue );
+    }
   }
 
   //////////
   // setQueryTimeout(...):
 
-  /** Tests that setQueryTimeout(...) accepts (redundantly) setting to
-   *  no-timeout mode. */
+  /**
+   * Test Invalid parameter by giving negative timeout
+   */
   @Test
-  public void testSetQueryTimeoutAcceptsNotimeoutRequest() throws SQLException 
{
-    statement.setQueryTimeout( 0 );
+  public void testInvalidSetQueryTimeout() throws SQLException {
+    try (Statement stmt = connection.createStatement()) {
+      //Setting negative value
+      int valueToSet = -10;
+      try {
+        stmt.setQueryTimeout(valueToSet);
+      } catch ( final SQLException e) {
+        assertThat( e.getMessage(), containsString( "illegal timeout value") );
+      }
+    }
   }
 
-  /** Tests that setQueryTimeout(...) rejects setting a timeout. */
-  @Test( expected = SQLFeatureNotSupportedException.class )
-  public void testSetQueryTimeoutRejectsTimeoutRequest() throws SQLException {
-    try {
-      statement.setQueryTimeout( 1_000 );
+  /**
+   * Test setting a valid timeout
+   */
+  @Test
+  public void testValidSetQueryTimeout() throws SQLException {
+    try (Statement stmt = connection.createStatement()) {
+      //Setting positive value
+      int valueToSet = new Random(20150304).nextInt(59)+1;
+      logger.info("Setting timeout as {} seconds", valueToSet);
+      stmt.setQueryTimeout(valueToSet);
+      assertEquals( valueToSet , stmt.getQueryTimeout() );
     }
-    catch ( SQLFeatureNotSupportedException e ) {
-      // Check exception for some mention of query timeout:
-      assertThat( e.getMessage(), anyOf( containsString( "Timeout" ),
-                                         containsString( "timeout" ) ) );
-      throw e;
+  }
+
+
+  /**
+   * Test setting timeout that never gets triggered
+   */
+  @Test
+  public void testSetQueryTimeoutAsZero() throws SQLException {
+    try (Statement stmt = connection.createStatement()) {
+      stmt.setQueryTimeout(0);
+      stmt.executeQuery(SYS_RANDOM_SQL);
+      ResultSet rs = stmt.getResultSet();
+      int rowCount = 0;
+      while (rs.next()) {
+        rs.getBytes(1);
+        rowCount++;
+      }
+      assertEquals( 3 , rowCount );
     }
   }
 
-  /** Tests that setQueryTimeout(...) rejects setting a timeout (different
-   *  value). */
-  @Test( expected = SQLFeatureNotSupportedException.class )
-  public void testSetQueryTimeoutRejectsTimeoutRequest2() throws SQLException {
-    statement.setQueryTimeout( Integer.MAX_VALUE / 2 );
+  /**
+   * Test setting timeout for a query that actually times out because of lack 
of timely client response
+   */
+  @Test
+  public void testClientTriggeredQueryTimeout() throws Exception {
+    //Setting to a very low value (3sec)
+    int timeoutDuration = 3;
+    int rowsCounted = 0;
+    try (Statement stmt = connection.createStatement()) {
+      stmt.setQueryTimeout(timeoutDuration);
+      logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout());
+      ResultSet rs = stmt.executeQuery(SYS_RANDOM_SQL);
+      //Fetch each row and pause (simulate a slow client)
+      try {
+        while (rs.next()) {
+          rs.getString(1);
+          rowsCounted++;
+          //Pause briefly (a second beyond the timeout) before attempting to 
fetch rows
+          try {
+            Thread.sleep( TimeUnit.SECONDS.toMillis(timeoutDuration + 1) );
+          } catch (InterruptedException e) {/*DoNothing*/}
+          logger.info("Paused for {} seconds", (timeoutDuration+1));
+        }
+      } catch (SQLTimeoutException sqlEx) {
+        logger.info("Counted "+rowsCounted+" rows before hitting timeout");
+        return; //Successfully return
+      }
+    }
+    //Throw an exception to indicate that we shouldn't have reached this point
+    throw new Exception("Failed to trigger timeout of "+ timeoutDuration + " 
sec");
   }
 
-  @Test( expected = InvalidParameterSqlException.class )
-  public void testSetQueryTimeoutRejectsBadTimeoutValue() throws SQLException {
-    try {
-      statement.setQueryTimeout( -2 );
+  /**
+   * Test setting timeout for a query that actually times out because of lack 
of timely server response
+   */
+  @Test ( expected = SqlTimeoutException.class )
+  public void testServerTriggeredQueryTimeout() throws Exception {
+    //Setting to a very low value (2sec)
+    int timeoutDuration = 2;
+    //Server will be paused marginally longer than the test timeout
+    long serverPause = timeoutDuration + 2;
+    //Additional time for JDBC timeout and server pauses to complete
+    int cleanupPause = 3;
+
+    //Simulate a lack of timely server response by injecting a pause in the 
Screen operator's sending-data RPC
+    final String controls = Controls.newBuilder()
+        .addTimedPause(ScreenCreator.class, "sending-data", 0, 
TimeUnit.SECONDS.toMillis(serverPause))
+        .build();
+
+    //Fetching an exclusive connection since injected pause affects all 
sessions on the connection
+    try ( Connection exclusiveConnection = new Driver().connect( 
"jdbc:drill:zk=local", null )) {
+      try(Statement stmt = exclusiveConnection.createStatement()) {
+        assertThat(
+            stmt.execute(String.format(
+                "ALTER session SET `%s` = '%s'",
+                ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls)),
+            equalTo(true));
+      }
+
+      try(Statement stmt = exclusiveConnection.createStatement()) {
+        stmt.setQueryTimeout(timeoutDuration);
+        logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout());
+
+        //Executing a query with the paused server. Expecting timeout to occur 
here
+        ResultSet rs = stmt.executeQuery(SYS_VERSION_SQL);
+        //Fetch rows
+        while (rs.next()) {
+          rs.getBytes(1);
+        }
+      } catch (SQLTimeoutException sqlEx) {
+        logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
+        throw (SqlTimeoutException) sqlEx;
+      } finally {
+        //Pause briefly to wait for server to unblock
+        try {
+          Thread.sleep( TimeUnit.SECONDS.toMillis(cleanupPause) );
+        } catch (InterruptedException e) {/*DoNothing*/}
+      }
     }
-    catch ( InvalidParameterSqlException e ) {
-      // Check exception for some mention of parameter name or semantics:
-      assertThat( e.getMessage(), anyOf( containsString( "milliseconds" ),
-                                         containsString( "timeout" ),
-                                         containsString( "Timeout" ) ) );
-      throw e;
+  }
+
+
+  /**
+   * Test setting timeout that never gets triggered
+   */
+  @Test
+  public void testNonTriggeredQueryTimeout() throws SQLException {
+    try (Statement stmt = connection.createStatement()) {
+      stmt.setQueryTimeout(60);
+      stmt.executeQuery(SYS_VERSION_SQL);
+      ResultSet rs = stmt.getResultSet();
+      int rowCount = 0;
+      while (rs.next()) {
+        rs.getBytes(1);
+        rowCount++;
+      }
+      assertEquals( 1 , rowCount );
     }
   }
 

Reply via email to