DRILL-3159: Part 2--Core: Make JDBC throttling threshold configurable. Added configuration/option "drill.jdbc.batch_queue_throttling_threshold". Applied "drill.jdbc.batch_queue_throttling_threshold" to DrillResultSetImpl.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/acf5566e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/acf5566e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/acf5566e Branch: refs/heads/master Commit: acf5566e81ca3fafe83309188499aabb2091b945 Parents: 0c69631 Author: dbarclay <[email protected]> Authored: Thu May 14 15:36:14 2015 -0700 Committer: Parth Chandra <[email protected]> Committed: Tue Jun 2 12:26:11 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 +++ .../src/main/resources/drill-module.conf | 4 ++++ .../drill/jdbc/impl/DrillResultSetImpl.java | 24 ++++++++++++++++---- 3 files changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index be67f9d..91793f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -87,6 +87,9 @@ public interface ExecConstants { public static final String USER_AUTHENTICATOR_IMPL = "drill.exec.security.user.auth.impl"; public static final String PAM_AUTHENTICATOR_PROFILES = "drill.exec.security.user.auth.pam_profiles"; public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak"; + /** Size of JDBC batch queue (in batches) above which throttling begins. 
*/ + public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD = + "drill.jdbc.batch_queue_throttling_threshold"; /** * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 66055f1..dbe449a 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -163,3 +163,7 @@ drill.exec: { return_error_for_failure_in_cancelled_fragments: false } } + +drill.jdbc: { + batch_queue_throttling_threshold: 100 +} http://git-wip-us.apache.org/repos/asf/drill/blob/acf5566e/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index 385ccf5..cb6bd1d 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -31,6 +31,7 @@ import net.hydromatic.avatica.AvaticaResultSet; import net.hydromatic.avatica.AvaticaStatement; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; @@ -63,7 +64,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) 
public SchemaChangeListener changeListener; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) - public final ResultsListener resultsListener = new ResultsListener(); + public final ResultsListener resultsListener; private final DrillClient client; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl, @@ -77,6 +78,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS ResultSetMetaData resultSetMetaData, TimeZone timeZone) { super(statement, prepareResult, resultSetMetaData, timeZone); this.statement = statement; + final int batchQueueThrottlingThreshold = + this.getStatement().getConnection().getClient().getConfig().getInt( + ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD ); + resultsListener = new ResultsListener( batchQueueThrottlingThreshold ); DrillConnection c = (DrillConnection) statement.getConnection(); DrillClient client = c.getClient(); currentBatch = new RecordBatchLoader(client.getAllocator()); @@ -188,12 +193,13 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS public static class ResultsListener implements UserResultsListener { private static final Logger logger = getLogger( ResultsListener.class ); - private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100; private static volatile int nextInstanceId = 1; /** (Just for logging.) */ private final int instanceId; + private final int batchQueueThrottlingThreshold; + /** (Just for logging.) */ private volatile QueryId queryId; @@ -225,8 +231,14 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS Queues.newLinkedBlockingDeque(); - ResultsListener() { + /** + * ... 
+ * @param batchQueueThrottlingThreshold + * queue size threshold for throttling server + */ + ResultsListener( int batchQueueThrottlingThreshold ) { instanceId = nextInstanceId++; + this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold; logger.debug( "[#{}] Query listener created.", instanceId ); } @@ -245,7 +257,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS } /** - * Stops throttling if currently active. + * Stops throttling if currently throttling. * @return true if actually stopped (was throttling) */ private boolean stopThrottlingIfSo() { @@ -300,7 +312,9 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS // We're active; let's add to the queue. batchQueue.add(result); - if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) { + + // Throttle server if queue size has exceeded threshold. + if (batchQueue.size() > batchQueueThrottlingThreshold ) { if ( startThrottlingIfNot( throttle ) ) { logger.debug( "[#{}] Throttling started at queue size {}.", instanceId, batchQueue.size() );
