DRILL-3640: Support JDBC Statement.setQueryTimeout(int) Allow for queries to be cancelled if they don't complete within the stipulated time. This is done by having Drill[Prepared]StatementImpl create a Stopwatch timer to track elapsed time. * DrillCursor uses this to detect timeouts. * DrillResultSetImpl uses this to detech timeout from the client side (e.g. a slow client, when all batches have been processed by DrillCursor) * Tests added to test these and other query timeout scenarios. * Dependent on DRILL-5973 for enabling server-triggered timeout tests NOTE: PreparedStatementTest.testServerTriggeredQueryTimeout is disabled
closes #1024 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a9ea4ec1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a9ea4ec1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a9ea4ec1 Branch: refs/heads/master Commit: a9ea4ec1c5645ddab4b7aef9ac060ff5f109b696 Parents: f421152 Author: Kunal Khatua <[email protected]> Authored: Wed Nov 29 12:12:53 2017 -0800 Committer: Arina Ielchiieva <[email protected]> Committed: Fri Jan 26 13:43:05 2018 +0200 ---------------------------------------------------------------------- .../src/main/resources/drill-module.conf | 2 +- .../org/apache/drill/jdbc/DrillStatement.java | 24 +-- .../apache/drill/jdbc/SqlTimeoutException.java | 14 +- .../org/apache/drill/jdbc/impl/DrillCursor.java | 61 +++++- .../drill/jdbc/impl/DrillResultSetImpl.java | 22 +++ .../drill/jdbc/impl/DrillStatementImpl.java | 21 +- .../drill/jdbc/PreparedStatementTest.java | 173 ++++++++++++++++ .../org/apache/drill/jdbc/StatementTest.java | 197 +++++++++++++++---- 8 files changed, 436 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 8ac8d7b..28b7975 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -54,7 +54,7 @@ drill.client: { drill.tmp-dir: "/tmp" drill.tmp-dir: ${?DRILL_TMP_DIR} drill.exec: { - cluster-id: "drillbits1" + cluster-id: "drillbits1", rpc: { user: { timeout: 30, http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java index f2aea40..6db5f3a 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillStatement.java @@ -16,10 +16,9 @@ */ package org.apache.drill.jdbc; -import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLException; import java.sql.Statement; - /** * Drill-specific {@link Statement}. * @see #unwrap @@ -27,34 +26,29 @@ import java.sql.Statement; public interface DrillStatement extends Statement { /** - * <strong>Drill</strong>: - * Returns zero, indicating that no timeout is set. - * * @throws AlreadyClosedSqlException * if connection is closed + * @throws SQLException + * Any other exception */ @Override - int getQueryTimeout() throws AlreadyClosedSqlException; + int getQueryTimeout() throws AlreadyClosedSqlException, SQLException; /** * <strong>Drill</strong>: - * Not supported (for non-zero timeout value). - * <p> - * Normally, just throws {@link SQLFeatureNotSupportedException} unless - * request is trivially for no timeout (zero {@code milliseconds} value). - * </p> + * Supported (for non-zero timeout value). * @throws AlreadyClosedSqlException * if connection is closed * @throws JdbcApiSqlException * if an invalid parameter value is detected (and not above case) - * @throws SQLFeatureNotSupportedException - * if timeout is non-zero (and not above case) + * @throws SQLException + * Any other exception */ @Override - void setQueryTimeout( int milliseconds ) + void setQueryTimeout( int seconds ) throws AlreadyClosedSqlException, JdbcApiSqlException, - SQLFeatureNotSupportedException; + SQLException; /** * {@inheritDoc} http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java index c24858e..d449916 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java @@ -17,18 +17,22 @@ */ package org.apache.drill.jdbc; -import java.sql.SQLException; +import java.sql.SQLTimeoutException; /** * Indicates that an operation timed out. This is not an error; you can * retry the operation. */ -public class SqlTimeoutException - extends SQLException -{ +public class SqlTimeoutException extends SQLTimeoutException { + private static final long serialVersionUID = 2017_04_03L; + SqlTimeoutException() { // SQLException(reason, SQLState, vendorCode) // REVIEW mb 19-Jul-05 Is there a standard SQLState? super("timeout", null, 0); } -} \ No newline at end of file + + public SqlTimeoutException(long timeoutValueInSeconds) { + super("Query timed out in "+ timeoutValueInSeconds + " seconds"); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java index 9b9a4c8..72c36dd 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java @@ -21,6 +21,7 @@ import static org.slf4j.LoggerFactory.getLogger; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLTimeoutException; import java.util.ArrayList; import java.util.Calendar; import java.util.List; @@ -29,6 +30,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Stopwatch; import org.apache.calcite.avatica.AvaticaResultSet; import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.ColumnMetaData; @@ -52,6 +54,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.store.ischema.InfoSchemaConstants; import org.apache.drill.jdbc.SchemaChangeListener; +import org.apache.drill.jdbc.SqlTimeoutException; import org.slf4j.Logger; import com.google.common.collect.Queues; @@ -100,13 +103,18 @@ class DrillCursor implements Cursor { final LinkedBlockingDeque<QueryDataBatch> batchQueue = Queues.newLinkedBlockingDeque(); + private final DrillCursor parent; + Stopwatch elapsedTimer = null; /** * ... - * @param batchQueueThrottlingThreshold - * queue size threshold for throttling server + * @param parent + * reference to DrillCursor + * @param batchQueueThrottlingThreshold + * queue size threshold for throttling server */ - ResultsListener( int batchQueueThrottlingThreshold ) { + ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold ) { + this.parent = parent; instanceId = nextInstanceId++; this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold; logger.debug( "[#{}] Query listener created.", instanceId ); @@ -139,8 +147,17 @@ class DrillCursor implements Cursor { return stopped; } - public void awaitFirstMessage() throws InterruptedException { - firstMessageReceived.await(); + public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException { + //Check if a non-zero timeout has been set + if ( parent.timeoutInMilliseconds > 0 ) { + //Identifying remaining in milliseconds to maintain a granularity close to integer value of timeout + long timeToTimeout = parent.timeoutInMilliseconds - parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS); + if ( timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) { + throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds)); + } + } else { + firstMessageReceived.await(); + } } private void releaseIfFirst() { @@ -212,7 +229,7 @@ class DrillCursor implements Cursor { * @throws InterruptedException * if waiting on the queue was interrupted */ - QueryDataBatch getNext() throws UserException, InterruptedException { + QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException { while (true) { if (executionFailureException != null) { logger.debug( "[#{}] Dequeued query failure exception: {}.", @@ -239,6 +256,11 @@ class DrillCursor implements Cursor { } return qdb; } + + // Check and throw SQLTimeoutException + if ( parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds ) { + throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds)); + } } } } @@ -251,6 +273,7 @@ class DrillCursor implements Cursor { instanceId, batchQueue.size() ); } while (!batchQueue.isEmpty()) { + // Don't bother with query timeout, we're closing the cursor QueryDataBatch qdb = batchQueue.poll(); if (qdb != null && qdb.getData() != null) { qdb.getData().release(); @@ -318,13 +341,17 @@ class DrillCursor implements Cursor { * (Not <i>row</i> number.) */ private int currentRecordNumber = -1; + //Track timeout period + private long timeoutInMilliseconds = 0L; + private Stopwatch elapsedTimer; /** * * @param statement * @param signature + * @throws SQLException */ - DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, Signature signature) { + DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, Signature signature) throws SQLException { this.connection = connection; this.statement = statement; this.signature = signature; @@ -333,8 +360,9 @@ class DrillCursor implements Cursor { final int batchQueueThrottlingThreshold = client.getConfig().getInt( ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD ); - resultsListener = new ResultsListener(batchQueueThrottlingThreshold); + resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold); currentBatchHolder = new RecordBatchLoader(client.getAllocator()); + setTimeout(this.statement.getQueryTimeout()); } protected int getCurrentRecordNumber() { @@ -376,6 +404,19 @@ class DrillCursor implements Cursor { currentBatchHolder.clear(); } + long getTimeoutInMilliseconds() { + return timeoutInMilliseconds; + } + + //Set the cursor's timeout in seconds + void setTimeout(int timeoutDurationInSeconds) { + this.timeoutInMilliseconds = TimeUnit.SECONDS.toMillis(timeoutDurationInSeconds); + //Starting the timer, since this is invoked via the ResultSet.execute() call + if (timeoutInMilliseconds > 0) { + elapsedTimer = Stopwatch.createStarted(); + } + } + /** * Updates column accessors and metadata from current record batch. */ @@ -615,4 +656,8 @@ class DrillCursor implements Cursor { return accessors.wasNull(); } + public Stopwatch getElapsedTimer() { + return elapsedTimer; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index f4fc588..6ca8ee2 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -32,6 +32,7 @@ import java.sql.ResultSetMetaData; import java.sql.RowId; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.SQLWarning; import java.sql.SQLXML; import java.sql.Time; @@ -42,6 +43,7 @@ import java.util.Calendar; import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; import org.apache.calcite.avatica.AvaticaResultSet; import org.apache.calcite.avatica.AvaticaSite; @@ -54,7 +56,9 @@ import org.apache.calcite.avatica.util.Cursor.Accessor; import org.apache.drill.jdbc.AlreadyClosedSqlException; import org.apache.drill.jdbc.DrillResultSet; import org.apache.drill.jdbc.ExecutionCanceledSqlException; +import org.apache.drill.jdbc.SqlTimeoutException; +import com.google.common.base.Stopwatch; /** * Drill's implementation of {@link ResultSet}. @@ -67,6 +71,10 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { private final DrillConnectionImpl connection; private volatile boolean hasPendingCancelationNotification = false; + //Timeout Support Variables + private Stopwatch elapsedTimer; + private long queryTimeoutInMilliseconds; + DrillResultSetImpl(AvaticaStatement statement, QueryState state, Meta.Signature signature, ResultSetMetaData resultSetMetaData, TimeZone timeZone, Meta.Frame firstFrame) { @@ -86,6 +94,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { */ private void throwIfClosed() throws AlreadyClosedSqlException, ExecutionCanceledSqlException, + SQLTimeoutException, SQLException { if ( isClosed() ) { if (cursor instanceof DrillCursor && hasPendingCancelationNotification) { @@ -97,6 +106,14 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { throw new AlreadyClosedSqlException( "ResultSet is already closed." ); } } + + //Implicit check for whether timeout is set + if (elapsedTimer != null) { + //The timer has already been started by the DrillCursor at this point + if (elapsedTimer.elapsed(TimeUnit.MILLISECONDS) > this.queryTimeoutInMilliseconds) { + throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(this.queryTimeoutInMilliseconds)); + } + } } @@ -127,6 +144,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { @Override public boolean next() throws SQLException { throwIfClosed(); + // TODO: Resolve following comments (possibly obsolete because of later // addition of preceding call to throwIfClosed. Also, NOTE that the // following check, and maybe some throwIfClosed() calls, probably must @@ -1889,6 +1907,10 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { } else { DrillCursor drillCursor = new DrillCursor(connection, statement, signature); + //Getting handle to elapsed timer for timeout purposes + this.elapsedTimer = drillCursor.getElapsedTimer(); + //Setting this to ensure future calls to change timeouts for an active Statement doesn't affect ResultSet + this.queryTimeoutInMilliseconds = drillCursor.getTimeoutInMilliseconds(); super.execute2(drillCursor, this.signature.columns); // Read first (schema-only) batch to initialize result-set metadata from http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java index a01bcf3..f664b52 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java @@ -156,29 +156,18 @@ class DrillStatementImpl extends AvaticaStatement implements DrillStatement, } @Override - public int getQueryTimeout() throws AlreadyClosedSqlException + public int getQueryTimeout() throws AlreadyClosedSqlException, SQLException { throwIfClosed(); - return 0; // (No no timeout.) + return super.getQueryTimeout(); } @Override - public void setQueryTimeout( int milliseconds ) + public void setQueryTimeout( int seconds ) throws AlreadyClosedSqlException, - InvalidParameterSqlException, - SQLFeatureNotSupportedException { + SQLException { throwIfClosed(); - if ( milliseconds < 0 ) { - throw new InvalidParameterSqlException( - "Invalid (negative) \"milliseconds\" parameter to setQueryTimeout(...)" - + " (" + milliseconds + ")" ); - } - else { - if ( 0 != milliseconds ) { - throw new SQLFeatureNotSupportedException( - "Setting network timeout is not supported." ); - } - } + super.setQueryTimeout(seconds); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java index 45bd9b8..94155e4 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java @@ -42,18 +42,24 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.Statement; import java.sql.Timestamp; import java.util.List; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.physical.impl.ScreenCreator; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.ischema.InfoSchemaConstants; +import org.apache.drill.exec.testing.Controls; import org.apache.drill.categories.JdbcTest; import org.hamcrest.Matcher; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.ImmutableList; @@ -65,6 +71,14 @@ import org.junit.experimental.categories.Category; @Category(JdbcTest.class) public class PreparedStatementTest extends JdbcTestBase { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementTest.class); + + private static final String SYS_VERSION_SQL = "select * from sys.version"; + private static final String SYS_RANDOM_SQL = + "SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " + + "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " + + "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) "; + /** Fuzzy matcher for parameters-not-supported message assertions. (Based on * current "Prepared-statement dynamic parameters are not supported.") */ private static final Matcher<String> PARAMETERS_NOT_SUPPORTED_MSG_MATCHER = @@ -237,6 +251,165 @@ public class PreparedStatementTest extends JdbcTestBase { } } + /** + * Test for reading of default query timeout + */ + @Test + public void testDefaultGetQueryTimeout() throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(SYS_VERSION_SQL)) { + int timeoutValue = stmt.getQueryTimeout(); + assertEquals( 0L , timeoutValue ); + } + } + + /** + * Test Invalid parameter by giving negative timeout + */ + @Test + public void testInvalidSetQueryTimeout() throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(SYS_VERSION_SQL)) { + //Setting negative value + int valueToSet = -10; + try { + stmt.setQueryTimeout(valueToSet); + } catch ( final SQLException e) { + assertThat( e.getMessage(), containsString( "illegal timeout value") ); + } + } + } + + /** + * Test setting a valid timeout + */ + @Test + public void testValidSetQueryTimeout() throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(SYS_VERSION_SQL)) { + //Setting positive value + int valueToSet = new Random(20150304).nextInt(59)+1; + logger.info("Setting timeout as {} seconds", valueToSet); + stmt.setQueryTimeout(valueToSet); + assertEquals( valueToSet , stmt.getQueryTimeout() ); + } + } + + /** + * Test setting timeout as zero and executing + */ + @Test + public void testSetQueryTimeoutAsZero() throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(SYS_RANDOM_SQL)) { + stmt.setQueryTimeout(0); + stmt.executeQuery(); + ResultSet rs = stmt.getResultSet(); + int rowCount = 0; + while (rs.next()) { + rs.getBytes(1); + rowCount++; + } + assertEquals( 3 , rowCount ); + } + } + + /** + * Test setting timeout for a query that actually times out + */ + @Test + public void testClientTriggeredQueryTimeout() throws Exception { + //Setting to a very low value (3sec) + int timeoutDuration = 3; + int rowsCounted = 0; + try (PreparedStatement stmt = connection.prepareStatement(SYS_RANDOM_SQL)) { + stmt.setQueryTimeout(timeoutDuration); + logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout()); + ResultSet rs = stmt.executeQuery(); + //Fetch each row and pause (simulate a slow client) + try { + while (rs.next()) { + rs.getString(1); + rowsCounted++; + //Pause briefly (a second beyond the timeout) before attempting to fetch rows + try { + Thread.sleep( TimeUnit.SECONDS.toMillis(timeoutDuration + 1) ); + } catch (InterruptedException e) {/*DoNothing*/} + logger.info("Paused for {} seconds", (timeoutDuration+1)); + } + } catch (SQLTimeoutException sqlEx) { + logger.info("Counted "+rowsCounted+" rows before hitting timeout"); + return; //Successfully return + } + } + //Throw an exception to indicate that we shouldn't have reached this point + throw new Exception("Failed to trigger timeout of "+ timeoutDuration + " sec"); + } + + /** + * Test setting timeout for a query that actually times out because of lack of timely server response + */ + @Ignore ( "Pause Injection appears broken for PreparedStatement" ) + @Test ( expected = SqlTimeoutException.class ) + public void testServerTriggeredQueryTimeout() throws Exception { + //Setting to a very low value (2sec) + int timeoutDuration = 2; + //Server will be paused marginally longer than the test timeout + long serverPause = timeoutDuration + 2; + //Additional time for JDBC timeout and server pauses to complete + int cleanupPause = 3; + + //Simulate a lack of timely server response by injecting a pause in the Screen operator's sending-data RPC + final String controls = Controls.newBuilder() + .addTimedPause(ScreenCreator.class, "sending-data", 0, TimeUnit.SECONDS.toMillis(serverPause)) + .build(); + + //Fetching an exclusive connection since injected pause affects all sessions on the connection + try ( Connection exclusiveConnection = new Driver().connect( "jdbc:drill:zk=local", null )) { + try(Statement stmt = exclusiveConnection.createStatement()) { + assertThat( + stmt.execute(String.format( + "ALTER session SET `%s` = '%s'", + ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls)), + equalTo(true)); + } + + try (PreparedStatement pStmt = exclusiveConnection.prepareStatement(SYS_RANDOM_SQL)) { + pStmt.setQueryTimeout(timeoutDuration); + logger.info("Set a timeout of {} seconds", pStmt.getQueryTimeout()); + + //Executing a prepared statement with the paused server. Expecting timeout to occur here + ResultSet rs = pStmt.executeQuery(); + //Fetch rows + while (rs.next()) { + rs.getBytes(1); + } + } catch (SQLTimeoutException sqlEx) { + logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage()); + throw (SqlTimeoutException) sqlEx; + } finally { + //Pause briefly to wait for server to unblock + try { + Thread.sleep( TimeUnit.SECONDS.toMillis(cleanupPause) ); + } catch (InterruptedException e) {/*DoNothing*/} + } + } + } + + /** + * Test setting timeout that never gets triggered + */ + @Test + public void testNonTriggeredQueryTimeout() throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(SYS_VERSION_SQL)) { + stmt.setQueryTimeout(60); + stmt.executeQuery(); + ResultSet rs = stmt.getResultSet(); + int rowCount = 0; + while (rs.next()) { + rs.getBytes(1); + rowCount++; + } + assertEquals( 1 , rowCount ); + } + } + ////////// // Parameters-not-implemented tests: http://git-wip-us.apache.org/repos/asf/drill/blob/a9ea4ec1/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java index b01ff2c..d91fc3d 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java @@ -19,16 +19,28 @@ package org.apache.drill.jdbc; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; import org.apache.drill.categories.JdbcTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.physical.impl.ScreenCreator; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.testing.Controls; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.Statement; +import java.util.Date; +import java.util.Random; +import java.util.concurrent.TimeUnit; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.SQLException; /** @@ -37,6 +49,14 @@ import java.sql.SQLException; @Category(JdbcTest.class) public class StatementTest extends JdbcTestBase { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatementTest.class); + + private static final String SYS_VERSION_SQL = "select * from sys.version"; + private static final String SYS_RANDOM_SQL = + "SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " + + "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) " + + "union SELECT cast(random() as varchar) as myStr FROM (VALUES(1)) "; + private static Connection connection; private static Statement statement; @@ -46,7 +66,6 @@ public class StatementTest extends JdbcTestBase { // Connection--and other JDBC objects--on test method failure, but this test // class uses some objects across methods.) connection = new Driver().connect( "jdbc:drill:zk=local", null ); - statement = connection.createStatement(); } @AfterClass @@ -61,54 +80,166 @@ public class StatementTest extends JdbcTestBase { ////////// // getQueryTimeout(): - /** Tests that getQueryTimeout() indicates no timeout set. */ + /** + * Test for reading of default query timeout + */ @Test - public void testGetQueryTimeoutSaysNoTimeout() throws SQLException { - assertThat( statement.getQueryTimeout(), equalTo( 0 ) ); + public void testDefaultGetQueryTimeout() throws SQLException { + try(Statement stmt = connection.createStatement()) { + int timeoutValue = stmt.getQueryTimeout(); + assertEquals( 0 , timeoutValue ); + } } ////////// // setQueryTimeout(...): - /** Tests that setQueryTimeout(...) accepts (redundantly) setting to - * no-timeout mode. */ + /** + * Test Invalid parameter by giving negative timeout + */ @Test - public void testSetQueryTimeoutAcceptsNotimeoutRequest() throws SQLException { - statement.setQueryTimeout( 0 ); + public void testInvalidSetQueryTimeout() throws SQLException { + try (Statement stmt = connection.createStatement()) { + //Setting negative value + int valueToSet = -10; + try { + stmt.setQueryTimeout(valueToSet); + } catch ( final SQLException e) { + assertThat( e.getMessage(), containsString( "illegal timeout value") ); + } + } } - /** Tests that setQueryTimeout(...) rejects setting a timeout. */ - @Test( expected = SQLFeatureNotSupportedException.class ) - public void testSetQueryTimeoutRejectsTimeoutRequest() throws SQLException { - try { - statement.setQueryTimeout( 1_000 ); + /** + * Test setting a valid timeout + */ + @Test + public void testValidSetQueryTimeout() throws SQLException { + try (Statement stmt = connection.createStatement()) { + //Setting positive value + int valueToSet = new Random(20150304).nextInt(59)+1; + logger.info("Setting timeout as {} seconds", valueToSet); + stmt.setQueryTimeout(valueToSet); + assertEquals( valueToSet , stmt.getQueryTimeout() ); } - catch ( SQLFeatureNotSupportedException e ) { - // Check exception for some mention of query timeout: - assertThat( e.getMessage(), anyOf( containsString( "Timeout" ), - containsString( "timeout" ) ) ); - throw e; + } + + + /** + * Test setting timeout that never gets triggered + */ + @Test + public void testSetQueryTimeoutAsZero() throws SQLException { + try (Statement stmt = connection.createStatement()) { + stmt.setQueryTimeout(0); + stmt.executeQuery(SYS_RANDOM_SQL); + ResultSet rs = stmt.getResultSet(); + int rowCount = 0; + while (rs.next()) { + rs.getBytes(1); + rowCount++; + } + assertEquals( 3 , rowCount ); } } - /** Tests that setQueryTimeout(...) rejects setting a timeout (different - * value). */ - @Test( expected = SQLFeatureNotSupportedException.class ) - public void testSetQueryTimeoutRejectsTimeoutRequest2() throws SQLException { - statement.setQueryTimeout( Integer.MAX_VALUE / 2 ); + /** + * Test setting timeout for a query that actually times out because of lack of timely client response + */ + @Test + public void testClientTriggeredQueryTimeout() throws Exception { + //Setting to a very low value (3sec) + int timeoutDuration = 3; + int rowsCounted = 0; + try (Statement stmt = connection.createStatement()) { + stmt.setQueryTimeout(timeoutDuration); + logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout()); + ResultSet rs = stmt.executeQuery(SYS_RANDOM_SQL); + //Fetch each row and pause (simulate a slow client) + try { + while (rs.next()) { + rs.getString(1); + rowsCounted++; + //Pause briefly (a second beyond the timeout) before attempting to fetch rows + try { + Thread.sleep( TimeUnit.SECONDS.toMillis(timeoutDuration + 1) ); + } catch (InterruptedException e) {/*DoNothing*/} + logger.info("Paused for {} seconds", (timeoutDuration+1)); + } + } catch (SQLTimeoutException sqlEx) { + logger.info("Counted "+rowsCounted+" rows before hitting timeout"); + return; //Successfully return + } + } + //Throw an exception to indicate that we shouldn't have reached this point + throw new Exception("Failed to trigger timeout of "+ timeoutDuration + " sec"); } - @Test( expected = InvalidParameterSqlException.class ) - public void testSetQueryTimeoutRejectsBadTimeoutValue() throws SQLException { - try { - statement.setQueryTimeout( -2 ); + /** + * Test setting timeout for a query that actually times out because of lack of timely server response + */ + @Test ( expected = SqlTimeoutException.class ) + public void testServerTriggeredQueryTimeout() throws Exception { + //Setting to a very low value (2sec) + int timeoutDuration = 2; + //Server will be paused marginally longer than the test timeout + long serverPause = timeoutDuration + 2; + //Additional time for JDBC timeout and server pauses to complete + int cleanupPause = 3; + + //Simulate a lack of timely server response by injecting a pause in the Screen operator's sending-data RPC + final String controls = Controls.newBuilder() + .addTimedPause(ScreenCreator.class, "sending-data", 0, TimeUnit.SECONDS.toMillis(serverPause)) + .build(); + + //Fetching an exclusive connection since injected pause affects all sessions on the connection + try ( Connection exclusiveConnection = new Driver().connect( "jdbc:drill:zk=local", null )) { + try(Statement stmt = exclusiveConnection.createStatement()) { + assertThat( + stmt.execute(String.format( + "ALTER session SET `%s` = '%s'", + ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls)), + equalTo(true)); + } + + try(Statement stmt = exclusiveConnection.createStatement()) { + stmt.setQueryTimeout(timeoutDuration); + logger.info("Set a timeout of {} seconds", stmt.getQueryTimeout()); + + //Executing a query with the paused server. Expecting timeout to occur here + ResultSet rs = stmt.executeQuery(SYS_VERSION_SQL); + //Fetch rows + while (rs.next()) { + rs.getBytes(1); + } + } catch (SQLTimeoutException sqlEx) { + logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage()); + throw (SqlTimeoutException) sqlEx; + } finally { + //Pause briefly to wait for server to unblock + try { + Thread.sleep( TimeUnit.SECONDS.toMillis(cleanupPause) ); + } catch (InterruptedException e) {/*DoNothing*/} + } } - catch ( InvalidParameterSqlException e ) { - // Check exception for some mention of parameter name or semantics: - assertThat( e.getMessage(), anyOf( containsString( "milliseconds" ), - containsString( "timeout" ), - containsString( "Timeout" ) ) ); - throw e; + } + + + /** + * Test setting timeout that never gets triggered + */ + @Test + public void testNonTriggeredQueryTimeout() throws SQLException { + try (Statement stmt = connection.createStatement()) { + stmt.setQueryTimeout(60); + stmt.executeQuery(SYS_VERSION_SQL); + ResultSet rs = stmt.getResultSet(); + int rowCount = 0; + while (rs.next()) { + rs.getBytes(1); + rowCount++; + } + assertEquals( 1 , rowCount ); } }
