This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 482a63549e1bfe2b238ea9bdaf7d42312e1f51f6 Author: Padma Penumarthy <[email protected]> AuthorDate: Sun Jul 1 09:43:40 2018 -0700 DRILL-6537: Limit the batch size for buffering operators based on how much memory they get closes #1342 --- .../src/main/java/org/apache/drill/exec/ExecConstants.java | 4 ++++ .../apache/drill/exec/physical/impl/join/HashJoinBatch.java | 10 +++++++--- .../apache/drill/exec/server/options/SystemOptionManager.java | 3 ++- exec/java-exec/src/main/resources/drill-module.conf | 1 + 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index bc16272..49f149b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -85,6 +85,10 @@ public final class ExecConstants { // need to produce very large batches that take up lot of memory. public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024); + // Based on available memory, adjust output batch size for buffered operators by this factor. + public static final String OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR = "drill.exec.memory.operator.output_batch_size_avail_mem_factor"; + public static final DoubleValidator OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR = new RangeDoubleValidator(OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR, 0.01, 1.0); + // External Sort Boot configuration public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 428a47e..047c597 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -886,9 +886,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { partitions = new HashPartition[0]; // get the output batch size from config. - int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR); + int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor))); + logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", + configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); + + batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e6368f5..a9c4742 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -233,6 +233,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), + new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), }; @@ -294,7 +295,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea * Initializes this option manager. * * @return this option manager - * @throws IOException + * @throws Exception */ public SystemOptionManager init() throws Exception { options = provider.getOrCreateStore(config); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 23d59d3..2e8c2e7 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -438,6 +438,7 @@ drill.exec.options: { drill.exec.storage.implicit.suffix.column.label: "suffix", drill.exec.testing.controls: "{}", drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB + drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1, exec.bulk_load_table_list.bulk_size: 1000, exec.compile.scalar_replacement: false, exec.enable_bulk_load_table_list: false,
