This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f021ba6b [GOBBLIN-1898] Improve performance of ORCWriter Self Tune 
(#3762)
5f021ba6b is described below

commit 5f021ba6b9f259c383be3fa5a2ec323e168c1501
Author: William Lo <[email protected]>
AuthorDate: Tue Sep 12 12:36:23 2023 -0400

    [GOBBLIN-1898] Improve performance of ORCWriter Self Tune (#3762)
    
    * Selftuning ORCwriter with configurations
    
    * Cleanup
    
    * Address reviews and clean up
    
    * More clean up, adds comments, fix bug where tuning was not happening at a 
regular interval
    
    * cleanup
    
    * Fix checkstyle
    
    * renames and log improvements
    
    * Add configs to handle multiple tasks and read correct orc stripe size 
properties, fix some more bugs
    
    * Fix bug in orc converter memory manager where it was miscalculating 
buffer size
    
    * Algorithm improvement/bug fix, log improvements, flush only when 
batch size decreases
    
    * Fix findbugs and address review
    
    * Comment cleanup
    
    * Add log for startup batchsize
    
    * Rename batch size memory factor -> rowbatch memory factor to more 
accurately reflect its purpose
    
    * Add memory manager tests and change algorithm to take children byte 
size into account
    
    * Fix test to not rely on an implied large batch size
    
    * Add basic test cases for selftuning
    
    * Decrease the size of tests as github actions cannot handle a large array
    
    * Improve ORCWriter self tune accuracy with native orcwriter API for 
estimateMemory(), and improve state management and smoothen algorithm and 
additional configurations
    
    * Add unit tests
    
    * Cleanup
    
    * Upgrade ORC to 1.7.6
    
    * Refactor configs into its own class
    
    * Fix checkstyle
---
 .../mapreduce/CompactionOrcJobConfigurator.java    |   4 +-
 .../mapreduce/orc/OrcKeyCompactorOutputFormat.java |   4 +-
 .../writer/GenericRecordToOrcValueWriter.java      |   4 +-
 .../gobblin/writer/GobblinBaseOrcWriter.java       | 122 ++++++++++++---------
 .../gobblin/writer/GobblinOrcWriterBuilder.java    |   3 +-
 .../gobblin/writer/GobblinOrcWriterConfigs.java    |  88 +++++++++++++++
 .../gobblin/writer/GobblinOrcWriterTest.java       |  50 ++++++++-
 gradle/scripts/dependencyDefinitions.gradle        |   6 +-
 8 files changed, 211 insertions(+), 70 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index d554a5362..a67a9ee39 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -37,8 +37,8 @@ import org.apache.gobblin.configuration.State;
 
 import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
 import static 
org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
-import static 
org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE;
-import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_BATCH_SIZE;
+import static 
org.apache.gobblin.writer.GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE;
+import static 
org.apache.gobblin.writer.GobblinOrcWriterConfigs.ORC_WRITER_BATCH_SIZE;
 
 public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
   /**
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
index 27eefcded..07c1a0255 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
@@ -35,7 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
 import org.apache.gobblin.writer.GobblinOrcMemoryManager;
-import org.apache.gobblin.writer.GobblinOrcWriter;
+import org.apache.gobblin.writer.GobblinOrcWriterConfigs;
 
 import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
 
@@ -70,7 +70,7 @@ public class OrcKeyCompactorOutputFormat extends 
OrcOutputFormat {
     Path filename = getDefaultWorkFile(taskAttemptContext, extension);
     Writer writer = OrcFile.createWriter(filename,
         org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf).memory(new 
GobblinOrcMemoryManager(conf)));
-    int rowBatchSize = conf.getInt(GobblinOrcWriter.ORC_WRITER_BATCH_SIZE, 
GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE);
+    int rowBatchSize = 
conf.getInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCH_SIZE, 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE);
     log.info("Creating OrcMapreduceRecordWriter with row batch size = {}", 
rowBatchSize);
     return new OrcMapreduceRecordWriter(writer, rowBatchSize);
   }
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 27068258b..4e1a49512 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -60,9 +60,9 @@ import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
  */
 @Slf4j
 public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericRecord> {
-  private static final String ENABLE_SMART_ARRAY_ENLARGE = 
GobblinOrcWriter.ORC_WRITER_PREFIX + "enabledMulValueColumnVectorSmartSizing";
+  private static final String ENABLE_SMART_ARRAY_ENLARGE = 
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX + 
"enabledMulValueColumnVectorSmartSizing";
   private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
-  private static final String ENLARGE_FACTOR_KEY = 
GobblinOrcWriter.ORC_WRITER_PREFIX + "enlargeFactor";
+  private static final String ENLARGE_FACTOR_KEY = 
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX + "enlargeFactor";
   private static final int DEFAULT_ENLARGE_FACTOR = 3;
 
   private boolean enabledSmartSizing;
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 6e7a00bed..f7bc0c325 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -45,27 +45,6 @@ import org.apache.gobblin.state.ConstructState;
  */
 @Slf4j
 public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
-  public static final String ORC_WRITER_PREFIX = "orcWriter.";
-  public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
-  public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
-  public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED = 
ORC_WRITER_PREFIX + "auto.selfTune.enabled";
-  public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE = 
ORC_WRITER_PREFIX + "estimated.recordSize";
-  public static final String ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 
ORC_WRITER_PREFIX + "auto.selfTune.rowsBetweenCheck";
-  public static final String ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR = 
ORC_WRITER_PREFIX + "auto.selfTune.memory.usage.factor";
-  public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
-  public static final String 
ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY = ORC_WRITER_PREFIX + 
"estimated.bytes.allocated.converter.memory";
-  public static final String ORC_WRITER_CONCURRENT_TASKS = ORC_WRITER_PREFIX + 
"auto.selfTune.concurrent.tasks";
-
-  // This value gives an estimation on how many writers are buffering records 
at the same time in a container.
-  // Since time-based partition scheme is a commonly used practice, plus the 
chances for late-arrival data,
-  // usually there would be 2-3 writers running during the hourly boundary. 3 
is chosen here for being conservative.
-  private static final int CONCURRENT_WRITERS_DEFAULT = 3;
-  public static final double DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 
0.5;
-  public static final int DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
-  // Tune iff the new batch size is 10% different from the current batch size
-  public static final double DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY = 
0.1;
-  public static final int DEFAULT_MIN_ORCWRITER_ROWCHECK = 150;
-  public static final int DEFAULT_MAX_ORCWRITER_ROWCHECK = 5000;
 
   protected final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
@@ -74,7 +53,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   protected Writer orcFileWriter;
   private final RowBatchPool rowBatchPool;
   private final boolean enableRowBatchPool;
-  protected long estimatedRecordSizeBytes = -1;
 
   // the close method may be invoked multiple times, but the underlying writer 
only supports close being called once
   protected volatile boolean closed = false;
@@ -91,9 +69,16 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   private AtomicInteger recordCounter = new AtomicInteger(0);
   @VisibleForTesting
   long availableMemory = -1;
-  private long orcWriterStripeSizeBytes;
-  private int concurrentWriterTasks;
+  private long currentOrcWriterMaxUnderlyingMemory = -1;
+  private long prevOrcWriterMaxUnderlyingMemory = -1;
+  private int orcFileWriterMaxRowsBetweenCheck;
+  private int orcFileWriterMinRowsBetweenCheck;
   private int orcFileWriterRowsBetweenCheck;
+  private long orcStripeSize;
+  private int maxOrcBatchSize;
+
+  private int concurrentWriterTasks;
+  private long orcWriterStripeSizeBytes;
   // Holds the maximum size of the previous run's maximum buffer or the max of 
the current run's maximum buffer
   private long estimatedBytesAllocatedConverterMemory = -1;
   private OrcConverterMemoryManager converterMemoryManager;
@@ -108,17 +93,27 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     this.inputSchema = builder.getSchema();
     this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, 
properties);
-    this.selfTuningWriter = 
properties.getPropAsBoolean(ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
-    this.batchSize = this.selfTuningWriter ? DEFAULT_ORC_WRITER_BATCH_SIZE : 
properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
+    this.selfTuningWriter = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
 false);
+    this.maxOrcBatchSize = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
+        GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
+    this.batchSize = this.selfTuningWriter ?
+        
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_PREVIOUS_BATCH_SIZE,
 GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE)
+        : 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCH_SIZE, 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE);
     this.rowBatchPool = RowBatchPool.instance(properties);
     this.enableRowBatchPool = 
properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
-    this.selfTuneRowsBetweenCheck = 
properties.getPropAsInt(ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, 
DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK);
-    this.rowBatchMemoryUsageFactor = 
properties.getPropAsDouble(ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR, 
DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR);
+    this.selfTuneRowsBetweenCheck = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK,
+        GobblinOrcWriterConfigs.DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK);
+    this.rowBatchMemoryUsageFactor = 
properties.getPropAsDouble(GobblinOrcWriterConfigs.ORC_WRITER_ROWBATCH_MEMORY_USAGE_FACTOR,
+        
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR);
     this.rowBatch = enableRowBatchPool ? 
rowBatchPool.getRowBatch(typeDescription, batchSize) : 
typeDescription.createRowBatch(batchSize);
     this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch);
-    this.orcWriterStripeSizeBytes = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
     // Track the number of other writer tasks from different datasets 
ingesting on the same container
-    this.concurrentWriterTasks = 
properties.getPropAsInt(ORC_WRITER_CONCURRENT_TASKS, 1);
+    this.concurrentWriterTasks = 
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS,
 1);
+    this.orcStripeSize = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
+    this.orcFileWriterMinRowsBetweenCheck = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MIN_ROWCHECK,
+        GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
+    this.orcFileWriterMaxRowsBetweenCheck = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
+        GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
     // Create file-writer
     this.writerConfig = new Configuration();
     // Populate job Configurations into Conf as well so that configurations 
