This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a0787aaf6 [GOBBLIN-1891] Create selftuning buffered ORC writer (#3751)
a0787aaf6 is described below

commit a0787aaf6f0c7b4306a799df4a9a4c76ce19e5db
Author: William Lo <[email protected]>
AuthorDate: Fri Sep 1 17:02:52 2023 -0400

    [GOBBLIN-1891] Create selftuning buffered ORC writer (#3751)
    
    * Selftuning ORCwriter with configurations
    
    * Cleanup
    
    * Address reviews and clean up
    
    * More clean up, add comments, fix bug where tuning was not happening at a 
regular interval
    
    * cleanup
    
    * Fix checkstyle
    
    * renames and log improvements
    
    * Add configs to handle multiple tasks and read correct orc stripe size 
properties, fix some more bugs
    
    * Fix bug in orc converter memory manager where it was miscalculating 
buffer size
    
    * Algorithm improvement/bug fix, log improvements, flush only when 
batchsize decreases
    
    * Fix findbugs and address review
    
    * Comment cleanup
    
    * Add log for startup batchsize
    
    * Change naming of batch size memory factor -> rowbatch memory factor to be 
more accurate to what it's for
    
    * Add memorymanager tests and change algorithm to account for 
children byte size
    
    * Fix test to not rely on an implied large batch size
    
    * Add basic test cases for selftuning
    
    * Decrease the size of tests as GitHub Actions cannot handle a large array
    
    * Address last review
---
 .../gobblin/writer/PartitionedDataWriter.java      |   3 +-
 .../writer/GenericRecordToOrcValueWriter.java      |  97 ++++++++---
 .../gobblin/writer/GobblinBaseOrcWriter.java       | 182 ++++++++++++++++++---
 .../gobblin/writer/OrcConverterMemoryManager.java  | 100 +++++++++++
 .../writer/GenericRecordToOrcValueWriterTest.java  |  30 +++-
 .../gobblin/writer/GobblinOrcWriterTest.java       | 145 +++++++++++++++-
 .../writer/OrcConverterMemoryManagerTest.java      |  80 +++++++++
 .../converter_memory_manager_nested_test/data.json |  38 +++++
 .../schema.avsc                                    |  37 +++++
 .../test/resources/orc_writer_test/data_multi.json |  16 ++
 10 files changed, 680 insertions(+), 48 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index d5f0e8177..cc0e1d4e7 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -87,7 +87,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
   public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS = 
Long.MAX_VALUE;
   public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = 
"partitionedDataWriter.write.timeout.seconds";
   public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = 
Long.MAX_VALUE;
-
+  public static final String CURRENT_PARTITIONED_WRITERS_COUNTER = 
"partitionedDataWriter.counter";
   private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
       new 
GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
 
@@ -176,6 +176,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
                       log.info(String.format("Adding one more writer to 
loading cache of existing writer "
                           + "with size = %d", partitionWriters.size()));
                       Future<DataWriter<D>> future = 
createWriterPool.submit(() -> createPartitionWriter(key));
+                      state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, 
partitionWriters.size() + 1);
                       return future.get(writeTimeoutInterval, 
TimeUnit.SECONDS);
                     } catch (ExecutionException | InterruptedException e) {
                       throw new RuntimeException("Error creating writer", e);
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 24e203b64..27068258b 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -46,6 +46,7 @@ import 
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
@@ -54,6 +55,8 @@ import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 
 /**
  * The converter for buffering rows and forming columnar batch.
+ * Additionally, records the estimated size of the data converted in bytes
+ * TODO: consider using the record size provided by the extractor instead of 
the converter as it may be more available and accurate
  */
 @Slf4j
 public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericRecord> {
@@ -69,6 +72,10 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
   @VisibleForTesting
   public int resizeCount = 0;
 
+  @Getter
+  long totalBytesConverted = 0;
+  @Getter
+  long totalRecordsConverted = 0;
   /**
    * The interface for the conversion from GenericRecord to ORC's 
ColumnVectors.
    */
@@ -79,8 +86,9 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
      * @param column either the column number or element number
      * @param data Object which contains the data
      * @param output the ColumnVector to put the value into
+     * @returns the number of elements converted by the converter, for 
tracking and estimation purposes
      */
-    void addValue(int rowId, int column, Object data, ColumnVector output);
+    long addValue(int rowId, int column, Object data, ColumnVector output);
   }
 
   private final Converter[] converters;
@@ -98,11 +106,17 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
     log.info("enabledSmartSizing: {}, enlargeFactor: {}", enabledSmartSizing, 
enlargeFactor);
   }
 
