This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a392ebd29 [GOBBLIN-1918] Use Exponential Decay for RowBatch resizing  
(#3787)
a392ebd29 is described below

commit a392ebd2988f52f7109dbb95bf9966ecac0d947f
Author: William Lo <[email protected]>
AuthorDate: Mon Sep 25 18:24:21 2023 -0400

    [GOBBLIN-1918] Use Exponential Decay for RowBatch resizing  (#3787)
    
    * WIP
    
    * Add configurable decay algorithm for buffer resizing in the ORC converter
    
    * Add configs and tests
    
    * Move configs to shared class
    
    * Fix test after moving configs
    
    * Fix more tests
    
    * Fix bug with getting resize counts
---
 .../writer/GenericRecordToOrcValueWriter.java      |  43 +-------
 .../gobblin/writer/GobblinBaseOrcWriter.java       |   9 +-
 .../apache/gobblin/writer/GobblinOrcWriter.java    |   4 +-
 .../gobblin/writer/GobblinOrcWriterConfigs.java    |   9 ++
 .../gobblin/writer/OrcConverterMemoryManager.java  | 121 ++++++++++++++++-----
 .../writer/GenericRecordToOrcValueWriterTest.java  |  15 ++-
 .../writer/OrcConverterMemoryManagerTest.java      |  60 +++++++++-
 7 files changed, 183 insertions(+), 78 deletions(-)

diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 4e1a49512..5c4c878ee 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -44,12 +44,9 @@ import 
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 
 
@@ -60,17 +57,10 @@ import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
  */
 @Slf4j
 public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericRecord> {
-  private static final String ENABLE_SMART_ARRAY_ENLARGE = 
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX + 
"enabledMulValueColumnVectorSmartSizing";
-  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
-  private static final String ENLARGE_FACTOR_KEY = 
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX + "enlargeFactor";
-  private static final int DEFAULT_ENLARGE_FACTOR = 3;
 
   private boolean enabledSmartSizing;
   private int enlargeFactor;
-
-  // A rough measure of how many times resize is triggered, helping on 
debugging and testing.
-  @VisibleForTesting
-  public int resizeCount = 0;
+  private OrcConverterMemoryManager memoryManager;
 
   @Getter
   long totalBytesConverted = 0;
@@ -93,17 +83,9 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
 
   private final Converter[] converters;
 
-  public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema 
avroSchema) {
+  public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema 
avroSchema, OrcConverterMemoryManager memoryManager) {
     converters = buildConverters(typeDescription, avroSchema);
-    this.enabledSmartSizing = DEFAULT_ENABLE_SMART_ARRAY_ENLARGE;
-    this.enlargeFactor = DEFAULT_ENLARGE_FACTOR;
-  }
-
-  public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema 
avroSchema, State state) {
-    this(typeDescription, avroSchema);
-    this.enabledSmartSizing = 
state.getPropAsBoolean(ENABLE_SMART_ARRAY_ENLARGE, 
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
-    this.enlargeFactor = state.getPropAsInt(ENLARGE_FACTOR_KEY, 
DEFAULT_ENLARGE_FACTOR);
-    log.info("enabledSmartSizing: {}, enlargeFactor: {}", enabledSmartSizing, 
enlargeFactor);
+    this.memoryManager = memoryManager;
   }
 
   /** Converts a record from the GenericRecord to the ORC ColumnVectors.
@@ -344,7 +326,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       // make sure the child is big enough
       // If seeing child array being saturated, will need to expand with a 
reasonable amount.
       if (cv.childCount > cv.child.isNull.length) {
-        int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+        int resizedLength = memoryManager.resize(rowsAdded, cv.childCount);
         log.info("Column vector: {}, resizing to: {}, child count: {}", 
cv.child, resizedLength, cv.childCount);
         cv.child.ensureSize(resizedLength, true);
       }
@@ -390,7 +372,7 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
       cv.childCount += cv.lengths[rowId];
       // make sure the child is big enough
       if (cv.childCount > cv.keys.isNull.length) {
-        int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+        int resizedLength = memoryManager.resize(rowsAdded, cv.childCount);
         log.info("Column vector: {}, resizing to: {}, child count: {}", 
cv.keys, resizedLength, cv.childCount);
         cv.keys.ensureSize(resizedLength, true);
         log.info("Column vector: {}, resizing to: {}, child count: {}", 
cv.values, resizedLength, cv.childCount);
@@ -423,19 +405,6 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
     }
   }
 
-  /**
-   * Resize the child-array size based on configuration.
-   * If smart-sizing is enabled, it will using the avg size of container and 
expand the whole child array to
-   * delta(avgSizeOfContainer * numberOfContainer(batchSize)) the first time 
this is called.
-   * If there's further resize requested, it will add delta again to be 
conservative, but chances of adding delta
-   * for multiple times should be low, unless the container size is 
fluctuating too much.
-   */
-  private int resize(int rowsAdded, int batchSize, int requestedSize) {
-    resizeCount += 1;
-    log.info(String.format("It has been resized %s times in current writer", 
resizeCount));
-    return enabledSmartSizing ? requestedSize + (requestedSize / rowsAdded + 
1) * batchSize : enlargeFactor * requestedSize;
-  }
-
   private Converter buildConverter(TypeDescription schema, Schema avroSchema) {
     switch (schema.getCategory()) {
       case BOOLEAN:
@@ -486,6 +455,6 @@ public class GenericRecordToOrcValueWriter implements 
OrcValueWriter<GenericReco
   }
 
   public int getResizeCount() {
-    return resizeCount;
+    return memoryManager.getResizeCount();
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 739b84374..d8a2a353e 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -82,7 +82,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
   private long orcWriterStripeSizeBytes;
   // Holds the maximum size of the previous run's maximum buffer or the max of 
the current run's maximum buffer
   private long estimatedBytesAllocatedConverterMemory = -1;
-  private OrcConverterMemoryManager converterMemoryManager;
+  protected OrcConverterMemoryManager converterMemoryManager;
 
   Configuration writerConfig;
 
@@ -93,7 +93,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     // Create value-writer which is essentially a record-by-record-converter 
with buffering in batch.
     this.inputSchema = builder.getSchema();
     this.typeDescription = getOrcSchema();
-    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, 
properties);
     this.selfTuningWriter = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
 false);
     this.maxOrcBatchSize = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
         GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
@@ -107,7 +106,10 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     this.rowBatchMemoryUsageFactor = 
properties.getPropAsDouble(GobblinOrcWriterConfigs.ORC_WRITER_ROWBATCH_MEMORY_USAGE_FACTOR,
         
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR);
     this.rowBatch = enableRowBatchPool ? 
rowBatchPool.getRowBatch(typeDescription, batchSize) : 
typeDescription.createRowBatch(batchSize);
-    this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch);
+    this.orcWriterStripeSizeBytes = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
+    this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch, 
properties);
+    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, 
properties);
+
     // Track the number of other writer tasks from different datasets 