related to ORC writer can be tuned easily.
@@ -133,14 +128,16 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     log.info("Available memory for ORC writer: {}", this.availableMemory);
 
     if (this.selfTuningWriter) {
-      if (properties.contains(ORC_WRITER_ESTIMATED_RECORD_SIZE) && 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE) != -1) {
-        this.estimatedRecordSizeBytes = 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE);
-        this.estimatedBytesAllocatedConverterMemory = 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, 
-1);
+      if 
(properties.contains(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE)
 &&
+          
properties.getPropAsLong(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE)
 != -1) {
+        long estimatedRecordSizeBytes = 
properties.getPropAsLong(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE);
+        this.estimatedBytesAllocatedConverterMemory = 
properties.getPropAsLong(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
 -1);
         this.orcFileWriterRowsBetweenCheck = 
properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), (int) 
OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue());
+        this.prevOrcWriterMaxUnderlyingMemory = 
properties.getPropAsLong(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY,
 this.orcStripeSize);
         // Use the last run's rows between check value for the underlying file 
size writer, if it exists. Otherwise it will default to 5000
         log.info("Using previously stored properties to calculate new batch 
size, ORC Estimated Record size is : {},"
-                + "estimated bytes converter allocated is : {}, ORC rows 
between check is {}",
-            this.estimatedRecordSizeBytes, 
this.estimatedBytesAllocatedConverterMemory, 
this.orcFileWriterRowsBetweenCheck);
+                + "estimated bytes converter allocated is : {}, ORC rows 
between check is {}, native ORC writer estimated size is {}",
+            estimatedRecordSizeBytes, 
this.estimatedBytesAllocatedConverterMemory, 
this.orcFileWriterRowsBetweenCheck, this.prevOrcWriterMaxUnderlyingMemory);
         this.tuneBatchSize(estimatedRecordSizeBytes);
         log.info("Initialized batch size at {}", this.batchSize);
         this.nextSelfTune = this.selfTuneRowsBetweenCheck;
@@ -148,9 +145,9 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
         // We will need to incrementally tune the writer based on the first 
few records
         this.nextSelfTune = 5;
         this.initialEstimatingRecordSizePhase = true;
+        this.prevOrcWriterMaxUnderlyingMemory = this.orcStripeSize;
       }
     } else {
-      this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, 
DEFAULT_ORC_WRITER_BATCH_SIZE);
       log.info("Created ORC writer, batch size: {}, {}: {}",
           this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
           this.writerConfig.get(
@@ -256,9 +253,13 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     closeInternal();
     super.commit();
     if (this.selfTuningWriter) {
-      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, 
String.valueOf(estimatedRecordSizeBytes));
-      
properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, 
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE,
 String.valueOf(getEstimatedRecordSizeBytes()));
+      
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
+          
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
       properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 
String.valueOf(this.orcFileWriterRowsBetweenCheck));
+      
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_PREVIOUS_BATCH_SIZE,
 this.batchSize);
+      
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY,
+          this.currentOrcWriterMaxUnderlyingMemory != -1 ? 
this.currentOrcWriterMaxUnderlyingMemory : orcFileWriter.estimateMemory());
     }
   }
 
@@ -274,32 +275,43 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
    */
   void tuneBatchSize(long averageSizePerRecord) throws IOException {
     this.estimatedBytesAllocatedConverterMemory = 
Math.max(this.estimatedBytesAllocatedConverterMemory, 
this.converterMemoryManager.getConverterBufferTotalSize());
-    int currentPartitionedWriters = 
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
 CONCURRENT_WRITERS_DEFAULT);