+  /** Converts a record from the GenericRecord to the ORC ColumnVectors.
+   * Additionally, records the number of bytes converted and the number of 
records converted.
+   * @param value the data value to write.
+   * @param output the VectorizedRowBatch to which the output will be written.
+   * @throws IOException
+   */
   @Override
   public void write(GenericRecord value, VectorizedRowBatch output)
       throws IOException {
-
     int row = output.size++;
+    long bytesConverted = 0;
     for (int c = 0; c < converters.length; ++c) {
       ColumnVector col = output.cols[c];
       if (value.get(c) == null) {
@@ -110,55 +124,72 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
         col.isNull[row] = true;
       } else {
         col.isNull[row] = false;
-        converters[c].addValue(row, c, value.get(c), col);
+        bytesConverted += converters[c].addValue(row, c, value.get(c), col);
       }
     }
+    this.totalBytesConverted += bytesConverted;
+    this.totalRecordsConverted += 1;
   }
 
+
   static class BooleanConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 1;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((LongColumnVector) output).vector[rowId] = (boolean) data ? 1 : 0;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class ByteConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 1;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((LongColumnVector) output).vector[rowId] = (byte) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class ShortConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 4;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((LongColumnVector) output).vector[rowId] = (short) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class IntConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 4;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((LongColumnVector) output).vector[rowId] = (int) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class LongConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 8;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((LongColumnVector) output).vector[rowId] = (long) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class FloatConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 4;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((DoubleColumnVector) output).vector[rowId] = (float) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class DoubleConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    private static final int MEMORY_SIZE_BYTES = 8;
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((DoubleColumnVector) output).vector[rowId] = (double) data;
+      return MEMORY_SIZE_BYTES;
     }
   }
 
   static class StringConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       final byte[] value;
       if (data instanceof GenericEnumSymbol) {
         value = data.toString().getBytes(StandardCharsets.UTF_8);
@@ -170,11 +201,12 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
         value = ((String) data).getBytes(StandardCharsets.UTF_8);
       }
       ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      return value.length;
     }
   }
 
   static class BytesConverter implements Converter {
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       final byte[] value;
       if (data instanceof GenericFixed) {
         value = ((GenericFixed) data).bytes();
@@ -184,18 +216,22 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
         value = (byte[]) data;
       }
       ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      return value.length;
     }
   }
 
   static class DecimalConverter implements Converter {
+    // This is a naive estimation
+    private static final int MEMORY_SIZE_BYTES = 17;
     private final int scale;
 
     public DecimalConverter(int scale) {
       this.scale = scale;
     }
 
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       ((DecimalColumnVector) 
output).vector[rowId].set(getHiveDecimalFromByteBuffer((ByteBuffer) data));
+      return MEMORY_SIZE_BYTES;
     }
 
     /**
@@ -228,19 +264,22 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       }
     }
 
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       GenericRecord value = (GenericRecord) data;
       StructColumnVector cv = (StructColumnVector) output;
+      long estimatedBytes = 0;
       for (int c = 0; c < children.length; ++c) {
         ColumnVector field = cv.fields[c];
         if (value.get(c) == null) {
           field.noNulls = false;
           field.isNull[rowId] = true;
+          estimatedBytes += 1;
         } else {
           field.isNull[rowId] = false;
-          children[c].addValue(rowId, c, value.get(c), field);
+          estimatedBytes += children[c].addValue(rowId, c, value.get(c), 
field);
         }
       }
+      return estimatedBytes;
     }
   }
 
@@ -261,22 +300,24 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
      *             original data type without union wrapper.
      */
     @Override
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       UnionColumnVector cv = (UnionColumnVector) output;
       int tag = (data != null) ? GenericData.get().resolveUnion(unionSchema, 
data) : children.length;
-
+      long estimatedBytes = 0;
       for (int c = 0; c < children.length; ++c) {
         ColumnVector field = cv.fields[c];
         // If c == tag that indicates data must not be null
         if (c == tag) {
           field.isNull[rowId] = false;
           cv.tags[rowId] = c;
-          children[c].addValue(rowId, c, data, field);
+          estimatedBytes += children[c].addValue(rowId, c, data, field);
         } else {
           field.noNulls = false;
           field.isNull[rowId] = true;
+          estimatedBytes += 1;
         }
       }
+      return estimatedBytes;
     }
   }
 
@@ -290,7 +331,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       rowsAdded = 0;
     }
 
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       rowsAdded += 1;
       List value = (List) data;
       ListColumnVector cv = (ListColumnVector) output;
@@ -299,6 +340,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       cv.lengths[rowId] = value.size();
       cv.offsets[rowId] = cv.childCount;
       cv.childCount += cv.lengths[rowId];
+
       // make sure the child is big enough
       // If seeing child array being saturated, will need to expand with a 
reasonable amount.
       if (cv.childCount > cv.child.isNull.length) {
@@ -306,18 +348,21 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
         log.info("Column vector: {}, resizing to: {}, child count: {}", 
cv.child, resizedLength, cv.childCount);
         cv.child.ensureSize(resizedLength, true);
       }