ingesting on the same container
     this.concurrentWriterTasks = 
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS,
 1);
     this.orcStripeSize = 
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) 
OrcConf.STRIPE_SIZE.getDefaultValue());
@@ -121,7 +123,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends 
FsDataWriter<D> {
     JobConfigurationUtils.putStateIntoConfiguration(properties, 
this.writerConfig);
     OrcFile.WriterOptions options = 
OrcFile.writerOptions(properties.getProperties(), this.writerConfig);
     options.setSchema(typeDescription);
-
     // Get the amount of allocated and future space available
     this.availableMemory = (Runtime.getRuntime().maxMemory() - 
(Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory()))/this.concurrentWriterTasks;
     log.info("Available memory for ORC writer: {}", this.availableMemory);
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index dba1f2fc4..d84125525 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -23,7 +23,6 @@ import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -32,6 +31,7 @@ import org.apache.orc.TypeDescription;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 
 /**
  * A wrapper for ORC-core writer without dependency on Hive SerDe library.
@@ -50,7 +50,7 @@ public class GobblinOrcWriter extends 
GobblinBaseOrcWriter<Schema, GenericRecord
   @Override
   protected OrcValueWriter<GenericRecord> getOrcValueWriter(TypeDescription 
typeDescription, Schema inputSchema,
       State state) {
-    return new GenericRecordToOrcValueWriter(typeDescription, 
this.inputSchema, this.properties);
+    return new GenericRecordToOrcValueWriter(typeDescription, 
this.inputSchema, this.converterMemoryManager);
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index 50b51ae81..b0b859f93 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -78,6 +78,15 @@ public class GobblinOrcWriterConfigs {
   public static final int DEFAULT_MIN_ORC_WRITER_ROWCHECK = 150;
   public static final int DEFAULT_MAX_ORC_WRITER_ROWCHECK = 5000;
 
+  /**
+   * Avro to ORC converter configs
+   */
+  public static final String ENABLE_SMART_ARRAY_ENLARGE = ORC_WRITER_PREFIX + 
"smartArrayEnlargement.enabled";
+  public static final String SMART_ARRAY_ENLARGE_FACTOR_MAX = 
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.max";
+  public static final String SMART_ARRAY_ENLARGE_FACTOR_MIN = 
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.min";
+  public static final String SMART_ARRAY_ENLARGE_DECAY_FACTOR = 
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.decay";
+  public static final String ENLARGE_FACTOR_KEY = ORC_WRITER_PREFIX + 
"enlargeFactor";
+
   public static class RuntimeStateConfigs {
     public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE = 
ORC_WRITER_PREFIX + "estimated.recordSize";
     public static final String ORC_WRITER_NATIVE_WRITER_MEMORY = 
ORC_WRITER_PREFIX + "estimated.native.writer.memory";
diff --git 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
index dcd6250dc..7a74f56fd 100644
--- 
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
+++ 
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
@@ -28,19 +28,57 @@ import 
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
+import com.google.common.base.Preconditions;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
 
 /**
  * A helper class to calculate the size of array buffers in a {@link 
VectorizedRowBatch}.
  * This estimate is mainly based on the maximum size of each variable length 
column, which can be resized
  * Since the resizing algorithm for each column can balloon, this can affect 
likelihood of OOM
  */
+@Slf4j
 public class OrcConverterMemoryManager {
 
+  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+  private static final int DEFAULT_ENLARGE_FACTOR = 3;
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+  // Needs to be greater than 1.0
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+  // Given the defaults it will take around 500 records to reach the min 
enlarge factor - given that the default
+  // batch size is 1000 this is a fairly conservative default
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
   private VectorizedRowBatch rowBatch;
+  @Getter
+  private int resizeCount = 0;
+  private double smartArrayEnlargeFactorMax;
+  private double smartArrayEnlargeFactorMin;
+  private double smartArrayEnlargeDecayFactor;
+  private boolean enabledSmartSizing = false;
+  int enlargeFactor = 0;
 
-  // TODO: Consider moving the resize algorithm from the converter to this 
class
-  OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+  OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
     this.rowBatch = rowBatch;
+    this.enabledSmartSizing = 
state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+    this.enlargeFactor = 
state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY, 
DEFAULT_ENLARGE_FACTOR);
+    this.smartArrayEnlargeFactorMax = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX, 
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+    this.smartArrayEnlargeFactorMin = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN, 
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+    this.smartArrayEnlargeDecayFactor = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR, 
DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+    if (enabledSmartSizing) {
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+          String.format("Smart array enlarge factor needs to be larger than 
1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,
+          String.format("Smart array enlarge factor needs to be larger than 
1.0, provided value %s", this.smartArrayEnlargeFactorMin));
+      Preconditions.checkArgument(this.smartArrayEnlargeDecayFactor > 0 && 
this.smartArrayEnlargeDecayFactor < 1,
+          String.format("Smart array enlarge decay factor needs to be between 
0 and 1, provided value %s", this.smartArrayEnlargeDecayFactor));
+      log.info("Enabled smart resizing for rowBatch - 
smartArrayEnlargeFactorMax: {}, smartArrayEnlargeFactorMin: {}, 
smartArrayEnlargeDecayFactor: {}",
+          smartArrayEnlargeFactorMax, smartArrayEnlargeFactorMin, 
smartArrayEnlargeDecayFactor);
+    }
+    log.info("Static enlargeFactor for rowBatch: {}", enlargeFactor);
   }
 
   /**
@@ -48,35 +86,51 @@ public class OrcConverterMemoryManager {
    * This takes into account the default null values of different ORC 
ColumnVectors and approximates their sizes
    * Although its a rough calculation, it can accurately depict the weight of 
resizes in a column for very large arrays and maps
    * Which tend to dominate the size of the buffer overall
-   * TODO: Clean this method up considerably and refactor resize logic here
    * @param col
    * @return
    */
   public long calculateSizeOfColHelper(ColumnVector col) {
     long converterBufferColSize = 0;
-    if (col instanceof ListColumnVector) {
-      ListColumnVector listColumnVector = (ListColumnVector) col;
-      converterBufferColSize += 
calculateSizeOfColHelper(listColumnVector.child);
-    } else if (col instanceof MapColumnVector) {
-      MapColumnVector mapColumnVector = (MapColumnVector) col;
-      converterBufferColSize += calculateSizeOfColHelper(mapColumnVector.keys);
-      converterBufferColSize += 
calculateSizeOfColHelper(mapColumnVector.values);
-    } else if (col instanceof StructColumnVector) {
-      StructColumnVector structColumnVector = (StructColumnVector) col;
-      for (int j = 0; j < structColumnVector.fields.length; j++) {
-        converterBufferColSize += 
calculateSizeOfColHelper(structColumnVector.fields[j]);
-      }
-    } else if (col instanceof UnionColumnVector) {
-      UnionColumnVector unionColumnVector = (UnionColumnVector) col;
-      for (int j = 0; j < unionColumnVector.fields.length; j++) {
-        converterBufferColSize += 
calculateSizeOfColHelper(unionColumnVector.fields[j]);
-      }
-    } else if (col instanceof LongColumnVector || col instanceof 
DoubleColumnVector || col instanceof DecimalColumnVector) {
-      // Memory space in bytes of native type
-      converterBufferColSize += col.isNull.length * 8;
-    } else if (col instanceof BytesColumnVector) {
-      // Contains two integer list references of size vector for tracking so 
will use that as null size
-      converterBufferColSize += ((BytesColumnVector) col).vector.length * 8;
+    switch (col.type) {
+      case LIST:
+        ListColumnVector listColumnVector = (ListColumnVector) col;
+        converterBufferColSize += 
calculateSizeOfColHelper(listColumnVector.child);
+        break;
+      case MAP:
+        MapColumnVector mapColumnVector = (MapColumnVector) col;
+        converterBufferColSize += 
calculateSizeOfColHelper(mapColumnVector.keys);
+        converterBufferColSize += 
calculateSizeOfColHelper(mapColumnVector.values);
+        break;
+      case STRUCT:
+        StructColumnVector structColumnVector = (StructColumnVector) col;
+        for (int j = 0; j < structColumnVector.fields.length; j++) {
+          converterBufferColSize += 
calculateSizeOfColHelper(structColumnVector.fields[j]);
+        }
+        break;
+      case UNION:
+        UnionColumnVector unionColumnVector = (UnionColumnVector) col;
+        for (int j = 0; j < unionColumnVector.fields.length; j++) {
+          converterBufferColSize += 
calculateSizeOfColHelper(unionColumnVector.fields[j]);
+        }
+        break;
+      case BYTES:
+        BytesColumnVector bytesColumnVector = (BytesColumnVector) col;
+        converterBufferColSize += bytesColumnVector.vector.length * 8;
+        break;
+      case DECIMAL:
+        DecimalColumnVector decimalColumnVector = (DecimalColumnVector) col;
+        converterBufferColSize += decimalColumnVector.precision + 2;
+        break;
+      case DOUBLE:
+        DoubleColumnVector doubleColumnVector = (DoubleColumnVector) col;
+        converterBufferColSize += doubleColumnVector.vector.length * 8;
+        break;
+      case LONG:
+        LongColumnVector longColumnVector = (LongColumnVector) col;
+        converterBufferColSize += longColumnVector.vector.length * 8;
+        break;
+      default:
+        // Should never reach here given the types used in 
GenericRecordToOrcValueWriter
     }
     // Calculate overhead of the column's own null reference
     converterBufferColSize += col.isNull.length;
@@ -97,4 +151,19 @@ public class OrcConverterMemoryManager {
     return converterBufferTotalSize;
   }
 
+  /**
+   * Resize the child-array size based on configuration.
+   * If smart resizing is enabled, it will use an exponential decay algorithm 
where it would resize the array by a smaller amount
+   * the more records the converter has processed, as the fluctuation in 
record size becomes less likely to differ significantly by then
+   * Since the writer is closed and reset periodically, this is generally a 
safe assumption that should prevent large empty array buffers
+   */
+  public int resize(int rowsAdded, int requestedSize) {
+    resizeCount += 1;
+    log.info(String.format("It has been resized %s times in current writer", 
resizeCount));
+    if (enabledSmartSizing) {
+      double decayingEnlargeFactor =  this.smartArrayEnlargeFactorMax * 
Math.pow((1-this.smartArrayEnlargeDecayFactor), rowsAdded-1);
+      return (int) Math.round(requestedSize * Math.max(decayingEnlargeFactor, 
this.smartArrayEnlargeFactorMin));
+    }
+    return enlargeFactor * requestedSize;
+  }
 }
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index c22c62827..cfef8f7d0 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -47,6 +47,7 @@ import com.google.common.io.Files;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 
 import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
@@ -61,8 +62,9 @@ public class GenericRecordToOrcValueWriterTest {
         new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
 
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"union_test/data.json");
@@ -122,8 +124,9 @@ public class GenericRecordToOrcValueWriterTest {
         new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("decimal_test/schema.avsc"));
 
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"decimal_test/data.json");
@@ -158,10 +161,11 @@ public class GenericRecordToOrcValueWriterTest {
         new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
 
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
     // Make the batch size very small so that the enlarge behavior would 
easily be triggered.
     // But this has to more than the number of records that we deserialized 
form data.json, as here we don't reset batch.
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"list_map_test/data.json");
@@ -170,7 +174,7 @@ public class GenericRecordToOrcValueWriterTest {
       valueWriter.write(record, rowBatch);
     }
     // Examining resize count, which should happen only once for map and list, 
so totally 2.
-    Assert.assertEquals(valueWriter.resizeCount, 2);
+    Assert.assertEquals(valueWriter.getResizeCount(), 2);
   }
 
   @Test
@@ -180,10 +184,11 @@ public class GenericRecordToOrcValueWriterTest {
         new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
 
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
     // Make the batch size very small so that the enlarge behavior would 
easily be triggered.
     // But this has to more than the number of records that we deserialized 
form data.json, as here we don't reset batch.
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"list_map_test/data.json");
diff --git 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
index c04778187..9775f0765 100644
--- 
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
+++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 
 
@@ -39,8 +40,8 @@ public class OrcConverterMemoryManagerTest {
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
     // Make batch size small so that the enlarge behavior would easily be 
triggered.
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
-    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"list_map_test/data.json");
@@ -62,8 +63,8 @@ public class OrcConverterMemoryManagerTest {
     TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
     // Make batch such that only deeply nested list is resized
     VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
-    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch);
-    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema);
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, new State());
+    GenericRecordToOrcValueWriter valueWriter = new 
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
 
     List<GenericRecord> recordList = GobblinOrcWriterTest
         .deserializeAvroRecords(this.getClass(), schema, 
"converter_memory_manager_nested_test/data.json");
@@ -77,4 +78,55 @@ public class OrcConverterMemoryManagerTest {
     int expectedSize = 30*3*9 + 30*9 + 15*4; // Deeply nested list + maps + 
other structure overhead
     Assert.assertEquals(memoryManager.getConverterBufferTotalSize(), 
expectedSize);
   }
+
+  @Test
+  public void testBufferSmartResize() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc"));
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    // Make batch such that only deeply nested list is resized
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
+    State memoryManagerState = new State();
+    
memoryManagerState.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
"true");
+    
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
 "0.5");