+    int currentPartitionedWriters = 
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
+        GobblinOrcWriterConfigs.DEFAULT_CONCURRENT_WRITERS);
     // In the native ORC writer implementation, it will flush the writer if 
the internal memory exceeds the size of a stripe after rows between check
-    // So worst case the most memory the writer can hold is the size of a 
stripe plus size of records * number of records between checks
-    // Note that this is an overestimate as the native ORC file writer should 
have some compression ratio
-    long maxMemoryInFileWriter = averageSizePerRecord * 
this.orcFileWriterRowsBetweenCheck + this.orcWriterStripeSizeBytes;
+    // Use ORC Writer estimation API to get the max memory used by the 
underlying ORC writer, but note that it is an overestimation as it includes 
memory allocated but not used
+    // More details in 
https://lists.apache.org/thread/g6yo7m46mr86ov1vkm9wnmshgw7hcl6b
+    if (this.orcFileWriter != null) {
+      this.currentOrcWriterMaxUnderlyingMemory = 
Math.max(this.currentOrcWriterMaxUnderlyingMemory, 
orcFileWriter.estimateMemory());
+    }
+    long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory, 
prevOrcWriterMaxUnderlyingMemory);
 
     int newBatchSize = (int) ((this.availableMemory*1.0 / 
currentPartitionedWriters * this.rowBatchMemoryUsageFactor - 
maxMemoryInFileWriter
         - this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
     // Handle scenarios where new batch size can be 0 or less due to 
overestimating memory used by other components
-    newBatchSize = Math.min(Math.max(1, newBatchSize), 
DEFAULT_ORC_WRITER_BATCH_SIZE);
-    if (Math.abs(newBatchSize - this.batchSize) > 
DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) {
+    newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);
+    if (Math.abs(newBatchSize - this.batchSize) > 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY * 
this.batchSize) {
+      // Add a factor when tuning up the batch size to prevent large sudden 
increases in memory usage
+      if (newBatchSize > this.batchSize) {
+        newBatchSize = (newBatchSize - this.batchSize) / 2 + this.batchSize;
+      }
       log.info("Tuning ORC writer batch size from {} to {} based on average 
byte size per record: {} with available memory {} and {} bytes "
-              + "of allocated memory in converter buffers, with {} partitioned 
writers",
+              + "of allocated memory in converter buffers, native orc writer 
estimated memory {}, with {} partitioned writers",
           batchSize, newBatchSize, averageSizePerRecord, availableMemory,
-          estimatedBytesAllocatedConverterMemory, currentPartitionedWriters);
+          estimatedBytesAllocatedConverterMemory, maxMemoryInFileWriter, 
currentPartitionedWriters);
+      this.batchSize = newBatchSize;
       // We need to always flush because ORC VectorizedRowBatch.ensureSize() 
does not provide an option to preserve data, refer to
       // 
https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.html
       this.flush();
-      this.batchSize = newBatchSize;
       this.rowBatch.ensureSize(this.batchSize);
     }
   }
 
   void initializeOrcFileWriter() {
     try {
-      this.orcFileWriterRowsBetweenCheck = Math.max(Math.min(this.batchSize * 
DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR, DEFAULT_MAX_ORCWRITER_ROWCHECK), 
DEFAULT_MIN_ORCWRITER_ROWCHECK);
+      this.orcFileWriterRowsBetweenCheck = Math.max(
+          Math.min(this.batchSize * 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, 
this.orcFileWriterMaxRowsBetweenCheck),
+          this.orcFileWriterMinRowsBetweenCheck
+      );
       this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 
String.valueOf(this.orcFileWriterRowsBetweenCheck));
       log.info("Created ORC writer, batch size: {}, {}: {}",
           this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
@@ -314,22 +326,24 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     }
   }
 
+  private long getEstimatedRecordSizeBytes() {
+    long totalBytes = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalBytesConverted();
+    long totalRecords = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalRecordsConverted();
+    return totalBytes / totalRecords;
+  }
+
   /*
    * Note: orc.rows.between.memory.checks is the configuration available to 
tune memory-check sensitivity in ORC-Core
    * library. By default it is set to 5000. If the user-application is dealing 
with large-row Kafka topics for example,
    * one should consider lower this value to make memory-check more active.
    */
   @Override
