DRILL-4994: Refactor DrillCursor

Refactor DrillCursor to be more self-contained.
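
In outline, the patch moves query submission, the ResultsListener, and the
RecordBatchLoader out of DrillResultSetImpl and into DrillCursor, so the result
set only hands the cursor a connection, statement, and signature.  The condensed
sketch below paraphrases the patched DrillResultSetImpl.execute() and cancel()
from the full diff that follows; surrounding class members and error handling
are omitted for brevity.

    // Condensed paraphrase of the patched DrillResultSetImpl (see diff below).
    @Override
    protected DrillResultSetImpl execute() throws SQLException {
      connection.getDriver().handler.onStatementExecute(statement, null);

      // The cursor now creates its own ResultsListener and RecordBatchLoader,
      // submits the query itself, and waits for the first server message.
      DrillCursor drillCursor = new DrillCursor(connection, statement, signature);
      super.execute2(drillCursor, this.signature.columns);

      // Read the first (schema-only) batch so result-set metadata is available
      // before Statement.execute...() returns.
      drillCursor.loadInitialSchema();
      return this;
    }

    @Override
    protected void cancel() {
      // Cancellation is likewise delegated to the cursor, which cancels the
      // running query (if any) and releases queued batches.
      if (cursor instanceof DrillCursor) {
        hasPendingCancelationNotification = true;
        ((DrillCursor) cursor).cancel();
      } else {
        super.cancel();
      }
    }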


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ab60855b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ab60855b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ab60855b

Branch: refs/heads/master
Commit: ab60855bf390e8f01369760f019ee06eecf1959e
Parents: e2b5271
Author: Laurent Goujon <laur...@dremio.com>
Authored: Fri Nov 4 13:31:19 2016 -0700
Committer: Jinfeng Ni <j...@apache.org>
Committed: Wed Mar 1 23:15:09 2017 -0800

----------------------------------------------------------------------
 .../jdbc/impl/AvaticaDrillSqlAccessor.java      |   4 +-
 .../org/apache/drill/jdbc/impl/DrillCursor.java | 323 +++++++++++++++++--
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 302 +----------------
 3 files changed, 317 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java
index 5a48e59..914e279 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/AvaticaDrillSqlAccessor.java
@@ -64,11 +64,11 @@ class AvaticaDrillSqlAccessor implements Accessor {
     // so in that case row can be left at -1, so isBeforeFirst() returns true
     // even though we're not longer before the empty set of rows--and it's all
     // private, so we can't get to it to override any of several candidates.
-    if ( cursor.getResultSet().isAfterLast() ) {
+    if ( cursor.isAfterLast() ) {
       throw new InvalidCursorStateSqlException(
           "Result set cursor is already positioned past all rows." );
     }
-    else if ( cursor.getResultSet().isBeforeFirst() ) {
+    else if ( cursor.isBeforeFirst() ) {
       throw new InvalidCursorStateSqlException(
           "Result set cursor is positioned before all rows.  Call next() 
first." );
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 08570a8..ed279a3 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -24,33 +24,260 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.Meta.Signature;
 import org.apache.calcite.avatica.util.ArrayImpl.Factory;
 import org.apache.calcite.avatica.util.Cursor;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.jdbc.SchemaChangeListener;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Queues;
+
 
 class DrillCursor implements Cursor {
+
+  ////////////////////////////////////////
+  // ResultsListener:
+  static class ResultsListener implements UserResultsListener {
+    private static final org.slf4j.Logger logger =
+        org.slf4j.LoggerFactory.getLogger(ResultsListener.class);
+
+    private static volatile int nextInstanceId = 1;
+
+    /** (Just for logging.) */
+    private final int instanceId;
+
+    private final int batchQueueThrottlingThreshold;
+
+    /** (Just for logging.) */
+    private volatile QueryId queryId;
+
+    /** (Just for logging.) */
+    private int lastReceivedBatchNumber;
+    /** (Just for logging.) */
+    private int lastDequeuedBatchNumber;
+
+    private volatile UserException executionFailureException;
+
+    // TODO:  Revisit "completed".  Determine and document exactly what it
+    // means.  Some uses imply that it means that incoming messages indicate
+    // that the _query_ has _terminated_ (not necessarily _completing_
+    // normally), while some uses imply that it's some other state of the
+    // ResultListener.  Some uses seem redundant.)
+    volatile boolean completed = false;
+
+    /** Whether throttling of incoming data is active. */
+    private final AtomicBoolean throttled = new AtomicBoolean( false );
+    private volatile ConnectionThrottle throttle;
+
+    private volatile boolean closed = false;
+
+    private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
+
+    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+        Queues.newLinkedBlockingDeque();
+
+
+    /**
+     * ...
+     * @param  batchQueueThrottlingThreshold
+     *         queue size threshold for throttling server
+     */
+    ResultsListener( int batchQueueThrottlingThreshold ) {
+      instanceId = nextInstanceId++;
+      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
+      logger.debug( "[#{}] Query listener created.", instanceId );
+    }
+
+    /**
+     * Starts throttling if not currently throttling.
+     * @param  throttle  the "throttlable" object to throttle
+     * @return  true if actually started (wasn't throttling already)
+     */
+    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
+      final boolean started = throttled.compareAndSet( false, true );
+      if ( started ) {
+        this.throttle = throttle;
+        throttle.setAutoRead(false);
+      }
+      return started;
+    }
+
+    /**
+     * Stops throttling if currently throttling.
+     * @return  true if actually stopped (was throttling)
+     */
+    private boolean stopThrottlingIfSo() {
+      final boolean stopped = throttled.compareAndSet( true, false );
+      if ( stopped ) {
+        throttle.setAutoRead(true);
+        throttle = null;
+      }
+      return stopped;
+    }
+
+    public void awaitFirstMessage() throws InterruptedException {
+      firstMessageReceived.await();
+    }
+
+    private void releaseIfFirst() {
+      firstMessageReceived.countDown();
+    }
+
+    @Override
+    public void queryIdArrived(QueryId queryId) {
+      logger.debug( "[#{}] Received query ID: {}.",
+                    instanceId, QueryIdHelper.getQueryId( queryId ) );
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void submissionFailed(UserException ex) {
+      logger.debug( "Received query failure:", instanceId, ex );
+      this.executionFailureException = ex;
+      completed = true;
+      close();
+      logger.info( "[#{}] Query failed: ", instanceId, ex );
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      lastReceivedBatchNumber++;
+      logger.debug( "[#{}] Received query data batch #{}: {}.",
+                    instanceId, lastReceivedBatchNumber, result );
+
+      // If we're in a closed state, just release the message.
+      if (closed) {
+        result.release();
+        // TODO:  Revisit member completed:  Is ResultListener really completed
+        // after only one data batch after being closed?
+        completed = true;
+        return;
+      }
+
+      // We're active; let's add to the queue.
+      batchQueue.add(result);
+
+      // Throttle server if queue size has exceed threshold.
+      if (batchQueue.size() > batchQueueThrottlingThreshold ) {
+        if ( startThrottlingIfNot( throttle ) ) {
+          logger.debug( "[#{}] Throttling started at queue size {}.",
+                        instanceId, batchQueue.size() );
+        }
+      }
+
+      releaseIfFirst();
+    }
+
+    @Override
+    public void queryCompleted(QueryState state) {
+      logger.debug( "[#{}] Received query completion: {}.", instanceId, state 
);
+      releaseIfFirst();
+      completed = true;
+    }
+
+    QueryId getQueryId() {
+      return queryId;
+    }
+
+
+    /**
+     * Gets the next batch of query results from the queue.
+     * @return  the next batch, or {@code null} after last batch has been returned
+     * @throws UserException
+     *         if the query failed
+     * @throws InterruptedException
+     *         if waiting on the queue was interrupted
+     */
+    QueryDataBatch getNext() throws UserException, InterruptedException {
+      while (true) {
+        if (executionFailureException != null) {
+          logger.debug( "[#{}] Dequeued query failure exception: {}.",
+                        instanceId, executionFailureException );
+          throw executionFailureException;
+        }
+        if (completed && batchQueue.isEmpty()) {
+          return null;
+        } else {
+          QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+          if (qdb != null) {
+            lastDequeuedBatchNumber++;
+            logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
+                          instanceId, lastDequeuedBatchNumber, qdb );
+
+            // Unthrottle server if queue size has dropped enough below threshold:
+            if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
+                 || batchQueue.size() == 0  // (in case threshold < 2)
+                 ) {
+              if ( stopThrottlingIfSo() ) {
+                logger.debug( "[#{}] Throttling stopped at queue size {}.",
+                              instanceId, batchQueue.size() );
+              }
+            }
+            return qdb;
+          }
+        }
+      }
+    }
+
+    void close() {
+      logger.debug( "[#{}] Query listener closing.", instanceId );
+      closed = true;
+      if ( stopThrottlingIfSo() ) {
+        logger.debug( "[#{}] Throttling stopped at close() (at queue size 
{}).",
+                      instanceId, batchQueue.size() );
+      }
+      while (!batchQueue.isEmpty()) {
+        QueryDataBatch qdb = batchQueue.poll();
+        if (qdb != null && qdb.getData() != null) {
+          qdb.getData().release();
+        }
+      }
+      // Close may be called before the first result is received and therefore
+      // when the main thread is blocked waiting for the result.  In that case
+      // we want to unblock the main thread.
+      firstMessageReceived.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
+      completed = true;
+    }
+
+  }
+
   private static final Logger logger = getLogger( DrillCursor.class );
 
   /** JDBC-specified string for unknown catalog, schema, and table names. */
   private static final String UNKNOWN_NAME_STRING = "";
 
-  /** The associated {@link java.sql.ResultSet} implementation. */
-  private final DrillResultSetImpl resultSet;
+  private final DrillConnectionImpl connection;
+  private final AvaticaStatement statement;
+  private final Meta.Signature signature;
 
   /** Holds current batch of records (none before first load). */
   private final RecordBatchLoader currentBatchHolder;
 
-  private final DrillResultSetImpl.ResultsListener resultsListener;
+  private final ResultsListener resultsListener;
+  private SchemaChangeListener changeListener;
 
   private final DrillAccessorList accessors = new DrillAccessorList();
 
@@ -85,6 +312,7 @@ class DrillCursor implements Cursor {
   /** Whether cursor is after the end of the sequence of records/rows. */
   private boolean afterLastRow = false;
 
+  private int currentRowNumber = -1;
   /** Zero-based offset of current record in record batch.
    * (Not <i>row</i> number.) */
   private int currentRecordNumber = -1;
@@ -92,22 +320,42 @@ class DrillCursor implements Cursor {
 
   /**
    *
-   * @param  resultSet  the associated ResultSet implementation
+   * @param statement
+   * @param signature
    */
-  DrillCursor(final DrillResultSetImpl resultSet) {
-    this.resultSet = resultSet;
-    currentBatchHolder = resultSet.batchLoader;
-    resultsListener = resultSet.resultsListener;
-  }
-
-  DrillResultSetImpl getResultSet() {
-    return resultSet;
+  DrillCursor(DrillConnectionImpl connection, AvaticaStatement statement, Signature signature) {
+    this.connection = connection;
+    this.statement = statement;
+    this.signature = signature;
+
+    DrillClient client = connection.getClient();
+    final int batchQueueThrottlingThreshold =
+        client.getConfig().getInt(
+            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
+    resultsListener = new ResultsListener(batchQueueThrottlingThreshold);
+    currentBatchHolder = new RecordBatchLoader(client.getAllocator());
   }
 
   protected int getCurrentRecordNumber() {
     return currentRecordNumber;
   }
 
+  public String getQueryId() {
+    if (resultsListener.getQueryId() != null) {
+      return QueryIdHelper.getQueryId(resultsListener.getQueryId());
+    } else {
+      return null;
+    }
+  }
+
+  public boolean isBeforeFirst() {
+    return currentRowNumber < 0;
+  }
+
+  public boolean isAfterLast() {
+    return afterLastRow;
+  }
+
   // (Overly restrictive Avatica uses List<Accessor> instead of List<? extends
   // Accessor>, so accessors/DrillAccessorList can't be of type
   // List<AvaticaDrillSqlAccessor>, and we have to cast from Accessor to
@@ -119,6 +367,14 @@ class DrillCursor implements Cursor {
     return accessors;
   }
 
+  synchronized void cleanup() {
+    if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
+      connection.getClient().cancelQuery(resultsListener.getQueryId());
+    }
+    resultsListener.close();
+    currentBatchHolder.clear();
+  }
+
   /**
    * Updates column accessors and metadata from current record batch.
    */
@@ -144,8 +400,8 @@ class DrillCursor implements Cursor {
         schema,
         getObjectClasses );
 
-    if (getResultSet().changeListener != null) {
-      getResultSet().changeListener.schemaChanged(schema);
+    if (changeListener != null) {
+      changeListener.schemaChanged(schema);
     }
   }
 
@@ -261,6 +517,7 @@ class DrillCursor implements Cursor {
       throw new IllegalStateException(
           "loadInitialSchema() called a second time" );
     }
+
     assert ! afterLastRow : "afterLastRow already true in loadInitialSchema()";
    assert ! afterFirstBatch : "afterLastRow already true in loadInitialSchema()";
     assert -1 == currentRecordNumber
@@ -270,6 +527,26 @@ class DrillCursor implements Cursor {
         : "currentBatchHolder.getRecordCount() not 0 (is "
           + currentBatchHolder.getRecordCount() + " in loadInitialSchema()";
 
+    if (statement instanceof DrillPreparedStatementImpl) {
+      DrillPreparedStatementImpl drillPreparedStatement = (DrillPreparedStatementImpl) statement;
+      connection.getClient().executePreparedStatement(drillPreparedStatement.getPreparedStatementHandle().getServerHandle(), resultsListener);
+    } else {
+      connection.getClient().runQuery(QueryType.SQL, signature.sql, resultsListener);
+    }
+
+    try {
+      resultsListener.awaitFirstMessage();
+    } catch ( InterruptedException e ) {
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the interruption and respond to it if it
+      // wants to.
+      Thread.currentThread().interrupt();
+
+      // Not normally expected--Drill doesn't interrupt in this area (right?)--
+      // but JDBC client certainly could.
+      throw new SQLException("Interrupted", e );
+    }
+
     returnTrueForNextCallToNext = true;
 
     nextRowInternally();
@@ -297,26 +574,28 @@ class DrillCursor implements Cursor {
       return false;
     }
     else if ( returnTrueForNextCallToNext ) {
+      ++currentRowNumber;
       // We have a deferred "not after end" to report--reset and report that.
       returnTrueForNextCallToNext = false;
       return true;
     }
     else {
       accessors.clearLastColumnIndexedInRow();
-      return nextRowInternally();
+      boolean res = nextRowInternally();
+      if (res) { ++ currentRowNumber; }
+
+      return res;
     }
   }
 
+  public void cancel() {
+    close();
+  }
+
   @Override
   public void close() {
-    // currentBatchHolder is owned by resultSet and cleaned up by
-    // DrillResultSet.cleanup()
-
-    // listener is owned by resultSet and cleaned up by
-    // DrillResultSet.cleanup()
-
     // Clean up result set (to deallocate any buffers).
-    getResultSet().cleanup();
+    cleanup();
     // TODO:  CHECK:  Something might need to set statement.openResultSet to
     // null.  Also, AvaticaResultSet.close() doesn't check whether already
     // closed and skip calls to cursor.close(), statement.onResultSetClose()

http://git-wip-us.apache.org/repos/asf/drill/blob/ab60855b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index a2a7699..e406348 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -40,10 +40,6 @@ import java.sql.Types;
 import java.util.Calendar;
 import java.util.Map;
 import java.util.TimeZone;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.calcite.avatica.AvaticaResultSet;
 import org.apache.calcite.avatica.AvaticaSite;
@@ -51,23 +47,9 @@ import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.util.Cursor;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.jdbc.AlreadyClosedSqlException;
 import org.apache.drill.jdbc.DrillResultSet;
 import org.apache.drill.jdbc.ExecutionCanceledSqlException;
-import org.apache.drill.jdbc.SchemaChangeListener;
-
-import com.google.common.collect.Queues;
 
 
 /**
@@ -79,29 +61,13 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
       org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
 
   private final DrillConnectionImpl connection;
-
-  SchemaChangeListener changeListener;
-  final ResultsListener resultsListener;
-  private final DrillClient client;
-  // TODO:  Resolve:  Since is barely manipulated here in DrillResultSetImpl,
-  //  move down into DrillCursor and have this.clean() have cursor clean it.
-  final RecordBatchLoader batchLoader;
-  final DrillCursor cursor;
-  boolean hasPendingCancelationNotification;
-
+  private volatile boolean hasPendingCancelationNotification = false;
 
   DrillResultSetImpl(AvaticaStatement statement, Meta.Signature signature,
                      ResultSetMetaData resultSetMetaData, TimeZone timeZone,
                      Meta.Frame firstFrame) {
     super(statement, signature, resultSetMetaData, timeZone, firstFrame);
     connection = (DrillConnectionImpl) statement.getConnection();
-    client = connection.getClient();
-    final int batchQueueThrottlingThreshold =
-        client.getConfig().getInt(
-            ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD );
-    resultsListener = new ResultsListener(batchQueueThrottlingThreshold);
-    batchLoader = new RecordBatchLoader(client.getAllocator());
-    cursor = new DrillCursor(this);
   }
 
   /**
@@ -118,7 +84,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
                                       ExecutionCanceledSqlException,
                                       SQLException {
     if ( isClosed() ) {
-      if ( hasPendingCancelationNotification ) {
+      if (cursor instanceof DrillCursor && hasPendingCancelationNotification) {
         hasPendingCancelationNotification = false;
         throw new ExecutionCanceledSqlException(
             "SQL statement execution canceled; ResultSet now closed." );
@@ -139,17 +105,12 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
 
   @Override
   protected void cancel() {
-    hasPendingCancelationNotification = true;
-    cleanup();
-    close();
-  }
-
-  synchronized void cleanup() {
-    if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
-      client.cancelQuery(resultsListener.getQueryId());
+    if (cursor instanceof DrillCursor) {
+      hasPendingCancelationNotification = true;
+      ((DrillCursor) cursor).cancel();
+    } else {
+      super.cancel();
     }
-    resultsListener.close();
-    batchLoader.clear();
   }
 
   ////////////////////////////////////////
@@ -172,7 +133,7 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
     // cancellation) which in turn sets the cursor to null.  So we must check
     // before we call next.
     // TODO: handle next() after close is called in the Avatica code.
-    if (super.cursor != null) {
+    if (cursor != null) {
       return super.next();
     } else {
       return false;
@@ -1900,11 +1861,10 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
   @Override
   public String getQueryId() throws SQLException {
     throwIfClosed();
-    if (resultsListener.getQueryId() != null) {
-      return QueryIdHelper.getQueryId(resultsListener.getQueryId());
-    } else {
-      return null;
+    if (cursor instanceof DrillCursor) {
+      return ((DrillCursor) cursor).getQueryId();
     }
+    return null;
   }
 
 
@@ -1912,249 +1872,15 @@ class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet {
 
   @Override
   protected DrillResultSetImpl execute() throws SQLException{
-    if (statement instanceof DrillPreparedStatementImpl) {
-      DrillPreparedStatementImpl drillPreparedStatement = (DrillPreparedStatementImpl) statement;
-      client.executePreparedStatement(drillPreparedStatement.getPreparedStatementHandle().getServerHandle(), resultsListener);
-    } else {
-      client.runQuery(QueryType.SQL, this.signature.sql, resultsListener);
-    }
     connection.getDriver().handler.onStatementExecute(statement, null);
 
-    super.execute2(cursor, this.signature.columns);
-
-    // don't return with metadata until we've achieved at least one return message.
-    try {
-      // TODO:  Revisit:  Why reaching directly into ResultsListener rather than
-      // calling some wait method?
-      resultsListener.latch.await();
-    } catch ( InterruptedException e ) {
-      // Preserve evidence that the interruption occurred so that code higher up
-      // on the call stack can learn of the interruption and respond to it if it
-      // wants to.
-      Thread.currentThread().interrupt();
-
-      // Not normally expected--Drill doesn't interrupt in this area (right?)--
-      // but JDBC client certainly could.
-      throw new SQLException( "Interrupted", e );
-    }
+    DrillCursor drillCursor = new DrillCursor(connection, statement, signature);
+    super.execute2(drillCursor, this.signature.columns);
 
     // Read first (schema-only) batch to initialize result-set metadata from
     // (initial) schema before Statement.execute...(...) returns result set:
-    cursor.loadInitialSchema();
+    drillCursor.loadInitialSchema();
 
     return this;
   }
-
-
-  ////////////////////////////////////////
-  // ResultsListener:
-
-  static class ResultsListener implements UserResultsListener {
-    private static final org.slf4j.Logger logger =
-        org.slf4j.LoggerFactory.getLogger(ResultsListener.class);
-
-    private static volatile int nextInstanceId = 1;
-
-    /** (Just for logging.) */
-    private final int instanceId;
-
-    private final int batchQueueThrottlingThreshold;
-
-    /** (Just for logging.) */
-    private volatile QueryId queryId;
-
-    /** (Just for logging.) */
-    private int lastReceivedBatchNumber;
-    /** (Just for logging.) */
-    private int lastDequeuedBatchNumber;
-
-    private volatile UserException executionFailureException;
-
-    // TODO:  Revisit "completed".  Determine and document exactly what it
-    // means.  Some uses imply that it means that incoming messages indicate
-    // that the _query_ has _terminated_ (not necessarily _completing_
-    // normally), while some uses imply that it's some other state of the
-    // ResultListener.  Some uses seem redundant.)
-    volatile boolean completed = false;
-
-    /** Whether throttling of incoming data is active. */
-    private final AtomicBoolean throttled = new AtomicBoolean( false );
-    private volatile ConnectionThrottle throttle;
-
-    private volatile boolean closed = false;
-    // TODO:  Rename.  It's obvious it's a latch--but what condition or action
-    // does it represent or control?
-    private CountDownLatch latch = new CountDownLatch(1);
-    private AtomicBoolean receivedMessage = new AtomicBoolean(false);
-
-    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
-        Queues.newLinkedBlockingDeque();
-
-
-    /**
-     * ...
-     * @param  batchQueueThrottlingThreshold
-     *         queue size threshold for throttling server
-     */
-    ResultsListener( int batchQueueThrottlingThreshold ) {
-      instanceId = nextInstanceId++;
-      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
-      logger.debug( "[#{}] Query listener created.", instanceId );
-    }
-
-    /**
-     * Starts throttling if not currently throttling.
-     * @param  throttle  the "throttlable" object to throttle
-     * @return  true if actually started (wasn't throttling already)
-     */
-    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
-      final boolean started = throttled.compareAndSet( false, true );
-      if ( started ) {
-        this.throttle = throttle;
-        throttle.setAutoRead(false);
-      }
-      return started;
-    }
-
-    /**
-     * Stops throttling if currently throttling.
-     * @return  true if actually stopped (was throttling)
-     */
-    private boolean stopThrottlingIfSo() {
-      final boolean stopped = throttled.compareAndSet( true, false );
-      if ( stopped ) {
-        throttle.setAutoRead(true);
-        throttle = null;
-      }
-      return stopped;
-    }
-
-    // TODO:  Doc.:  Release what if what is first relative to what?
-    private boolean releaseIfFirst() {
-      if (receivedMessage.compareAndSet(false, true)) {
-        latch.countDown();
-        return true;
-      }
-
-      return false;
-    }
-
-    @Override
-    public void queryIdArrived(QueryId queryId) {
-      logger.debug( "[#{}] Received query ID: {}.",
-                    instanceId, QueryIdHelper.getQueryId( queryId ) );
-      this.queryId = queryId;
-    }
-
-    @Override
-    public void submissionFailed(UserException ex) {
-      logger.debug( "Received query failure:", instanceId, ex );
-      this.executionFailureException = ex;
-      completed = true;
-      close();
-      logger.info( "[#{}] Query failed: ", instanceId, ex );
-    }
-
-    @Override
-    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      lastReceivedBatchNumber++;
-      logger.debug( "[#{}] Received query data batch #{}: {}.",
-                    instanceId, lastReceivedBatchNumber, result );
-
-      // If we're in a closed state, just release the message.
-      if (closed) {
-        result.release();
-        // TODO:  Revisit member completed:  Is ResultListener really completed
-        // after only one data batch after being closed?
-        completed = true;
-        return;
-      }
-
-      // We're active; let's add to the queue.
-      batchQueue.add(result);
-
-      // Throttle server if queue size has exceed threshold.
-      if (batchQueue.size() > batchQueueThrottlingThreshold ) {
-        if ( startThrottlingIfNot( throttle ) ) {
-          logger.debug( "[#{}] Throttling started at queue size {}.",
-                        instanceId, batchQueue.size() );
-        }
-      }
-
-      releaseIfFirst();
-    }
-
-    @Override
-    public void queryCompleted(QueryState state) {
-      logger.debug( "[#{}] Received query completion: {}.", instanceId, state 
);
-      releaseIfFirst();
-      completed = true;
-    }
-
-    QueryId getQueryId() {
-      return queryId;
-    }
-
-
-    /**
-     * Gets the next batch of query results from the queue.
-     * @return  the next batch, or {@code null} after last batch has been returned
-     * @throws UserException
-     *         if the query failed
-     * @throws InterruptedException
-     *         if waiting on the queue was interrupted
-     */
-    QueryDataBatch getNext() throws UserException, InterruptedException {
-      while (true) {
-        if (executionFailureException != null) {
-          logger.debug( "[#{}] Dequeued query failure exception: {}.",
-                        instanceId, executionFailureException );
-          throw executionFailureException;
-        }
-        if (completed && batchQueue.isEmpty()) {
-          return null;
-        } else {
-          QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
-          if (qdb != null) {
-            lastDequeuedBatchNumber++;
-            logger.debug( "[#{}] Dequeued query data batch #{}: {}.",
-                          instanceId, lastDequeuedBatchNumber, qdb );
-
-            // Unthrottle server if queue size has dropped enough below threshold:
-            if ( batchQueue.size() < batchQueueThrottlingThreshold / 2
-                 || batchQueue.size() == 0  // (in case threshold < 2)
-                 ) {
-              if ( stopThrottlingIfSo() ) {
-                logger.debug( "[#{}] Throttling stopped at queue size {}.",
-                              instanceId, batchQueue.size() );
-              }
-            }
-            return qdb;
-          }
-        }
-      }
-    }
-
-    void close() {
-      logger.debug( "[#{}] Query listener closing.", instanceId );
-      closed = true;
-      if ( stopThrottlingIfSo() ) {
-        logger.debug( "[#{}] Throttling stopped at close() (at queue size 
{}).",
-                      instanceId, batchQueue.size() );
-      }
-      while (!batchQueue.isEmpty()) {
-        QueryDataBatch qdb = batchQueue.poll();
-        if (qdb != null && qdb.getData() != null) {
-          qdb.getData().release();
-        }
-      }
-      // Close may be called before the first result is received and therefore
-      // when the main thread is blocked waiting for the result.  In that case
-      // we want to unblock the main thread.
-      latch.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
-      completed = true;
-    }
-
-  }
-
 }
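
The externally visible JDBC behavior is unchanged.  As a hedged, illustrative
usage note (not part of the patch): DrillResultSet.getQueryId() still returns
the server-assigned query ID, but after this refactoring DrillResultSetImpl
delegates the call to DrillCursor.  The connection URL and query below are
assumptions for the example only.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.apache.drill.jdbc.DrillResultSet;

    public class QueryIdExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn =
                 DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM sys.version")) {
          rs.next();
          // DrillResultSetImpl implements DrillResultSet, so the cast is safe.
          String queryId = ((DrillResultSet) rs).getQueryId();
          System.out.println("Drill query id: " + queryId);
        }
      }
    }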
