This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 482a63549e1bfe2b238ea9bdaf7d42312e1f51f6
Author: Padma Penumarthy <[email protected]>
AuthorDate: Sun Jul 1 09:43:40 2018 -0700

    DRILL-6537: Limit the batch size for buffering operators based on how much memory they get
    
    closes #1342
---
 .../src/main/java/org/apache/drill/exec/ExecConstants.java     |  4 ++++
 .../apache/drill/exec/physical/impl/join/HashJoinBatch.java    | 10 +++++++---
 .../apache/drill/exec/server/options/SystemOptionManager.java  |  3 ++-
 exec/java-exec/src/main/resources/drill-module.conf            |  1 +
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index bc16272..49f149b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -85,6 +85,10 @@ public final class ExecConstants {
   // need to produce very large batches that take up lot of memory.
   public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new 
RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024);
 
+  // Based on available memory, adjust output batch size for buffered operators by this factor.
+  public static final String OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR = 
"drill.exec.memory.operator.output_batch_size_avail_mem_factor";
+  public static final DoubleValidator 
OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR = new 
RangeDoubleValidator(OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR, 0.01, 1.0);
+
   // External Sort Boot configuration
 
   public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = 
"drill.exec.sort.external.spill.batch.size";
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 428a47e..047c597 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -886,9 +886,13 @@ public class HashJoinBatch extends 
AbstractBinaryRecordBatch<HashJoinPOP> {
     partitions = new HashPartition[0];
 
     // get the output batch size from config.
-    int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, 
right);
-    logger.debug("BATCH_STATS, configured output batch size: {}", 
configuredBatchSize);
+    final int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    final double avail_mem_factor = (double) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+    int outputBatchSize = Math.min(configuredBatchSize, 
Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
+    logger.debug("BATCH_STATS, configured output batch size: {}, allocated 
memory {}, avail mem factor {}, output batch size: {}",
+      configuredBatchSize, allocator.getLimit(), avail_mem_factor, 
outputBatchSize);
+
+    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, 
right);
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e6368f5..a9c4742 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -233,6 +233,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, 
true)),
       new 
OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new 
OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
@@ -294,7 +295,7 @@ public class SystemOptionManager extends BaseOptionManager 
implements AutoClosea
    * Initializes this option manager.
    *
    * @return this option manager
-   * @throws IOException
+   * @throws Exception
    */
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 23d59d3..2e8c2e7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -438,6 +438,7 @@ drill.exec.options: {
     drill.exec.storage.implicit.suffix.column.label: "suffix",
     drill.exec.testing.controls: "{}",
     drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB
+    drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1,
     exec.bulk_load_table_list.bulk_size: 1000,
     exec.compile.scalar_replacement: false,
     exec.enable_bulk_load_table_list: false,

Reply via email to