-
+      // Add the size of the empty space of the list
+      long estimatedBytes = 0;
       // Add each element
       for (int e = 0; e < cv.lengths[rowId]; ++e) {
         int offset = (int) (e + cv.offsets[rowId]);
         if (value.get(e) == null) {
           cv.child.noNulls = false;
           cv.child.isNull[offset] = true;
+          estimatedBytes += 1;
         } else {
           cv.child.isNull[offset] = false;
-          children.addValue(offset, e, value.get(e), cv.child);
+          estimatedBytes += children.addValue(offset, e, value.get(e), 
cv.child);
         }
       }
+      return estimatedBytes;
     }
   }
 
@@ -333,7 +378,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       rowsAdded = 0;
     }
 
-    public void addValue(int rowId, int column, Object data, ColumnVector 
output) {
+    public long addValue(int rowId, int column, Object data, ColumnVector 
output) {
       rowsAdded += 1;
       Map<Object, Object> map = (Map<Object, Object>) data;
       Set<Map.Entry<Object, Object>> entries = map.entrySet();
@@ -353,24 +398,28 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       }
       // Add each element
       int e = 0;
+      long estimatedBytes = 0;
       for (Map.Entry entry : entries) {
         int offset = (int) (e + cv.offsets[rowId]);
         if (entry.getKey() == null) {
           cv.keys.noNulls = false;
           cv.keys.isNull[offset] = true;
+          estimatedBytes += 1;
         } else {
           cv.keys.isNull[offset] = false;
-          keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
+          estimatedBytes += keyConverter.addValue(offset, e, entry.getKey(), 
cv.keys);
         }
         if (entry.getValue() == null) {
           cv.values.noNulls = false;
           cv.values.isNull[offset] = true;
+          estimatedBytes += 1;
         } else {
           cv.values.isNull[offset] = false;
-          valueConverter.addValue(offset, e, entry.getValue(), cv.values);
+          estimatedBytes += valueConverter.addValue(offset, e, 
entry.getValue(), cv.values);
         }
         e++;
       }
+      return estimatedBytes;
     }
   }
 
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index a7de6a568..6e7a00bed 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -18,7 +18,11 @@
 package org.apache.gobblin.writer;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -44,21 +48,57 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   public static final String ORC_WRITER_PREFIX = "orcWriter.";
   public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
   public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+  public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED = 
ORC_WRITER_PREFIX + "auto.selfTune.enabled";
+  public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE = 
ORC_WRITER_PREFIX + "estimated.recordSize";
+  public static final String ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 
ORC_WRITER_PREFIX + "auto.selfTune.rowsBetweenCheck";
+  public static final String ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR = 
ORC_WRITER_PREFIX + "auto.selfTune.memory.usage.factor";
+  public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
+  public static final String 
ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY = ORC_WRITER_PREFIX + 
"estimated.bytes.allocated.converter.memory";
+  public static final String ORC_WRITER_CONCURRENT_TASKS = ORC_WRITER_PREFIX + 
"auto.selfTune.concurrent.tasks";
+
+  // This value gives an estimation on how many writers are buffering records 
at the same time in a container.
+  // Since time-based partition scheme is a commonly used practice, plus the 
chances for late-arrival data,
+  // usually there would be 2-3 writers running during the hourly boundary. 3 
is chosen here for being conservative.
+  private static final int CONCURRENT_WRITERS_DEFAULT = 3;
+  public static final double DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 
0.5;
+  public static final int DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
+  // Tune iff the new batch size is 10% different from the current batch size
+  public static final double DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY = 
0.1;
+  public static final int DEFAULT_MIN_ORCWRITER_ROWCHECK = 150;
+  public static final int DEFAULT_MAX_ORCWRITER_ROWCHECK = 5000;
 
   protected final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
   VectorizedRowBatch rowBatch;
   private final TypeDescription typeDescription;
-  protected final Writer orcFileWriter;
+  protected Writer orcFileWriter;
   private final RowBatchPool rowBatchPool;
   private final boolean enableRowBatchPool;
+  protected long estimatedRecordSizeBytes = -1;
 
   // the close method may be invoked multiple times, but the underlying writer 
only supports close being called once
   protected volatile boolean closed = false;
 
-  protected final int batchSize;
+  protected int batchSize;
   protected final S inputSchema;
 
+  private final boolean selfTuningWriter;
+  private int selfTuneRowsBetweenCheck;
+  private double rowBatchMemoryUsageFactor;
+  private int nextSelfTune;
+  private boolean initialEstimatingRecordSizePhase = false;
+  private Queue<Integer> initialSelfTuneCheckpoints = new 
LinkedList<>(Arrays.asList(10, 100, 500));
+  private AtomicInteger recordCounter = new AtomicInteger(0);
+  @VisibleForTesting
+  long availableMemory = -1;
+  private long orcWriterStripeSizeBytes;
+  private int concurrentWriterTasks;
+  private int orcFileWriterRowsBetweenCheck;
+  // Holds the maximum size of the previous run's maximum buffer or the max of 
the current run's maximum buffer
+  private long estimatedBytesAllocatedConverterMemory = -1;
+  private OrcConverterMemoryManager converterMemoryManager;
+
+  Configuration writerConfig;
 
   public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State 
