ZihanLi58 commented on code in PR #3787:
URL: https://github.com/apache/gobblin/pull/3787#discussion_r1334726623


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -28,55 +28,107 @@
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
 
 /**
  * A helper class to calculate the size of array buffers in a {@link 
VectorizedRowBatch}.
  * This estimate is mainly based on the maximum size of each variable length 
column, which can be resized
  * Since the resizing algorithm for each column can balloon, this can affect 
likelihood of OOM
  */
+@Slf4j
 public class OrcConverterMemoryManager {
 
+  private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+  private static final int DEFAULT_ENLARGE_FACTOR = 3;
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+  // Needs to be greater than 1.0
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+  // Given the defaults it will take around 500 records to reach the min 
enlarge factor - given that the default
+  // batch size is 1000 this is a fairly conservative default
+  private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
   private VectorizedRowBatch rowBatch;
+  private int resizeCount = 0;
+  private double smartArrayEnlargeFactorMax;
+  private double smartArrayEnlargeFactorMin;
+  private double smartArrayEnlargeDecayFactor;
+  private boolean enabledSmartSizing = false;
+  int enlargeFactor = 0;
 
-  // TODO: Consider moving the resize algorithm from the converter to this 
class
-  OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+  OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
     this.rowBatch = rowBatch;
+    this.enabledSmartSizing = 
state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE, 
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+    this.enlargeFactor = 
state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY, 
DEFAULT_ENLARGE_FACTOR);
+    this.smartArrayEnlargeFactorMax = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX, 
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+    this.smartArrayEnlargeFactorMin = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN, 
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+    this.smartArrayEnlargeDecayFactor = 
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR, 
DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+    if (enabledSmartSizing) {
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+          String.format("Smart array enlarge factor needs to be larger than 
1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,

Review Comment:
   If it's equal to one, will that ends up with resize does not work as 
expected and for a specific record, we always are not able to hold it and keep 
retry resizing? 



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -97,4 +149,19 @@ public long getConverterBufferTotalSize() {
     return converterBufferTotalSize;
   }
 
+  /**
+   * Resize the child-array size based on configuration.
+   * If smart resizing is enabled, it will use an exponential decay algorithm 
where it would resize the array by a smaller amount
+   * the more records the converter has processed, as the fluctuation in 
record size becomes less likely to differ significantly by then
+   * Since the writer is closed and reset periodically, this is generally a 
safe assumption that should prevent large empty array buffers
+   */
+  public int resize(int rowsAdded, int requestedSize) {
+    resizeCount += 1;
+    log.info(String.format("It has been resized %s times in current writer", 
resizeCount));
+    if (enabledSmartSizing) {
+      double decayingEnlargeFactor =  this.smartArrayEnlargeFactorMax * 
Math.pow((1-this.smartArrayEnlargeDecayFactor), rowsAdded-1);

Review Comment:
   why do we use rowsAdded instead of resizeCount here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to