+    
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
 "10");
+    
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
 "1");
+    OrcConverterMemoryManager memoryManager = new 
OrcConverterMemoryManager(rowBatch, memoryManagerState);
+
+    int result = memoryManager.resize(1, 1000);
+    Assert.assertEquals(result, 10000);
+
+    // Result is equal to requested size since the decay factor dominates the 
resize
+    result = memoryManager.resize(100, 1000);
+    Assert.assertEquals(result, 1000);
+  }
+
+  @Test
+  public void testBufferSmartResizeParameters() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc"));
+    TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+    // Make batch such that only deeply nested list is resized
+    VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
+    State memoryManagerState0 = new State();
+    
memoryManagerState0.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
"true");
+    
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
 "0.5");
+    
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
 "0");
+    
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
 "1");
+    Assert.assertThrows(IllegalArgumentException.class, () -> new 
OrcConverterMemoryManager(rowBatch, memoryManagerState0));
+
+    State memoryManagerState1 = new State();
+    
memoryManagerState1.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
"true");
+    
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
 "0.5");
+    
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
 "1");
+    
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
 "0");
+    Assert.assertThrows(IllegalArgumentException.class, () -> new 
OrcConverterMemoryManager(rowBatch, memoryManagerState1));
+
+    State memoryManagerState2 = new State();
+    
memoryManagerState2.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
"true");
+    
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
 "1.5");
+    
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
 "1");
+    
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
 "1");
+    Assert.assertThrows(IllegalArgumentException.class, () -> new 
OrcConverterMemoryManager(rowBatch, memoryManagerState2));
+  }
 }

Reply via email to