properties)
       throws IOException {
@@ -68,29 +108,56 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     this.inputSchema = builder.getSchema();
     this.typeDescription = getOrcSchema();
     this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, 
properties);
-    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, 
DEFAULT_ORC_WRITER_BATCH_SIZE);
+    this.selfTuningWriter = 
properties.getPropAsBoolean(ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
+    this.batchSize = this.selfTuningWriter ? DEFAULT_ORC_WRITER_BATCH_SIZE : 
properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
     this.rowBatchPool = RowBatchPool.instance(properties);
     this.enableRowBatchPool = 
properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
+    this.selfTuneRowsBetweenCheck = 
properties.getPropAsInt(ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, 
DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK);
+    this.rowBatchMemoryUsageFactor = 
properties.getPropAsDouble(ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR, 
DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR);
     this.rowBatch = enableRowBatchPool ? 
rowBatchPool.getRowBatch(typeDescription, batchSize) : 
typeDescription.createRowBatch(batchSize);
-    log.info("Created ORC writer, batch size: {}, {}: {}",
-            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
-            properties.getProp(
-                    OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
-                    OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));
-
+    this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch);
+    this.orcWriterStripeSizeBytes = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
+    // Track the number of other writer tasks from different datasets 
ingesting on the same container
+    this.concurrentWriterTasks = 
properties.getPropAsInt(ORC_WRITER_CONCURRENT_TASKS, 1);
     // Create file-writer
-    Configuration conf = new Configuration();
+    this.writerConfig = new Configuration();
     // Populate job Configurations into Conf as well so that configurations 
related to ORC writer can be tuned easily.
     for (Object key : properties.getProperties().keySet()) {
-      conf.set((String) key, properties.getProp((String) key));
+      this.writerConfig.set((String) key, properties.getProp((String) key));
     }
-
-    OrcFile.WriterOptions options = 
OrcFile.writerOptions(properties.getProperties(), conf);
+    OrcFile.WriterOptions options = 
OrcFile.writerOptions(properties.getProperties(), this.writerConfig);
     options.setSchema(typeDescription);
 
-    // For buffer-writer, flush has to be executed before close so it is 
better we maintain the life-cycle of fileWriter
-    // instead of delegating it to closer object in FsDataWriter.
-    this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
+    // Get the amount of allocated and future space available
+    this.availableMemory = (Runtime.getRuntime().maxMemory() - 
(Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory()))/this.concurrentWriterTasks;
+    log.info("Available memory for ORC writer: {}", this.availableMemory);
+
+    if (this.selfTuningWriter) {
+      if (properties.contains(ORC_WRITER_ESTIMATED_RECORD_SIZE) && 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE) != -1) {
+        this.estimatedRecordSizeBytes = 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE);
+        this.estimatedBytesAllocatedConverterMemory = 
properties.getPropAsLong(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, 
-1);
+        this.orcFileWriterRowsBetweenCheck = 
properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), (int) 
OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue());
+        // Use the last run's rows between check value for the underlying file 
size writer, if it exists. Otherwise it will default to 5000
+        log.info("Using previously stored properties to calculate new batch 
size, ORC Estimated Record size is : {},"
+                + "estimated bytes converter allocated is : {}, ORC rows 
between check is {}",
+            this.estimatedRecordSizeBytes, 
this.estimatedBytesAllocatedConverterMemory, 
this.orcFileWriterRowsBetweenCheck);
+        this.tuneBatchSize(estimatedRecordSizeBytes);
+        log.info("Initialized batch size at {}", this.batchSize);
+        this.nextSelfTune = this.selfTuneRowsBetweenCheck;
+      } else {
+        // We will need to incrementally tune the writer based on the first 
few records
+        this.nextSelfTune = 5;
+        this.initialEstimatingRecordSizePhase = true;
+      }
+    } else {
+      this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, 
DEFAULT_ORC_WRITER_BATCH_SIZE);
+      log.info("Created ORC writer, batch size: {}, {}: {}",
+          this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+          this.writerConfig.get(
+              OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+              OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));
+      this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
+    }
   }
 
   /**
@@ -113,12 +180,12 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
 
   @Override
   public long recordsWritten() {
-    return this.orcFileWriter.getNumberOfRows();
+    return this.orcFileWriter != null ? this.orcFileWriter.getNumberOfRows(): 
0;
   }
 
   @Override
   public long bytesWritten() {
-    return this.orcFileWriter.getRawDataSize();
+    return this.orcFileWriter != null ? this.orcFileWriter.getRawDataSize() : 
0;
   }
 
   @Override
@@ -141,6 +208,11 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   public void flush()
       throws IOException {
     if (rowBatch.size > 0) {
+      // We only initialize the native ORC file writer once to avoid creating 
too many small files, as reconfiguring rows between memory check
+      // requires one to close the writer and start a new file
+      if (this.orcFileWriter == null) {
+        initializeOrcFileWriter();
+      }
       orcFileWriter.addRowBatch(rowBatch);
       rowBatch.reset();
     }
@@ -183,9 +255,66 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
       throws IOException {
     closeInternal();
     super.commit();
+    if (this.selfTuningWriter) {
+      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, 
String.valueOf(estimatedRecordSizeBytes));
+      
properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, 
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 
String.valueOf(this.orcFileWriterRowsBetweenCheck));
+    }
   }
 
   /**
+   * Modifies the size of the writer buffer based on the average size of the 
records written so far, the amount of available memory during initialization, 
and the number of concurrent writers.
+   * The new batch size is calculated as follows:
+   * 1. Memory available = (available memory during startup)/(concurrent 
writers) - (memory used by ORCFile writer)
+   * 2. Average file size, estimated during Avro -> ORC conversion
+   * 3. Estimate of memory used by the converter lists, as during resize the 
internal buffer size can grow large
+   * 4. New batch size = (Memory available - Estimated memory used by 
converter lists) / Average file size
+   * Generally in this writer, the memory the converter uses for large arrays 
is the leading cause of OOM in streaming, along with the records stored in the 
rowBatch
+   * Another potential approach is to also check the memory available before 
resizing the converter lists, and to flush the batch whenever a resize is 
needed.
+   */
+  void tuneBatchSize(long averageSizePerRecord) throws IOException {
+    this.estimatedBytesAllocatedConverterMemory = 
Math.max(this.estimatedBytesAllocatedConverterMemory, 
this.converterMemoryManager.getConverterBufferTotalSize());
+    int currentPartitionedWriters = 
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
 CONCURRENT_WRITERS_DEFAULT);