-  public void write(D record)
-      throws IOException {
+  public void write(D record) throws IOException {
     Preconditions.checkState(!closed, "Writer already closed");
     this.valueWriter.write(record, this.rowBatch);
     int recordCount = this.recordCounter.incrementAndGet();
     if (this.selfTuningWriter && recordCount == this.nextSelfTune) {
-      long totalBytes = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalBytesConverted();
-      long totalRecords = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalRecordsConverted();
-      this.estimatedRecordSizeBytes = totalRecords == 0 ? 0 : totalBytes / 
totalRecords;
-      this.tuneBatchSize(this.estimatedRecordSizeBytes);
+      this.tuneBatchSize(this.getEstimatedRecordSizeBytes());
       if (this.initialEstimatingRecordSizePhase && 
!initialSelfTuneCheckpoints.isEmpty()) {
         this.nextSelfTune = initialSelfTuneCheckpoints.poll();
       } else {
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
index d626891b2..f0c485037 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java
@@ -30,7 +30,6 @@ import com.google.common.base.Strings;
  * The WriterBuilder extension to create {@link GobblinOrcWriter} on top of 
{@link FsDataWriterBuilder}
  */
 public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, 
GenericRecord> {
-  public static final String ORC_WRITER_INSTRUMENTED = 
GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented";
   public GobblinOrcWriterBuilder() {
   }
 
@@ -43,7 +42,7 @@ public class GobblinOrcWriterBuilder extends 
FsDataWriterBuilder<Schema, Generic
 
     switch (this.destination.getType()) {
       case HDFS:
-        if 
(this.destination.getProperties().getPropAsBoolean(ORC_WRITER_INSTRUMENTED, 
false)) {
+        if 
(this.destination.getProperties().getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_INSTRUMENTED,
 false)) {
           return new InstrumentedGobblinOrcWriter(this, 
this.destination.getProperties());
         }
         return new GobblinOrcWriter(this, this.destination.getProperties());
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
new file mode 100644
index 000000000..50b51ae81
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+/**
+ * Configuration keys for the Gobblin ORC Writer
+ */
+public class GobblinOrcWriterConfigs {
+  public static final String ORC_WRITER_PREFIX = "orcWriter.";
+  /**
+   * Default buffer size in the ORC Writer before sending the records to the 
native ORC Writer
+   */
+  public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
+  /**
+   * Configuration for enabling Gobblin Avro -> ORC Self tuning writer, 
optimized for Kafka Streaming Ingestion
+   */
+  public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED = 
ORC_WRITER_PREFIX + "auto.selfTune.enabled";
+  /**
+   * Max buffer size of the Gobblin ORC Writer that it can be tuned to
+   */
+  public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE = 
ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
+  /**
+   * How often should the Gobblin ORC Writer check for tuning
+   */
+  public static final String ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 
ORC_WRITER_PREFIX + "auto.selfTune.rowsBetweenCheck";
+  /**
+   * What percentage of the JVM memory should be reserved for the buffers of 
the Gobblin ORC Writer, approximately
+   */
+  public static final String ORC_WRITER_ROWBATCH_MEMORY_USAGE_FACTOR = 
ORC_WRITER_PREFIX + "auto.selfTune.memory.usage.factor";
+  /**
+   * In the self tuning writer, the minimum buffer size that can be configured 
for the initialization of the native ORC Writer,
+   * size measured in records before checking to flush if buffer memory size 
exceeds stripe size. Note that the Gobblin ORC Writer
+   * will initialize the native ORC Writer just once in its lifecycle to 
prevent multiple small files.
+   */
+  public static final String ORC_WRITER_MIN_ROWCHECK = ORC_WRITER_PREFIX + 
"min.rows.between.memory.checks";
+  /**
+   * In the self tuning writer, the maximum buffer size that can be configured 
for the initialization of the native ORC Writer,
+   * size measured in records before checking to flush if buffer memory size 
exceeds stripe size. Note that the Gobblin ORC Writer
+   * will initialize the native ORC Writer just once in its lifecycle to 
prevent multiple small files.
+   */
+  public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX + 
"max.rows.between.memory.checks";
+
+  public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX + 
"instrumented";
+
+  public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+  /**
+   *  This value gives an estimation on how many writers are buffering records 
at the same time in a container.
+   *    Since time-based partition scheme is a commonly used practice, plus 
the chances for late-arrival data,
+   *    usually there would be 2-3 writers running during the hourly boundary. 
3 is chosen here for being conservative.
+   */
+  public static final int DEFAULT_CONCURRENT_WRITERS = 3;
+  public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR 
= 0.3;
+  /**
+   * The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer 
size
+   */
+  public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
+  public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE = 
DEFAULT_ORC_WRITER_BATCH_SIZE;
+  public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
+  /**
+   * Tune iff the new batch size is 10% different from the current batch size
+   */
+  public static final double DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY = 
0.1;
+  public static final int DEFAULT_MIN_ORC_WRITER_ROWCHECK = 150;
+  public static final int DEFAULT_MAX_ORC_WRITER_ROWCHECK = 5000;
+
+  public static class RuntimeStateConfigs {
+    public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE = 
ORC_WRITER_PREFIX + "estimated.recordSize";
+    public static final String ORC_WRITER_NATIVE_WRITER_MEMORY = 
ORC_WRITER_PREFIX + "estimated.native.writer.memory";
+    public static final String ORC_WRITER_PREVIOUS_BATCH_SIZE = 
ORC_WRITER_PREFIX + "previous.batch.size";
+    public static final String ORC_WRITER_CONCURRENT_TASKS = ORC_WRITER_PREFIX 
+ "auto.selfTune.concurrent.tasks";
+    public static final String 
ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY = ORC_WRITER_PREFIX + 
"estimated.bytes.allocated.converter.memory";
+  }
+}
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index dfee83644..63db334a7 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -158,7 +158,7 @@ public class GobblinOrcWriterTest {
     dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
     dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
     dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
-    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
+    dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
     when(mockBuilder.getFileName(dummyState)).thenReturn("file");
     Path outputFilePath = new Path(outputDir, "selfTune/file");
 
@@ -207,8 +207,8 @@ public class GobblinOrcWriterTest {
     dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
     dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
     dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
-    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
-    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, "1");
+    dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
+    dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, "1");
     when(mockBuilder.getFileName(dummyState)).thenReturn("file");
     Path outputFilePath = new Path(outputDir, "selfTune/file");
 
@@ -258,7 +258,7 @@ public class GobblinOrcWriterTest {
     dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
     dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
     dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
-    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
+    dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
     dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
     when(mockBuilder.getFileName(dummyState)).thenReturn("file");
 
@@ -271,7 +271,9 @@
     orcWriter.availableMemory = 100000000;
     // Given the amount of available memory and a low stripe size, and estimated rowBatchSize, the resulting batchsize should be maxed out
     orcWriter.tuneBatchSize(10);
-    Assert.assertTrue(orcWriter.batchSize == GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE);
+    System.out.println(orcWriter.batchSize);
+    // Take into account that increases in batchsize are multiplied by a factor to prevent large jumps in batchsize
+    Assert.assertTrue(orcWriter.batchSize == (GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
     orcWriter.availableMemory = 100;
     orcWriter.tuneBatchSize(10);
     // Given that the amount of available memory is low, the resulting batchsize should be 1
@@ -282,4 +284,42 @@ public class GobblinOrcWriterTest {
     orcWriter.tuneBatchSize(10);
     Assert.assertTrue(orcWriter.batchSize == 1);
   }
+
+  @Test
+  public void testStatePersistenceWhenClosingWriter() throws IOException {
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json");
+
+    // Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+
+    State dummyState = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
+    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
+    dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
+    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+
+    // Having a closer to manage the life-cycle of the writer object.
+    Closer closer = Closer.create();
+    GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
+    for (GenericRecord record : recordList) {
+      orcWriter.write(record);
+    }
+    // Hard code the batchsize here as tuning batch size is dependent on the runtime environment
+    orcWriter.batchSize = 10;
+    orcWriter.commit();
+
+    Assert.assertEquals(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE), "9");
+    Assert.assertEquals(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_PREVIOUS_BATCH_SIZE), "10");
+    Assert.assertEquals(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY), "18000");
+    Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
+    Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
+  }
 }
\ No newline at end of file
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 095aed605..cee0abedb 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -192,9 +192,9 @@ ext.externalDependency = [
     /**
      * Avoiding conflicts with Hive 1.x versions existed in the classpath
      */
-    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.8:nohive",
-    "orcCore": "org.apache.orc:orc-core:1.6.8:nohive",
-    "orcTools":"org.apache.orc:orc-tools:1.6.8",
+    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.7.6:nohive",
+    "orcCore": "org.apache.orc:orc-core:1.7.6:nohive",
+    "orcTools":"org.apache.orc:orc-tools:1.7.6",
     'parquet': 'org.apache.parquet:parquet-hadoop:1.11.0',
     'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
     'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',


Reply via email to