DRILL-3159: Part 1--Prep., Hyg. for: Make JDBC throttling threshold configurable.
Cleaned, enhanced DrillResultSet: - Enhanced ResultsListener logging: - Added instance ID; added batch numbers. - Added logging at close (pairing with logging at construction). - Fixed 2-integer query ID to UUID form. - Renamed qrb -> qdb; q -> qdb (per recent QueryDataBatch change). - Added "final" on ResultsListener's logger. Reduced Avatica-vs.-Drill casting: - DrillStatementImpl's (Drill)Connection(Impl). - DrillResultSetImpl's (Drill)Statement(Impl). Converted a comment in ExecConstants. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0c69631f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0c69631f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0c69631f Branch: refs/heads/master Commit: 0c69631fb9e7ef19e503645d6433584a12de23d4 Parents: 71199ed Author: dbarclay <[email protected]> Authored: Wed May 20 18:16:23 2015 -0700 Committer: Parth Chandra <[email protected]> Committed: Tue Jun 2 12:26:02 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 +- .../apache/drill/jdbc/DrillJdbc41Factory.java | 11 ++- .../drill/jdbc/impl/DrillResultSetImpl.java | 84 +++++++++++++------- .../drill/jdbc/impl/DrillStatementImpl.java | 5 +- 4 files changed, 71 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 8a24e8d..be67f9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -60,7 +60,8 @@ public interface ExecConstants { public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories"; public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem"; public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl"; - public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches) + /** incoming buffer size (number of batches) */ + public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete"; public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java index 6240b62..93fe59d 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java @@ -82,9 +82,14 @@ public class DrillJdbc41Factory extends DrillFactory { } @Override - public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) { - final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList()); - return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone); + public DrillResultSetImpl newResultSet( AvaticaStatement statement, + AvaticaPrepareResult prepareResult, + TimeZone timeZone ) { + final ResultSetMetaData metaData = + newResultSetMetaData(statement, prepareResult.getColumnList()); + return new DrillResultSetImpl( (DrillStatementImpl) statement, + (DrillPrepareResult) prepareResult, + metaData, timeZone); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index 4fa1f2f..385ccf5 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -58,6 +58,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @SuppressWarnings("unused") private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class); + private final DrillStatementImpl statement; + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) public SchemaChangeListener changeListener; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) @@ -71,17 +73,21 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS public final DrillCursor cursor; public boolean hasPendingCancelationNotification; - public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult, + public DrillResultSetImpl(DrillStatementImpl statement, AvaticaPrepareResult prepareResult, ResultSetMetaData resultSetMetaData, TimeZone timeZone) { super(statement, prepareResult, resultSetMetaData, timeZone); + this.statement = statement; DrillConnection c = (DrillConnection) statement.getConnection(); DrillClient client = c.getClient(); - // DrillClient client, DrillStatement statement) { currentBatch = new RecordBatchLoader(client.getAllocator()); this.client = client; cursor = new DrillCursor(this); } + public DrillStatementImpl getStatement() { + return statement; + } + /** * Throws AlreadyClosedSqlException or QueryCanceledSqlException if this * ResultSet is closed. @@ -171,8 +177,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS } public String getQueryId() { - if (resultsListener.queryId != null) { - return QueryIdHelper.getQueryId(resultsListener.queryId); + if (resultsListener.getQueryId() != null) { + return QueryIdHelper.getQueryId(resultsListener.getQueryId()); } else { return null; } @@ -180,12 +186,22 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) public static class ResultsListener implements UserResultsListener { - private static Logger logger = getLogger( ResultsListener.class ); + private static final Logger logger = getLogger( ResultsListener.class ); private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100; + private static volatile int nextInstanceId = 1; + /** (Just for logging.) */ + private final int instanceId; + + /** (Just for logging.) */ private volatile QueryId queryId; + /** (Just for logging.) */ + private int lastReceivedBatchNumber; + /** (Just for logging.) */ + private int lastDequeuedBatchNumber; + private volatile UserException executionFailureException; // TODO: Revisit "completed". Determine and document exactly what it @@ -210,7 +226,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS ResultsListener() { - logger.debug( "Query listener created." ); + instanceId = nextInstanceId++; + logger.debug( "[#{}] Query listener created.", instanceId ); } /** @@ -252,22 +269,25 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @Override public void queryIdArrived(QueryId queryId) { - logger.debug( "Received query ID: {}.", queryId ); + logger.debug( "[#{}] Received query ID: {}.", + instanceId, QueryIdHelper.getQueryId( queryId ) ); this.queryId = queryId; } @Override public void submissionFailed(UserException ex) { - logger.debug( "Received query failure:", ex ); + logger.debug( "Received query failure:", instanceId, ex ); this.executionFailureException = ex; completed = true; close(); - logger.info( "Query failed: ", ex ); + logger.info( "[#{}] Query failed: ", instanceId, ex ); } @Override public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { - logger.debug( "Received query data batch: {}.", result ); + lastReceivedBatchNumber++; + logger.debug( "[#{}] Received query data batch #{}: {}.", + instanceId, lastReceivedBatchNumber, result ); // If we're in a closed state, just release the message. if (closed) { @@ -282,7 +302,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS batchQueue.add(result); if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) { if ( startThrottlingIfNot( throttle ) ) { - logger.debug( "Throttling started at queue size {}.", batchQueue.size() ); + logger.debug( "[#{}] Throttling started at queue size {}.", + instanceId, batchQueue.size() ); } } @@ -291,7 +312,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS @Override public void queryCompleted(QueryState state) { - logger.debug( "Received query completion: {}.", state ); + logger.debug( "[#{}] Received query completion: {}.", instanceId, state ); releaseIfFirst(); completed = true; } @@ -313,41 +334,50 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS InterruptedException { while (true) { if (executionFailureException != null) { - logger.debug( "Dequeued query failure exception: {}.", executionFailureException ); + logger.debug( "[#{}] Dequeued query failure exception: {}.", + instanceId, executionFailureException ); throw executionFailureException; } if (completed && batchQueue.isEmpty()) { return null; } else { - QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS); - if (q != null) { - assert THROTTLING_QUEUE_SIZE_THRESHOLD >= 2; - if (batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) { + QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS); + if (qdb != null) { + lastDequeuedBatchNumber++; + logger.debug( "[#{}] Dequeued query data batch #{}: {}.", + instanceId, lastDequeuedBatchNumber, qdb ); + + // Unthrottle server if queue size has dropped enough below threshold: + if ( batchQueue.size() < batchQueueThrottlingThreshold / 2 + || batchQueue.size() == 0 // (in case threshold < 2) + ) { if ( stopThrottlingIfSo() ) { - logger.debug( "Throttling stopped at queue size {}.", - batchQueue.size() ); + logger.debug( "[#{}] Throttling stopped at queue size {}.", + instanceId, batchQueue.size() ); } } - logger.debug( "Dequeued query data batch: {}.", q ); - return q; + return qdb; } } } } void close() { + logger.debug( "[#{}] Query listener closing.", instanceId ); closed = true; if ( stopThrottlingIfSo() ) { - logger.debug( "Throttling stopped at close() (at queue size {}).", batchQueue.size() ); + logger.debug( "[#{}] Throttling stopped at close() (at queue size {}).", + instanceId, batchQueue.size() ); } while (!batchQueue.isEmpty()) { - QueryDataBatch qrb = batchQueue.poll(); - if (qrb != null && qrb.getData() != null) { - qrb.getData().release(); + QueryDataBatch qdb = batchQueue.poll(); + if (qdb != null && qdb.getData() != null) { + qdb.getData().release(); } } - // close may be called before the first result is received and the main thread is blocked waiting - // for the result. In that case we want to unblock the main thread. + // Close may be called before the first result is received and therefore + // when the main thread is blocked waiting for the result. In that case + // we want to unblock the main thread. latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere? completed = true; } http://git-wip-us.apache.org/repos/asf/drill/blob/0c69631f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java index 5160c31..6610f52 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillStatementImpl.java @@ -33,9 +33,12 @@ import net.hydromatic.avatica.AvaticaStatement; public abstract class DrillStatementImpl extends AvaticaStatement implements DrillStatement, DrillRemoteStatement { + private final DrillConnectionImpl connection; + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) public DrillStatementImpl(DrillConnectionImpl connection, int resultSetType, int resultSetConcurrency, int resultSetHoldability) { super(connection, resultSetType, resultSetConcurrency, resultSetHoldability); + this.connection = connection; connection.openStatementsRegistry.addStatement(this); } @@ -52,7 +55,7 @@ public abstract class DrillStatementImpl extends AvaticaStatement @Override public DrillConnectionImpl getConnection() { - return (DrillConnectionImpl) connection; + return connection; } // WORKAROUND: Work around AvaticaStatement's code that wraps _any_ exception,