+    // In the native ORC writer implementation, it will flush the writer if 
the internal memory exceeds the size of a stripe after rows between check
+    // So worst case the most memory the writer can hold is the size of a 
stripe plus size of records * number of records between checks
+    // Note that this is an overestimate as the native ORC file writer should 
have some compression ratio
+    long maxMemoryInFileWriter = averageSizePerRecord * 
this.orcFileWriterRowsBetweenCheck + this.orcWriterStripeSizeBytes;
+
+    int newBatchSize = (int) ((this.availableMemory*1.0 / 
currentPartitionedWriters * this.rowBatchMemoryUsageFactor - 
maxMemoryInFileWriter
+        - this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
+    // Handle scenarios where new batch size can be 0 or less due to 
overestimating memory used by other components
+    newBatchSize = Math.min(Math.max(1, newBatchSize), 
DEFAULT_ORC_WRITER_BATCH_SIZE);
+    if (Math.abs(newBatchSize - this.batchSize) > 
DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) {
+      log.info("Tuning ORC writer batch size from {} to {} based on average 
byte size per record: {} with available memory {} and {} bytes "
+              + "of allocated memory in converter buffers, with {} partitioned 
writers",
+          batchSize, newBatchSize, averageSizePerRecord, availableMemory,
+          estimatedBytesAllocatedConverterMemory, currentPartitionedWriters);
+      // We need to always flush because ORC VectorizedRowBatch.ensureSize() 
does not provide an option to preserve data, refer to
+      // 
https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.html
+      this.flush();
+      this.batchSize = newBatchSize;
+      this.rowBatch.ensureSize(this.batchSize);
+    }
+  }
+
+  void initializeOrcFileWriter() {
+    try {
+      this.orcFileWriterRowsBetweenCheck = Math.max(Math.min(this.batchSize * 
DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR, DEFAULT_MAX_ORCWRITER_ROWCHECK), 
DEFAULT_MIN_ORCWRITER_ROWCHECK);
+      this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 
String.valueOf(this.orcFileWriterRowsBetweenCheck));
+      log.info("Created ORC writer, batch size: {}, {}: {}",
+          this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+          this.writerConfig.get(
+              OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
+              OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString()));
+      OrcFile.WriterOptions options = 
OrcFile.writerOptions(properties.getProperties(), this.writerConfig);
+      options.setSchema(typeDescription);
+      this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
+    } catch (IOException e) {
+      log.error("Failed to flush the current batch", e);
+    }
+  }
+
+  /*
    * Note: orc.rows.between.memory.checks is the configuration available to 
tune memory-check sensitivity in ORC-Core
    * library. By default it is set to 5000. If the user-application is dealing 
with large-row Kafka topics for example,
    * one should consider lower this value to make memory-check more active.
@@ -194,10 +323,21 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   public void write(D record)
       throws IOException {
     Preconditions.checkState(!closed, "Writer already closed");
-    valueWriter.write(record, rowBatch);
+    this.valueWriter.write(record, this.rowBatch);
+    int recordCount = this.recordCounter.incrementAndGet();
+    if (this.selfTuningWriter && recordCount == this.nextSelfTune) {
+      long totalBytes = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalBytesConverted();
+      long totalRecords = ((GenericRecordToOrcValueWriter) 
valueWriter).getTotalRecordsConverted();
+      this.estimatedRecordSizeBytes = totalRecords == 0 ? 0 : totalBytes / 
totalRecords;
+      this.tuneBatchSize(this.estimatedRecordSizeBytes);
+      if (this.initialEstimatingRecordSizePhase && 
!initialSelfTuneCheckpoints.isEmpty()) {
+        this.nextSelfTune = initialSelfTuneCheckpoints.poll();
+      } else {
+        this.nextSelfTune += this.selfTuneRowsBetweenCheck;
+      }
+    }
     if (rowBatch.size == this.batchSize) {
-      orcFileWriter.addRowBatch(rowBatch);
-      rowBatch.reset();
+      this.flush();
     }
   }
 }
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
new file mode 100644
index 000000000..dcd6250dc
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+
+/**
+ * A helper class to calculate the size of array buffers in a {@link 
VectorizedRowBatch}.
+ * This estimate is mainly based on the maximum size of each variable length 
column, which can be resized.
+ * Since the resizing algorithm for each column can balloon, this can affect 
the likelihood of OOM.
+ */
+public class OrcConverterMemoryManager {
+
+  private VectorizedRowBatch rowBatch;
+
+  // TODO: Consider moving the resize algorithm from the converter to this 
class
+  OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+    this.rowBatch = rowBatch;
+  }
+
+  /**
+   * Estimates the approximate size in bytes of elements in a column
+   * This takes into account the default null values of different ORC 
ColumnVectors and approximates their sizes
+   * Although it's a rough calculation, it can accurately depict the weight of 
resizes in a column for very large arrays and maps,
+   * which tend to dominate the size of the buffer overall.
+   * TODO: Clean this method up considerably and refactor resize logic here
+   * @param col
+   * @return
+   */
+  public long calculateSizeOfColHelper(ColumnVector col) {
+    long converterBufferColSize = 0;
+    if (col instanceof ListColumnVector) {
+      ListColumnVector listColumnVector = (ListColumnVector) col;
+      converterBufferColSize += 
calculateSizeOfColHelper(listColumnVector.child);
+    } else if (col instanceof MapColumnVector) {
+      MapColumnVector mapColumnVector = (MapColumnVector) col;
+      converterBufferColSize += calculateSizeOfColHelper(mapColumnVector.keys);
+      converterBufferColSize += 
calculateSizeOfColHelper(mapColumnVector.values);
+    } else if (col instanceof StructColumnVector) {
+      StructColumnVector structColumnVector = (StructColumnVector) col;
+      for (int j = 0; j < structColumnVector.fields.length; j++) {
+        converterBufferColSize += 
calculateSizeOfColHelper(structColumnVector.fields[j]);
+      }
+    } else if (col instanceof UnionColumnVector) {
+      UnionColumnVector unionColumnVector = (UnionColumnVector) col;
+      for (int j = 0; j < unionColumnVector.fields.length; j++) {
+        converterBufferColSize += 
calculateSizeOfColHelper(unionColumnVector.fields[j]);
+      }
+    } else if (col instanceof LongColumnVector || col instanceof 
DoubleColumnVector || col instanceof DecimalColumnVector) {
+      // Memory space in bytes of native type
+      converterBufferColSize += col.isNull.length * 8;
+    } else if (col instanceof BytesColumnVector) {
+      // Contains two integer list references of size vector for tracking so 
will use that as null size
+      converterBufferColSize += ((BytesColumnVector) col).vector.length * 8;
+    }
+    // Calculate overhead of the column's own null reference
+    converterBufferColSize += col.isNull.length;
+    return converterBufferColSize;
+  }
+
+  /**
+   * Returns the total size of all variable length columns in a {@link 
VectorizedRowBatch}
+   * TODO: Consider calculating this value on the fly everytime a resize is 
called
+   * @return
+   */
+  public long getConverterBufferTotalSize() {
+    long converterBufferTotalSize = 0;
+    ColumnVector[] cols = this.rowBatch.cols;
+    for (int i = 0; i < cols.length; i++) {
+      converterBufferTotalSize += calculateSizeOfColHelper(cols[i]);
+    }
+    return converterBufferTotalSize;
+  }
+
+}
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index 0f10316af..c22c62827 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -27,7 +27,6 @@ import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ import com.google.common.io.Files;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
+
 import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
 
 
