DRILL-2993: Unthrottle at cancel() to fix post-cancelation hangs. Added unthrottling in close(). Cleaned up throttling logic code: - Applied AtomicBoolean to eliminate race conditions. - Extracted methods for starting/stopping throttling.
Made small edits to some message: - Fixed missed, inconsistent ResultsListener log messages. - Clarified exception message. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/091122c4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/091122c4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/091122c4 Branch: refs/heads/master Commit: 091122c4e651f1585118fcbf1a092627de03f0e4 Parents: ffbb9c7 Author: dbarclay <dbarc...@maprtech.com> Authored: Sat May 9 22:17:35 2015 -0700 Committer: Mehant Baid <meha...@gmail.com> Committed: Wed May 13 11:25:00 2015 -0700 ---------------------------------------------------------------------- .../drill/jdbc/impl/DrillResultSetImpl.java | 70 ++++++++++++++++---- 1 file changed, 57 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/091122c4/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index 2fe6c28..4fa1f2f 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import net.hydromatic.avatica.AvaticaPrepareResult; import net.hydromatic.avatica.AvaticaResultSet; @@ -96,7 +97,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS if ( hasPendingCancelationNotification ) { hasPendingCancelationNotification = false; throw new ExecutionCanceledSqlException( - "SQL statement execution canceled; resultSet closed." ); + "SQL statement execution canceled; ResultSet now closed." ); } else { throw new AlreadyClosedSqlException( "ResultSet is already closed." ); @@ -186,9 +187,18 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS private volatile QueryId queryId; private volatile UserException executionFailureException; + + // TODO: Revisit "completed". Determine and document exactly what it + // means. Some uses imply that it means that incoming messages indicate + // that the _query_ has _terminated_ (not necessarily _completing_ + // normally), while some uses imply that it's some other state of the + // ResultListener. Some uses seem redundant.) volatile boolean completed = false; - private volatile boolean autoread = true; + + /** Whether throttling of incoming data is active. */ + private final AtomicBoolean throttled = new AtomicBoolean( false ); private volatile ConnectionThrottle throttle; + private volatile boolean closed = false; // TODO: Rename. It's obvious it's a latch--but what condition or action // does it represent or control? @@ -203,6 +213,33 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS logger.debug( "Query listener created." ); } + /** + * Starts throttling if not currently throttling. + * @param throttle the "throttlable" object to throttle + * @return true if actually started (wasn't throttling already) + */ + private boolean startThrottlingIfNot( ConnectionThrottle throttle ) { + final boolean started = throttled.compareAndSet( false, true ); + if ( started ) { + this.throttle = throttle; + throttle.setAutoRead(false); + } + return started; + } + + /** + * Stops throttling if currently active. + * @return true if actually stopped (was throttling) + */ + private boolean stopThrottlingIfSo() { + final boolean stopped = throttled.compareAndSet( true, false ); + if ( stopped ) { + throttle.setAutoRead(true); + throttle = null; + } + return stopped; + } + // TODO: Doc.: Release what if what is first relative to what? private boolean releaseIfFirst() { if (receivedMessage.compareAndSet(false, true)) { @@ -221,7 +258,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @Override public void submissionFailed(UserException ex) { - logger.debug( "Received query failure.", ex ); + logger.debug( "Received query failure:", ex ); this.executionFailureException = ex; completed = true; close(); @@ -230,11 +267,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @Override public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { - logger.debug( "Received query data: {}.", result ); + logger.debug( "Received query data batch: {}.", result ); // If we're in a closed state, just release the message. if (closed) { result.release(); + // TODO: Revisit member completed: Is ResultListener really completed + // after only one data batch after being closed? completed = true; return; } @@ -242,9 +281,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS // We're active; let's add to the queue. batchQueue.add(result); if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) { - throttle.setAutoRead(false); - this.throttle = throttle; - autoread = false; + if ( startThrottlingIfNot( throttle ) ) { + logger.debug( "Throttling started at queue size {}.", batchQueue.size() ); + } } releaseIfFirst(); @@ -252,7 +291,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @Override public void queryCompleted(QueryState state) { - logger.debug( "Query completion arrived: {}.", state ); + logger.debug( "Received query completion: {}.", state ); releaseIfFirst(); completed = true; } @@ -282,12 +321,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS } else { QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS); if (q != null) { - if (!autoread && batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) { - autoread = true; - throttle.setAutoRead(true); - throttle = null; + assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2; + if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) { + if ( stopThrottlingIfSo() ) { + logger.debug( "Throttling stopped at queue size {}.", + batchQueue.size() ); + } } - logger.debug( "Dequeued query data: {}.", q ); + logger.debug( "Dequeued query data batch: {}.", q ); return q; } } @@ -296,6 +337,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS void close() { closed = true; + if ( stopThrottlingIfSo() ) { + logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() ); + } while (!batchQueue.isEmpty()) { QueryDataBatch qrb = batchQueue.poll(); if (qrb != null && qrb.getData() != null) {