DRILL-2884: 2-Hygiene: Associated cleanup, mostly of DrillResultSetImpl.
- Re-ordered UserResultsListener methods logically (chronologically).
(In UserResultsListener too.)
- Re-ordered some other ResultsListener methods more logically
(chronologically).
- Added ResultsListener logging (incoming and outgoing).
- Moved queryId down into ResultListener so ResultListener doesn't need to
reach up into DrillResultSetImpl; changed from inner to nested class.
- Renamed:
- Fixed typo "resultslistener" -> "resultsListener"
- MAX -> THROTTLING_QUERY_SIZE_THRESHOLD, queue -> batchQueue,
ex -> executionFailureException
- Added, purged some TODOs.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7b776e7f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7b776e7f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7b776e7f
Branch: refs/heads/master
Commit: 7b776e7fb94b1b2b57fe3bc0b586047c47a5f042
Parents: f9efc3b
Author: dbarclay <[email protected]>
Authored: Sat Apr 25 21:23:10 2015 -0700
Committer: Parth Chandra <[email protected]>
Committed: Tue May 5 17:40:13 2015 -0700
----------------------------------------------------------------------
.../exec/rpc/user/UserResultsListener.java | 14 +--
.../java/org/apache/drill/jdbc/DrillCursor.java | 2 +-
.../drill/jdbc/impl/DrillResultSetImpl.java | 112 ++++++++++++-------
3 files changed, 77 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index e422a3f..01a44b8 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -37,17 +37,17 @@ public interface UserResultsListener {
void submissionFailed(UserException ex);
/**
- * The query has completed (successsful completion or cancellation). The
listener will not receive any other
- * data or result message. Called when the server returns a terminal-non
failing- state (COMPLETED or CANCELLED)
- * @param state
- */
- void queryCompleted(QueryState state);
-
- /**
* A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message
was received
* @param result data batch received
* @param throttle connection throttle
*/
void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
+ /**
+ * The query has completed (successsful completion or cancellation). The
listener will not receive any other
+ * data or result message. Called when the server returns a terminal-non
failing- state (COMPLETED or CANCELLED)
+ * @param state
+ */
+ void queryCompleted(QueryState state);
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index 41b89ce..6bad3ce 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -65,7 +65,7 @@ public class DrillCursor implements Cursor {
public DrillCursor(final DrillResultSetImpl resultSet) {
this.resultSet = resultSet;
currentBatch = resultSet.currentBatch;
- resultsListener = resultSet.resultslistener;
+ resultsListener = resultSet.resultsListener;
}
public DrillResultSetImpl getResultSet() {
http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 35674bf..a7cc0c1 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -47,31 +47,31 @@ import org.apache.drill.jdbc.DrillResultSet;
import org.apache.drill.jdbc.ExecutionCanceledSqlException;
import org.apache.drill.jdbc.SchemaChangeListener;
-import com.google.common.collect.Queues;
+import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+import com.google.common.collect.Queues;
-//???? Split this into interface org.apache.drill.jdbc.DrillResultSet for
published
-// interface and a class probably named
org.apache.drill.jdbc.impl.DrillResultSetImpl.
-// Add any needed documentation of Drill-specific behavior of JDBC-defined
-// ResultSet methods to new DrillResultSet. ...
public class DrillResultSetImpl extends AvaticaResultSet implements
DrillResultSet {
+ @SuppressWarnings("unused")
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
public SchemaChangeListener changeListener;
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
- public final ResultsListener resultslistener = new ResultsListener();
- private volatile QueryId queryId;
+ public final ResultsListener resultsListener = new ResultsListener();
private final DrillClient client;
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
+ // TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl,
+ // move down into DrillCursor and have this.clean() have cursor clean it.
public final RecordBatchLoader currentBatch;
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
public final DrillCursor cursor;
public boolean hasPendingCancelationNotification;
public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult
prepareResult,
- ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
+ ResultSetMetaData resultSetMetaData, TimeZone
timeZone) {
super(statement, prepareResult, resultSetMetaData, timeZone);
DrillConnection c = (DrillConnection) statement.getConnection();
DrillClient client = c.getClient();
@@ -120,10 +120,10 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
public synchronized void cleanup() {
- if (queryId != null && ! resultslistener.completed) {
- client.cancelQuery(queryId);
+ if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
+ client.cancelQuery(resultsListener.getQueryId());
}
- resultslistener.close();
+ resultsListener.close();
currentBatch.clear();
}
@@ -145,45 +145,60 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
DrillConnectionImpl connection = (DrillConnectionImpl)
statement.getConnection();
connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(),
- resultslistener);
+ resultsListener);
connection.getDriver().handler.onStatementExecute(statement, null);
super.execute();
// don't return with metadata until we've achieved at least one return
message.
try {
- resultslistener.latch.await();
+ // TODO: Revisit: Why reaching directly into ResultsListener rather
than
+ // calling some wait method?
+ resultsListener.latch.await();
cursor.next();
} catch (InterruptedException e) {
- // TODO: Check: Should this call Thread.currentThread.interrupt()? If
- // not, at least document why this is empty.
+ // TODO: Check: Should this call Thread.currentThread.interrupt()? If
+ // not, at least document why this is empty.
+ // TODO: Check: Does anything ever interrupt this? (Is catch needed?)
}
return this;
}
public String getQueryId() {
- if (queryId != null) {
- return QueryIdHelper.getQueryId(queryId);
+ if (resultsListener.queryId != null) {
+ return QueryIdHelper.getQueryId(resultsListener.queryId);
} else {
return null;
}
}
// (Public until JDBC impl. classes moved out of published-intf. package.
(DRILL-2089).)
- public class ResultsListener implements UserResultsListener {
- private static final int MAX = 100;
- private volatile UserException ex;
+ public static class ResultsListener implements UserResultsListener {
+ private static Logger logger = getLogger( ResultsListener.class );
+
+ private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+
+ private volatile QueryId queryId;
+
+
+ private volatile UserException executionFailureException;
volatile boolean completed = false;
private volatile boolean autoread = true;
private volatile ConnectionThrottle throttle;
private volatile boolean closed = false;
+ // TODO: Rename. It's obvious it's a latch--but what condition or action
+ // does it represent or control?
private CountDownLatch latch = new CountDownLatch(1);
private AtomicBoolean receivedMessage = new AtomicBoolean(false);
+ final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+ Queues.newLinkedBlockingDeque();
- final LinkedBlockingDeque<QueryDataBatch> queue =
Queues.newLinkedBlockingDeque();
+ ResultsListener() {
+ logger.debug( "Query listener created." );
+ }
// TODO: Doc.: Release what if what is first relative to what?
private boolean releaseIfFirst() {
@@ -196,22 +211,23 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
}
@Override
- public void submissionFailed(UserException ex) {
- this.ex = ex;
- completed = true;
- close();
- System.out.println("Query failed: " + ex.getMessage());
+ public void queryIdArrived(QueryId queryId) {
+ logger.debug( "Received query ID: {}.", queryId );
+ this.queryId = queryId;
}
@Override
- public void queryCompleted(QueryState state) {
- releaseIfFirst();
+ public void submissionFailed(UserException ex) {
+ logger.debug( "Received query failure.", ex );
+ this.executionFailureException = ex;
completed = true;
+ close();
+ logger.info( "Query failed: ", ex );
}
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle
throttle) {
- logger.debug("Result arrived {}", result);
+ logger.debug( "Received query data: {}.", result );
// If we're in a closed state, just release the message.
if (closed) {
@@ -221,8 +237,8 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
}
// We're active; let's add to the queue.
- queue.add(result);
- if (queue.size() >= MAX - 1) {
+ batchQueue.add(result);
+ if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
throttle.setAutoRead(false);
this.throttle = throttle;
autoread = false;
@@ -231,22 +247,36 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
releaseIfFirst();
}
+ @Override
+ public void queryCompleted(QueryState state) {
+ logger.debug( "Query completion arrived: {}.", state );
+ releaseIfFirst();
+ completed = true;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+
// TODO: Doc.: Specify whether result can be null and what that means.
public QueryDataBatch getNext() throws Exception {
while (true) {
- if (ex != null) {
- throw ex;
+ if (executionFailureException != null) {
+ logger.debug( "Dequeued query failure exception: {}.",
executionFailureException );
+ throw executionFailureException;
}
- if (completed && queue.isEmpty()) {
+ if (completed && batchQueue.isEmpty()) {
return null;
} else {
- QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+ QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
if (q != null) {
- if (!autoread && queue.size() < MAX / 2) {
+ if (!autoread && batchQueue.size() <
THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
autoread = true;
throttle.setAutoRead(true);
throttle = null;
}
+ logger.debug( "Dequeued query data: {}.", q );
return q;
}
}
@@ -255,22 +285,18 @@ public class DrillResultSetImpl extends AvaticaResultSet
implements DrillResultS
void close() {
closed = true;
- while (!queue.isEmpty()) {
- QueryDataBatch qrb = queue.poll();
+ while (!batchQueue.isEmpty()) {
+ QueryDataBatch qrb = batchQueue.poll();
if (qrb != null && qrb.getData() != null) {
qrb.getData().release();
}
}
// close may be called before the first result is received and the main
thread is blocked waiting
// for the result. In that case we want to unblock the main thread.
- latch.countDown();
+ latch.countDown(); // TODO: Why not call releaseIfFirst as used
elsewhere?
completed = true;
}
- @Override
- public void queryIdArrived(QueryId queryId) {
- DrillResultSetImpl.this.queryId = queryId;
- }
}
}