@@ -172,6 +173,33 @@ public class GenericRecordToOrcValueWriterTest {
     Assert.assertEquals(valueWriter.resizeCount, 2);
   }
 
+  @Test
+  public void testConvertedBytesCalculation()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    // Make the batch size very small so that the enlarge behavior would 
easily be triggered.
+    // But this has to be more than the number of records that we deserialized 
from data.json, as here we don't reset the batch.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+
+    List<GenericRecord> recordList = GobblinOrcWriterTest
+        .deserializeAvroRecords(this.getClass(), schema, 
"list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // We want to add the sum of the sizes of the elements in the list and 
map, as well as any isNull values created by resizing the array
+    long byteSumOfIdList = 4 * 3 * 6;
+    // Sum of keys + values
+    long byteSumOfMaps = 1 * 2 * 6 + 4 * 2 * 6;
+    long expectedBytesConverted = byteSumOfIdList + byteSumOfMaps;
+    Assert.assertEquals(valueWriter.getTotalBytesConverted(), 
expectedBytesConverted);
+    Assert.assertEquals(valueWriter.getTotalRecordsConverted(), 6);
+  }
+
   /**
    * Accessing "fields" using reflection to work-around access modifiers.
    */
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index 0b0912cf7..dfee83644 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcConf;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -43,7 +44,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
 import static 
