[
https://issues.apache.org/jira/browse/DRILL-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866110#comment-15866110
]
ASF GitHub Bot commented on DRILL-4730:
---------------------------------------
Github user sudheeshkatkam commented on a diff in the pull request:
https://github.com/apache/drill/pull/613#discussion_r86651825
--- Diff: exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java ---
@@ -24,33 +24,260 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.Meta.Signature;
import org.apache.calcite.avatica.util.ArrayImpl.Factory;
import org.apache.calcite.avatica.util.Cursor;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.jdbc.SchemaChangeListener;
import org.slf4j.Logger;
+import com.google.common.collect.Queues;
+
class DrillCursor implements Cursor {
+
+ ////////////////////////////////////////
+ // ResultsListener:
+ static class ResultsListener implements UserResultsListener {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(ResultsListener.class);
+
+ private static volatile int nextInstanceId = 1;
+
+ /** (Just for logging.) */
+ private final int instanceId;
+
+ private final int batchQueueThrottlingThreshold;
+
+ /** (Just for logging.) */
+ private volatile QueryId queryId;
+
+ /** (Just for logging.) */
+ private int lastReceivedBatchNumber;
+ /** (Just for logging.) */
+ private int lastDequeuedBatchNumber;
+
+ private volatile UserException executionFailureException;
+
+ // TODO: Revisit "completed". Determine and document exactly what it
+ // means. Some uses imply that it means that incoming messages
+ // indicate
+ // that the _query_ has _terminated_ (not necessarily _completing_
+ // normally), while some uses imply that it's some other state of the
+ // ResultListener. Some uses seem redundant.)
+ volatile boolean completed = false;
+
+ /** Whether throttling of incoming data is active. */
+ private final AtomicBoolean throttled = new AtomicBoolean( false );
+ private volatile ConnectionThrottle throttle;
+
+ private volatile boolean closed = false;
+
+ private final CountDownLatch firstMessageReceived = new
CountDownLatch(1);
+
+ final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+ Queues.newLinkedBlockingDeque();
+
+
+ /**
+ * ...
+ * @param batchQueueThrottlingThreshold
+ * queue size threshold for throttling server
+ */
+ ResultsListener( int batchQueueThrottlingThreshold ) {
+ instanceId = nextInstanceId++;
+ this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
+ logger.debug( "[#{}] Query listener created.", instanceId );
+ }
+
+ /**
+ * Starts throttling if not currently throttling.
+ * @param throttle the "throttlable" object to throttle
+ * @return true if actually started (wasn't throttling already)
+ */
+ private boolean startThrottlingIfNot( ConnectionThrottle throttle ) {
+ final boolean started = throttled.compareAndSet( false, true );
+ if ( started ) {
+ this.throttle = throttle;
+ throttle.setAutoRead(false);
+ }
+ return started;
+ }
+
+ /**
+ * Stops throttling if currently throttling.
+ * @return true if actually stopped (was throttling)
+ */
+ private boolean stopThrottlingIfSo() {
+ final boolean stopped = throttled.compareAndSet( true, false );
+ if ( stopped ) {
+ throttle.setAutoRead(true);
+ throttle = null;
+ }
+ return stopped;
+ }
+
+ public void awaitFirstMessage() throws InterruptedException {
+ firstMessageReceived.await();
+ }
+
+ private void releaseIfFirst() {
--- End diff --
Is this impl change intended?
> Update JDBC DatabaseMetaData implementation to use new Metadata APIs
> --------------------------------------------------------------------
>
> Key: DRILL-4730
> URL: https://issues.apache.org/jira/browse/DRILL-4730
> Project: Apache Drill
> Issue Type: Sub-task
> Components: Client - JDBC
> Reporter: Venki Korukanti
> Assignee: Laurent Goujon
>
> DRILL-4728 is going to add support for new metadata APIs. Replace the
> INFORMATION_SCHEMA queries used to get the metadata with the new APIs
> provided in Java client.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)