DRILL-4994: Refactor DrillCursor

Refactor DrillCursor to be more self-contained.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ab60855b Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ab60855b Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ab60855b Branch: refs/heads/master Commit: ab60855bf390e8f01369760f019ee06eecf1959e Parents: e2b5271 Author: Laurent Goujon <laur...@dremio.com> Authored: Fri Nov 4 13:31:19 2016 -0700 Committer: Jinfeng Ni <j...@apache.org> Committed: Wed Mar 1 23:15:09 2017 -0800 ---------------------------------------------------------------------- .../jdbc/impl/AvaticaDrillSqlAccessor.java | 4 +- .../org/apache/drill/jdbc/impl/DrillCursor.java | 323 +++++++++++++++++-- .../drill/jdbc/impl/DrillResultSetImpl.java | 302 +---------------- 3 files changed, 317 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java index 5a48e59..914e279 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java @@ -64,11 +64,11 @@ class AvaticaDrillSqlAccessor implements Accessor { // so in that case row can be left at -1, so isBeforeFirst() returns true // even though we're not longer before the empty set of rows--and it's all // private, so we can't get to it to override any of several candidates. - if ( cursor.getResultSet().isAfterLast() ) { + if ( cursor.isAfterLast() ) { throw new InvalidCursorStateSqlException( "Result set cursor is already positioned past all rows." 
); } - else if ( cursor.getResultSet().isBeforeFirst() ) { + else if ( cursor.isBeforeFirst() ) { throw new InvalidCursorStateSqlException( "Result set cursor is positioned before all rows. Call next() first." ); } http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java index 08570a8..ed279a3 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java @@ -24,33 +24,260 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Calendar; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.calcite.avatica.AvaticaResultSet; +import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.Meta.Signature; import org.apache.calcite.avatica.util.ArrayImpl.Factory; import org.apache.calcite.avatica.util.Cursor; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatchLoader; +import 
org.apache.drill.exec.rpc.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.store.ischema.InfoSchemaConstants; +import org.apache.drill.jdbc.SchemaChangeListener; import org.slf4j.Logger; +import com.google.common.collect.Queues; + class DrillCursor implements Cursor { + + //////////////////////////////////////// + // ResultsListener: + static class ResultsListener implements UserResultsListener { + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(ResultsListener.class); + + private static volatile int nextInstanceId = 1; + + /** (Just for logging.) */ + private final int instanceId; + + private final int batchQueueThrottlingThreshold; + + /** (Just for logging.) */ + private volatile QueryId queryId; + + /** (Just for logging.) */ + private int lastReceivedBatchNumber; + /** (Just for logging.) */ + private int lastDequeuedBatchNumber; + + private volatile UserException executionFailureException; + + // TODO: Revisit "completed". Determine and document exactly what it + // means. Some uses imply that it means that incoming messages indicate + // that the _query_ has _terminated_ (not necessarily _completing_ + // normally), while some uses imply that it's some other state of the + // ResultListener. Some uses seem redundant.) + volatile boolean completed = false; + + /** Whether throttling of incoming data is active. */ + private final AtomicBoolean throttled = new AtomicBoolean( false ); + private volatile ConnectionThrottle throttle; + + private volatile boolean closed = false; + + private final CountDownLatch firstMessageReceived = new CountDownLatch(1); + + final LinkedBlockingDeque<QueryDataBatch> batchQueue = + Queues.newLinkedBlockingDeque(); + + + /** + * ... 
+ * @param batchQueueThrottlingThreshold + * queue size threshold for throttling server + */ + ResultsListener( int batchQueueThrottlingThreshold ) { + instanceId = nextInstanceId++; + this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold; + logger.debug( "[#{}] Query listener created.", instanceId ); + } + + /** + * Starts throttling if not currently throttling. + * @param throttle the "throttlable" object to throttle + * @return true if actually started (wasn't throttling already) + */ + private boolean startThrottlingIfNot( ConnectionThrottle throttle ) { + final boolean started = throttled.compareAndSet( false, true ); + if ( started ) { + this.throttle = throttle; + throttle.setAutoRead(false); + } + return started; + } + + /** + * Stops throttling if currently throttling. + * @return true if actually stopped (was throttling) + */ + private boolean stopThrottlingIfSo() { + final boolean stopped = throttled.compareAndSet( true, false ); + if ( stopped ) { + throttle.setAutoRead(true); + throttle = null; + } + return stopped; + } + + public void awaitFirstMessage() throws InterruptedException { + firstMessageReceived.await(); + } + + private void releaseIfFirst() { + firstMessageReceived.countDown(); + } + + @Override + public void queryIdArrived(QueryId queryId) { + logger.debug( "[#{}] Received query ID: {}.", + instanceId, QueryIdHelper.getQueryId( queryId ) ); + this.queryId = queryId; + } + + @Override + public void submissionFailed(UserException ex) { + logger.debug( "Received query failure:", instanceId, ex ); + this.executionFailureException = ex; + completed = true; + close(); + logger.info( "[#{}] Query failed: ", instanceId, ex ); + } + + @Override + public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { + lastReceivedBatchNumber++; + logger.debug( "[#{}] Received query data batch #{}: {}.", + instanceId, lastReceivedBatchNumber, result ); + + // If we're in a closed state, just release the message. 
+ if (closed) { + result.release(); + // TODO: Revisit member completed: Is ResultListener really completed + // after only one data batch after being closed? + completed = true; + return; + } + + // We're active; let's add to the queue. + batchQueue.add(result); + + // Throttle server if queue size has exceed threshold. + if (batchQueue.size() > batchQueueThrottlingThreshold ) { + if ( startThrottlingIfNot( throttle ) ) { + logger.debug( "[#{}] Throttling started at queue size {}.", + instanceId, batchQueue.size() ); + } + } + + releaseIfFirst(); + } + + @Override + public void queryCompleted(QueryState state) { + logger.debug( "[#{}] Received query completion: {}.", instanceId, state ); + releaseIfFirst(); + completed = true; + } + + QueryId getQueryId() { + return queryId; + } + + + /** + * Gets the next batch of query results from the queue. + * @return the next batch, or {@code null} after last batch has been returned + * @throws UserException + * if the query failed + * @throws InterruptedException + * if waiting on the queue was interrupted + */ + QueryDataBatch getNext() throws UserException, InterruptedException { + while (true) { + if (executionFailureException != null) { + logger.debug( "[#{}] Dequeued query failure exception: {}.", + instanceId, executionFailureException ); + throw executionFailureException; + } + if (completed && batchQueue.isEmpty()) { + return null; + } else { + QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS); + if (qdb != null) { + lastDequeuedBatchNumber++; + logger.debug( "[#{}] Dequeued query data batch #{}: {}.", + instanceId, lastDequeuedBatchNumber, qdb ); + + // Unthrottle server if queue size has dropped enough below threshold: + if ( batchQueue.size() < batchQueueThrottlingThreshold / 2 + || batchQueue.size() == 0 // (in case threshold < 2) + ) { + if ( stopThrottlingIfSo() ) { + logger.debug( "[#{}] Throttling stopped at queue size {}.", + instanceId, batchQueue.size() ); + } + } + return qdb; + } + } + } + 
} + + void close() { + logger.debug( "[#{}] Query listener closing.", instanceId ); + closed = true; + if ( stopThrottlingIfSo() ) { + logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).", + instanceId, batchQueue.size() ); + } + while (!batchQueue.isEmpty()) { + QueryDataBatch qdb = batchQueue.poll(); + if (qdb != null && qdb.getData() != null) { + qdb.getData().release(); + } + } + // Close may be called before the first result is received and therefore + // when the main thread is blocked waiting for the result. In that case + // we want to unblock the main thread. + firstMessageReceived.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere? + completed = true; + } + + } + private static final Logger logger = getLogger( DrillCursor.class ); /** JDBC-specified string for unknown catalog, schema, and table names. */ private static final String UNKNOWN_NAME_STRING = ""; - /** The associated {@link java.sql.ResultSet} implementation. */ - private final DrillResultSetImpl resultSet; + private final DrillConnectionImpl connection; + private final AvaticaStatement statement; + private final Meta.Signature signature; /** Holds current batch of records (none before first load). */ private final RecordBatchLoader currentBatchHolder; - private final DrillResultSetImpl.ResultsListener resultsListener; + private final ResultsListener resultsListener; + private SchemaChangeListener changeListener; private final DrillAccessorList accessors = new DrillAccessorList(); @@ -85,6 +312,7 @@ class DrillCursor implements Cursor { /** Whether cursor is after the end of the sequence of records/rows. */ private boolean afterLastRow = false; + private int currentRowNumber = -1; /** Zero-based offset of current record in record batch. * (Not <i>row</i> number.) 
*/ private int currentRecordNumber = -1; @@ -92,22 +320,42 @@ class DrillCursor implements Cursor { /** * - * @param resultSet the associated ResultSet implementation + * @param statement + * @param signature */ - DrillCursor(final DrillResultSetImpl resultSet) { - this.resultSet = resultSet; - currentBatchHolder = resultSet.batchLoader; - resultsListener = resultSet.resultsListener; - } - - DrillResultSetImpl getResultSet() { - return resultSet; + DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, Signature signature) { + this.connection = connection; + this.statement = statement; + this.signature = signature; + + DrillClient client = connection.getClient(); + final int batchQueueThrottlingThreshold = + client.getConfig().getInt( + ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD ); + resultsListener = new ResultsListener(batchQueueThrottlingThreshold); + currentBatchHolder = new RecordBatchLoader(client.getAllocator()); } protected int getCurrentRecordNumber() { return currentRecordNumber; } + public String getQueryId() { + if (resultsListener.getQueryId() != null) { + return QueryIdHelper.getQueryId(resultsListener.getQueryId()); + } else { + return null; + } + } + + public boolean isBeforeFirst() { + return currentRowNumber < 0; + } + + public boolean isAfterLast() { + return afterLastRow; + } + // (Overly restrictive Avatica uses List<Accessor> instead of List<? extends // Accessor>, so accessors/DrillAccessorList can't be of type // List<AvaticaDrillSqlAccessor>, and we have to cast from Accessor to @@ -119,6 +367,14 @@ class DrillCursor implements Cursor { return accessors; } + synchronized void cleanup() { + if (resultsListener.getQueryId() != null && ! resultsListener.completed) { + connection.getClient().cancelQuery(resultsListener.getQueryId()); + } + resultsListener.close(); + currentBatchHolder.clear(); + } + /** * Updates column accessors and metadata from current record batch. 
*/ @@ -144,8 +400,8 @@ class DrillCursor implements Cursor { schema, getObjectClasses ); - if (getResultSet().changeListener != null) { - getResultSet().changeListener.schemaChanged(schema); + if (changeListener != null) { + changeListener.schemaChanged(schema); } } @@ -261,6 +517,7 @@ class DrillCursor implements Cursor { throw new IllegalStateException( "loadInitialSchema() called a second time" ); } + assert ! afterLastRow : "afterLastRow already true in loadInitialSchema()"; assert ! afterFirstBatch : "afterLastRow already true in loadInitialSchema()"; assert -1 == currentRecordNumber @@ -270,6 +527,26 @@ class DrillCursor implements Cursor { : "currentBatchHolder.getRecordCount() not 0 (is " + currentBatchHolder.getRecordCount() + " in loadInitialSchema()"; + if (statement instanceof DrillPreparedStatementImpl) { + DrillPreparedStatementImpl drillPreparedStatement = (DrillPreparedStatementImpl) statement; + connection.getClient().executePreparedStatement(drillPreparedStatement.getPreparedStatementHandle().getServerHandle(), resultsListener); + } else { + connection.getClient().runQuery(QueryType.SQL, signature.sql, resultsListener); + } + + try { + resultsListener.awaitFirstMessage(); + } catch ( InterruptedException e ) { + // Preserve evidence that the interruption occurred so that code higher up + // on the call stack can learn of the interruption and respond to it if it + // wants to. + Thread.currentThread().interrupt(); + + // Not normally expected--Drill doesn't interrupt in this area (right?)-- + // but JDBC client certainly could. + throw new SQLException("Interrupted", e ); + } + returnTrueForNextCallToNext = true; nextRowInternally(); @@ -297,26 +574,28 @@ class DrillCursor implements Cursor { return false; } else if ( returnTrueForNextCallToNext ) { + ++currentRowNumber; // We have a deferred "not after end" to report--reset and report that. 
returnTrueForNextCallToNext = false; return true; } else { accessors.clearLastColumnIndexedInRow(); - return nextRowInternally(); + boolean res = nextRowInternally(); + if (res) { ++ currentRowNumber; } + + return res; } } + public void cancel() { + close(); + } + @Override public void close() { - // currentBatchHolder is owned by resultSet and cleaned up by - // DrillResultSet.cleanup() - - // listener is owned by resultSet and cleaned up by - // DrillResultSet.cleanup() - // Clean up result set (to deallocate any buffers). - getResultSet().cleanup(); + cleanup(); // TODO: CHECK: Something might need to set statement.openResultSet to // null. Also, AvaticaResultSet.close() doesn't check whether already // closed and skip calls to cursor.close(), statement.onResultSetClose() http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index a2a7699..e406348 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -40,10 +40,6 @@ import java.sql.Types; import java.util.Calendar; import java.util.Map; import java.util.TimeZone; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.calcite.avatica.AvaticaResultSet; import org.apache.calcite.avatica.AvaticaSite; @@ -51,23 +47,9 @@ import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.util.Cursor; -import 
org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.client.DrillClient; -import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; -import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.rpc.ConnectionThrottle; -import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.jdbc.AlreadyClosedSqlException; import org.apache.drill.jdbc.DrillResultSet; import org.apache.drill.jdbc.ExecutionCanceledSqlException; -import org.apache.drill.jdbc.SchemaChangeListener; - -import com.google.common.collect.Queues; /** @@ -79,29 +61,13 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class); private final DrillConnectionImpl connection; - - SchemaChangeListener changeListener; - final ResultsListener resultsListener; - private final DrillClient client; - // TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl, - // move down into DrillCursor and have this.clean() have cursor clean it. 
- final RecordBatchLoader batchLoader; - final DrillCursor cursor; - boolean hasPendingCancelationNotification; - + private volatile boolean hasPendingCancelationNotification = false; DrillResultSetImpl(AvaticaStatement statement, Meta.Signature signature, ResultSetMetaData resultSetMetaData, TimeZone timeZone, Meta.Frame firstFrame) { super(statement, signature, resultSetMetaData, timeZone, firstFrame); connection = (DrillConnectionImpl) statement.getConnection(); - client = connection.getClient(); - final int batchQueueThrottlingThreshold = - client.getConfig().getInt( - ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD ); - resultsListener = new ResultsListener(batchQueueThrottlingThreshold); - batchLoader = new RecordBatchLoader(client.getAllocator()); - cursor = new DrillCursor(this); } /** @@ -118,7 +84,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { ExecutionCanceledSqlException, SQLException { if ( isClosed() ) { - if ( hasPendingCancelationNotification ) { + if (cursor instanceof DrillCursor && hasPendingCancelationNotification) { hasPendingCancelationNotification = false; throw new ExecutionCanceledSqlException( "SQL statement execution canceled; ResultSet now closed." ); @@ -139,17 +105,12 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { @Override protected void cancel() { - hasPendingCancelationNotification = true; - cleanup(); - close(); - } - - synchronized void cleanup() { - if (resultsListener.getQueryId() != null && ! 
resultsListener.completed) { - client.cancelQuery(resultsListener.getQueryId()); + if (cursor instanceof DrillCursor) { + hasPendingCancelationNotification = true; + ((DrillCursor) cursor).cancel(); + } else { + super.cancel(); } - resultsListener.close(); - batchLoader.clear(); } //////////////////////////////////////// @@ -172,7 +133,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { // cancellation) which in turn sets the cursor to null. So we must check // before we call next. // TODO: handle next() after close is called in the Avatica code. - if (super.cursor != null) { + if (cursor != null) { return super.next(); } else { return false; @@ -1900,11 +1861,10 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { @Override public String getQueryId() throws SQLException { throwIfClosed(); - if (resultsListener.getQueryId() != null) { - return QueryIdHelper.getQueryId(resultsListener.getQueryId()); - } else { - return null; + if (cursor instanceof DrillCursor) { + return ((DrillCursor) cursor).getQueryId(); } + return null; } @@ -1912,249 +1872,15 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { @Override protected DrillResultSetImpl execute() throws SQLException{ - if (statement instanceof DrillPreparedStatementImpl) { - DrillPreparedStatementImpl drillPreparedStatement = (DrillPreparedStatementImpl) statement; - client.executePreparedStatement(drillPreparedStatement.getPreparedStatementHandle().getServerHandle(), resultsListener); - } else { - client.runQuery(QueryType.SQL, this.signature.sql, resultsListener); - } connection.getDriver().handler.onStatementExecute(statement, null); - super.execute2(cursor, this.signature.columns); - - // don't return with metadata until we've achieved at least one return message. - try { - // TODO: Revisit: Why reaching directly into ResultsListener rather than - // calling some wait method? 
- resultsListener.latch.await(); - } catch ( InterruptedException e ) { - // Preserve evidence that the interruption occurred so that code higher up - // on the call stack can learn of the interruption and respond to it if it - // wants to. - Thread.currentThread().interrupt(); - - // Not normally expected--Drill doesn't interrupt in this area (right?)-- - // but JDBC client certainly could. - throw new SQLException( "Interrupted", e ); - } + DrillCursor drillCursor = new DrillCursor(connection, statement, signature); + super.execute2(drillCursor, this.signature.columns); // Read first (schema-only) batch to initialize result-set metadata from // (initial) schema before Statement.execute...(...) returns result set: - cursor.loadInitialSchema(); + drillCursor.loadInitialSchema(); return this; } - - - //////////////////////////////////////// - // ResultsListener: - - static class ResultsListener implements UserResultsListener { - private static final org.slf4j.Logger logger = - org.slf4j.LoggerFactory.getLogger(ResultsListener.class); - - private static volatile int nextInstanceId = 1; - - /** (Just for logging.) */ - private final int instanceId; - - private final int batchQueueThrottlingThreshold; - - /** (Just for logging.) */ - private volatile QueryId queryId; - - /** (Just for logging.) */ - private int lastReceivedBatchNumber; - /** (Just for logging.) */ - private int lastDequeuedBatchNumber; - - private volatile UserException executionFailureException; - - // TODO: Revisit "completed". Determine and document exactly what it - // means. Some uses imply that it means that incoming messages indicate - // that the _query_ has _terminated_ (not necessarily _completing_ - // normally), while some uses imply that it's some other state of the - // ResultListener. Some uses seem redundant.) - volatile boolean completed = false; - - /** Whether throttling of incoming data is active. 
*/ - private final AtomicBoolean throttled = new AtomicBoolean( false ); - private volatile ConnectionThrottle throttle; - - private volatile boolean closed = false; - // TODO: Rename. It's obvious it's a latch--but what condition or action - // does it represent or control? - private CountDownLatch latch = new CountDownLatch(1); - private AtomicBoolean receivedMessage = new AtomicBoolean(false); - - final LinkedBlockingDeque<QueryDataBatch> batchQueue = - Queues.newLinkedBlockingDeque(); - - - /** - * ... - * @param batchQueueThrottlingThreshold - * queue size threshold for throttling server - */ - ResultsListener( int batchQueueThrottlingThreshold ) { - instanceId = nextInstanceId++; - this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold; - logger.debug( "[#{}] Query listener created.", instanceId ); - } - - /** - * Starts throttling if not currently throttling. - * @param throttle the "throttlable" object to throttle - * @return true if actually started (wasn't throttling already) - */ - private boolean startThrottlingIfNot( ConnectionThrottle throttle ) { - final boolean started = throttled.compareAndSet( false, true ); - if ( started ) { - this.throttle = throttle; - throttle.setAutoRead(false); - } - return started; - } - - /** - * Stops throttling if currently throttling. - * @return true if actually stopped (was throttling) - */ - private boolean stopThrottlingIfSo() { - final boolean stopped = throttled.compareAndSet( true, false ); - if ( stopped ) { - throttle.setAutoRead(true); - throttle = null; - } - return stopped; - } - - // TODO: Doc.: Release what if what is first relative to what? 
- private boolean releaseIfFirst() { - if (receivedMessage.compareAndSet(false, true)) { - latch.countDown(); - return true; - } - - return false; - } - - @Override - public void queryIdArrived(QueryId queryId) { - logger.debug( "[#{}] Received query ID: {}.", - instanceId, QueryIdHelper.getQueryId( queryId ) ); - this.queryId = queryId; - } - - @Override - public void submissionFailed(UserException ex) { - logger.debug( "Received query failure:", instanceId, ex ); - this.executionFailureException = ex; - completed = true; - close(); - logger.info( "[#{}] Query failed: ", instanceId, ex ); - } - - @Override - public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { - lastReceivedBatchNumber++; - logger.debug( "[#{}] Received query data batch #{}: {}.", - instanceId, lastReceivedBatchNumber, result ); - - // If we're in a closed state, just release the message. - if (closed) { - result.release(); - // TODO: Revisit member completed: Is ResultListener really completed - // after only one data batch after being closed? - completed = true; - return; - } - - // We're active; let's add to the queue. - batchQueue.add(result); - - // Throttle server if queue size has exceed threshold. - if (batchQueue.size() > batchQueueThrottlingThreshold ) { - if ( startThrottlingIfNot( throttle ) ) { - logger.debug( "[#{}] Throttling started at queue size {}.", - instanceId, batchQueue.size() ); - } - } - - releaseIfFirst(); - } - - @Override - public void queryCompleted(QueryState state) { - logger.debug( "[#{}] Received query completion: {}.", instanceId, state ); - releaseIfFirst(); - completed = true; - } - - QueryId getQueryId() { - return queryId; - } - - - /** - * Gets the next batch of query results from the queue. 
- * @return the next batch, or {@code null} after last batch has been returned - * @throws UserException - * if the query failed - * @throws InterruptedException - * if waiting on the queue was interrupted - */ - QueryDataBatch getNext() throws UserException, InterruptedException { - while (true) { - if (executionFailureException != null) { - logger.debug( "[#{}] Dequeued query failure exception: {}.", - instanceId, executionFailureException ); - throw executionFailureException; - } - if (completed && batchQueue.isEmpty()) { - return null; - } else { - QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS); - if (qdb != null) { - lastDequeuedBatchNumber++; - logger.debug( "[#{}] Dequeued query data batch #{}: {}.", - instanceId, lastDequeuedBatchNumber, qdb ); - - // Unthrottle server if queue size has dropped enough below threshold: - if ( batchQueue.size() < batchQueueThrottlingThreshold / 2 - || batchQueue.size() == 0 // (in case threshold < 2) - ) { - if ( stopThrottlingIfSo() ) { - logger.debug( "[#{}] Throttling stopped at queue size {}.", - instanceId, batchQueue.size() ); - } - } - return qdb; - } - } - } - } - - void close() { - logger.debug( "[#{}] Query listener closing.", instanceId ); - closed = true; - if ( stopThrottlingIfSo() ) { - logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).", - instanceId, batchQueue.size() ); - } - while (!batchQueue.isEmpty()) { - QueryDataBatch qdb = batchQueue.poll(); - if (qdb != null && qdb.getData() != null) { - qdb.getData().release(); - } - } - // Close may be called before the first result is received and therefore - // when the main thread is blocked waiting for the result. In that case - // we want to unblock the main thread. - latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere? - completed = true; - } - - } - }