org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -139,4 +140,146 @@ public class GobblinOrcWriterTest {
       Assert.fail();
     }
   }
+
+  @Test
+  public void testSelfTuneRowBatchSizeIncrease() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "orc_writer_test/data_multi.json");
+
+    // Mock WriterBuilder, bunch of mocking behaviors to work-around 
precondition checks in writer builder
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) 
Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+
+    State dummyState = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
+    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, 
"true");
+    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+    Path outputFilePath = new Path(outputDir, "selfTune/file");
+
+    // Having a closer to manage the life-cycle of the writer object.
+    Closer closer = Closer.create();
+    GobblinOrcWriter orcWriter = closer.register(new 
GobblinOrcWriter(mockBuilder, dummyState));
+    // Initialize the rowBatch such that it should store all records
+    orcWriter.rowBatch.ensureSize(5);
+    orcWriter.batchSize=5;
+
+    for (GenericRecord record : recordList) {
+      orcWriter.write(record);
+    }
+    // Force the batchSize to increase, lets ensure that the records are not 
lost in the rowBatch
+    orcWriter.tuneBatchSize(1);
+    Assert.assertFalse(orcWriter.batchSize == 5);
+    Assert.assertTrue(orcWriter.rowBatch.size == 0, "Expected the row batch to 
be flushed to preserve data");
+
+    // Not yet flushed in ORC
+    Assert.assertEquals(orcWriter.recordsWritten(), 0);
+
+    orcWriter.commit();
+    Assert.assertEquals(orcWriter.recordsWritten(), 4);
+
+    // Verify ORC file contains correct records.
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Assert.assertTrue(fs.exists(outputFilePath));
+    List<Writable> orcRecords = deserializeOrcRecords(outputFilePath, fs);
+    Assert.assertEquals(orcRecords.size(), 4);
+  }
+
+  @Test
+  public void testSelfTuneRowBatchSizeDecrease() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "orc_writer_test/data_multi.json");
+
+    // Mock WriterBuilder, bunch of mocking behaviors to work-around 
precondition checks in writer builder
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) 
Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+
+    State dummyState = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
+    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, 
"true");
+    
dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK,
 "1");
