Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/613#discussion_r86651825
  
    --- Diff: exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java ---
    @@ -24,33 +24,260 @@
     import java.util.ArrayList;
     import java.util.Calendar;
     import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
     import org.apache.calcite.avatica.AvaticaResultSet;
    +import org.apache.calcite.avatica.AvaticaStatement;
     import org.apache.calcite.avatica.ColumnMetaData;
    +import org.apache.calcite.avatica.Meta;
    +import org.apache.calcite.avatica.Meta.Signature;
     import org.apache.calcite.avatica.util.ArrayImpl.Factory;
     import org.apache.calcite.avatica.util.Cursor;
     import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.client.DrillClient;
     import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.proto.UserBitShared.QueryType;
    +import org.apache.drill.exec.proto.helper.QueryIdHelper;
     import org.apache.drill.exec.record.BatchSchema;
     import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.rpc.ConnectionThrottle;
     import org.apache.drill.exec.rpc.user.QueryDataBatch;
    +import org.apache.drill.exec.rpc.user.UserResultsListener;
     import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
    +import org.apache.drill.jdbc.SchemaChangeListener;
     import org.slf4j.Logger;
     
    +import com.google.common.collect.Queues;
    +
     
     class DrillCursor implements Cursor {
    +
    +  ////////////////////////////////////////
    +  // ResultsListener:
    +  static class ResultsListener implements UserResultsListener {
    +    private static final org.slf4j.Logger logger =
    +        org.slf4j.LoggerFactory.getLogger(ResultsListener.class);
    +
    +    private static volatile int nextInstanceId = 1;
    +
    +    /** (Just for logging.) */
    +    private final int instanceId;
    +
    +    private final int batchQueueThrottlingThreshold;
    +
    +    /** (Just for logging.) */
    +    private volatile QueryId queryId;
    +
    +    /** (Just for logging.) */
    +    private int lastReceivedBatchNumber;
    +    /** (Just for logging.) */
    +    private int lastDequeuedBatchNumber;
    +
    +    private volatile UserException executionFailureException;
    +
    +    // TODO:  Revisit "completed".  Determine and document exactly what it
    +    // means.  Some uses imply that it means that incoming messages indicate
    +    // that the _query_ has _terminated_ (not necessarily _completing_
    +    // normally), while some uses imply that it's some other state of the
    +    // ResultListener.  Some uses seem redundant.)
    +    volatile boolean completed = false;
    +
    +    /** Whether throttling of incoming data is active. */
    +    private final AtomicBoolean throttled = new AtomicBoolean( false );
    +    private volatile ConnectionThrottle throttle;
    +
    +    private volatile boolean closed = false;
    +
    +    private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
    +
    +    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
    +        Queues.newLinkedBlockingDeque();
    +
    +
    +    /**
    +     * ...
    +     * @param  batchQueueThrottlingThreshold
    +     *         queue size threshold for throttling server
    +     */
    +    ResultsListener( int batchQueueThrottlingThreshold ) {
    +      instanceId = nextInstanceId++;
    +      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
    +      logger.debug( "[#{}] Query listener created.", instanceId );
    +    }
    +
    +    /**
    +     * Starts throttling if not currently throttling.
    +     * @param  throttle  the "throttlable" object to throttle
    +     * @return  true if actually started (wasn't throttling already)
    +     */
    +    private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
    +      final boolean started = throttled.compareAndSet( false, true );
    +      if ( started ) {
    +        this.throttle = throttle;
    +        throttle.setAutoRead(false);
    +      }
    +      return started;
    +    }
    +
    +    /**
    +     * Stops throttling if currently throttling.
    +     * @return  true if actually stopped (was throttling)
    +     */
    +    private boolean stopThrottlingIfSo() {
    +      final boolean stopped = throttled.compareAndSet( true, false );
    +      if ( stopped ) {
    +        throttle.setAutoRead(true);
    +        throttle = null;
    +      }
    +      return stopped;
    +    }
    +
    +    public void awaitFirstMessage() throws InterruptedException {
    +      firstMessageReceived.await();
    +    }
    +
    +    private void releaseIfFirst() {
    --- End diff --
    
    Is this impl change intended?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to