+    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+    Path outputFilePath = new Path(outputDir, "selfTune/file");
+
+    // Having a closer to manage the life-cycle of the writer object.
+    Closer closer = Closer.create();
+    GobblinOrcWriter orcWriter = closer.register(new 
GobblinOrcWriter(mockBuilder, dummyState));
+    // Force a larger initial batchSize that can be tuned down
+    orcWriter.batchSize = 10;
+    orcWriter.rowBatch.ensureSize(10);
+
+    for (GenericRecord record : recordList) {
+      orcWriter.write(record);
+    }
+    // Force the batchSize to decrease
+    orcWriter.tuneBatchSize(1000000000);
+    Assert.assertTrue(orcWriter.batchSize == 1);
+    Assert.assertTrue(orcWriter.rowBatch.size == 0, "Expected the row batch to 
be flushed to preserve data");
+
+    // Not yet flushed in ORC
+    Assert.assertEquals(orcWriter.recordsWritten(), 0);
+
+    orcWriter.commit();
+    Assert.assertEquals(orcWriter.recordsWritten(), 4);
+
+    // Verify ORC file contains correct records.
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Assert.assertTrue(fs.exists(outputFilePath));
+    List<Writable> orcRecords = deserializeOrcRecords(outputFilePath, fs);
+    Assert.assertEquals(orcRecords.size(), 4);
+  }
+
+
+  @Test
+  public void testSelfTuneRowBatchCalculation() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "orc_writer_test/data_multi.json");
+
+    // Mock WriterBuilder, bunch of mocking behaviors to work-around 
precondition checks in writer builder
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) 
Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+
+    State dummyState = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
+    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, 
"true");
+    dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
+    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+
+    // Having a closer to manage the life-cycle of the writer object.
+    Closer closer = Closer.create();
+    GobblinOrcWriter orcWriter = closer.register(new 
GobblinOrcWriter(mockBuilder, dummyState));
+    // Force a larger initial batchSize that can be tuned down
+    orcWriter.batchSize = 10;
+    orcWriter.rowBatch.ensureSize(10);
+    orcWriter.availableMemory = 100000000;
+    // Given the amount of available memory and a low stripe size, and 
estimated rowBatchSize, the resulting batchsize should be maxed out
+    orcWriter.tuneBatchSize(10);
+    Assert.assertTrue(orcWriter.batchSize == 
GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE);
+    orcWriter.availableMemory = 100;
+    orcWriter.tuneBatchSize(10);
+    // Given that the amount of available memory is low, the resulting 
batchsize should be 1
+    Assert.assertTrue(orcWriter.batchSize == 1);
+    orcWriter.availableMemory = 10000;
+    orcWriter.rowBatch.ensureSize(10000);
+    // Since the rowBatch is large, the resulting batchsize should still be 1 
even with more memory
+    orcWriter.tuneBatchSize(10);
+    Assert.assertTrue(orcWriter.batchSize == 1);
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
new file mode 100644
index 000000000..c04778187
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
+
+
+public class OrcConverterMemoryManagerTest {
+
+  @Test
+  public void testBufferSizeCalculationResize()
+      throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    // Make batch size small so that the enlarge behavior would easily be 
triggered.
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+
+    List<GenericRecord> recordList = GobblinOrcWriterTest
+        .deserializeAvroRecords(this.getClass(), schema, 
"list_map_test/data.json");
+    Assert.assertEquals(recordList.size(), 6);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Expected size is the size of the lists, map keys and map vals after 
resize. Since there are 6 records, and each array/map has at least 2 elements, 
then
+    // one resize is performed when the respective list/maps exceed the 
initial size of 10, in this case 12.
+    // So the resized total length would be 12*3 for the list, map keys and 
map vals, with 8 bytes per value plus 1 byte for its isNull entry.
+    int expectedSize = 36 * 9 + 36 * 9 + 36 * 9 + 10*2;
+    Assert.assertEquals(memoryManager.getConverterBufferTotalSize(), 
expectedSize);
+  }
+
+  @Test
+  public void testBufferSizeCalculatedDeepNestedList() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc"));
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    // Make batch such that only deeply nested list is resized
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch);
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+
+    List<GenericRecord> recordList = GobblinOrcWriterTest
+        .deserializeAvroRecords(this.getClass(), schema, 
"converter_memory_manager_nested_test/data.json");
+    Assert.assertEquals(recordList.size(), 1);
+    for (GenericRecord record : recordList) {
+      valueWriter.write(record, rowBatch);
+    }
+    // Deeply nested list should be resized once, since it resizes at 30 
elements (5+10+15) to 90
+    // Other fields should not be resized, (map keys and vals, and top level 
arrays)
+    // Account for size of top level arrays that should be small
+    int expectedSize = 30*3*9 + 30*9 + 15*4; // Deeply nested list + maps + 
other structure overhead
+    Assert.assertEquals(memoryManager.getConverterBufferTotalSize(), 
expectedSize);
+  }
+}
diff --git 
a/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json
 
b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json
new file mode 100644
index 000000000..37105d9fb
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json
@@ -0,0 +1,38 @@
+{
+  "parentList": [
+    [
+        {
+          "nestedField1": "A",
+          "nestedField2": "B",
+          "deeplyNestedList": [1,2,3,4,5]
+        },
+        {
+          "nestedField1": "A",
+          "nestedField2": "B",
+          "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10]
+        },
+        {
+          "nestedField1": "A",
+          "nestedField2": "B",
+          "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
+        }
+    ],
+    [
+      {
+        "nestedField1": "A",
+        "nestedField2": "B",
+        "deeplyNestedList": [1,2,3,4,5]
+      },
+      {
+        "nestedField1": "A",
+        "nestedField2": "B",
+        "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10]
+      },
+      {
+        "nestedField1": "A",
+        "nestedField2": "B",
+        "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
+      }
+    ]
+  ]
+}
diff --git 
a/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc
 
b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc
new file mode 100644
index 000000000..36a636603
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc
@@ -0,0 +1,37 @@
+{
+  "type" : "record",
+  "name" : "parentRecordName",
+  "fields" : [
+    {
+      "name": "parentList",
+      "type": {
+        "type": "array",
+        "items": {
+          "type": "array",
+          "name": "nestedList",
+          "items": {
+            "type": "record",
+            "name": "nestedRecordName",
+            "fields": [
+              {
+                "name": "nestedField1",
+                "type": "string"
+              },
+              {
+                "name": "nestedField2",
+                "type": "string"
+              },
+              {
+                "name": "deeplyNestedList",
+                "type": {
+                  "type": "array",
+                  "items": "int"
+                }
+              }
+            ]
+          }
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json
 
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json
new file mode 100644
index 000000000..45e4931e0
--- /dev/null
+++ 
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json
@@ -0,0 +1,16 @@
+{
+  "id": 1,
+  "name": "Alyssa"
+}
+{
+  "id": 2,
+  "name": "Bob"
+}
+{
+  "id": 3,
+  "name": "Charlie"
+}
+{
+  "id": 4,
+  "name": "Diane"
+}
\ No newline at end of file

Reply via email to