This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b890759a [GOBBLIN-1957] GobblinOrcwriter improvements for large records (#3828)
8b890759a is described below

commit 8b890759ac1758b5d18777d61a2e5df528f614ab
Author: William Lo <[email protected]>
AuthorDate: Tue Nov 21 11:17:57 2023 -0500

    [GOBBLIN-1957] GobblinOrcwriter improvements for large records (#3828)
    
    * WIP
    
    * Optimization to limit batchsize based on large record sizes
    
    * Address review
---
 .../gobblin/writer/GobblinBaseOrcWriter.java       | 15 +++++-
 .../gobblin/writer/GobblinOrcWriterConfigs.java    | 16 +++++--
 .../gobblin/writer/GobblinOrcWriterTest.java       | 54 +++++++++++++++++++---
 3 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 2ed31b93b..73a53ada9 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -91,6 +91,8 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   private int orcFileWriterRowsBetweenCheck;
   private long orcStripeSize;
   private int maxOrcBatchSize;
+  private int batchSizeRowCheckFactor;
+  private boolean enableLimitBufferSizeOrcStripe;
 
   private int concurrentWriterTasks;
   private long orcWriterStripeSizeBytes;
@@ -109,6 +111,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
     this.typeDescription = getOrcSchema();
     this.selfTuningWriter = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
 false);
     this.validateORCAfterClose = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE,
 false);
+    this.batchSizeRowCheckFactor = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR,
 GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR);
     this.maxOrcBatchSize = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
         GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
     this.batchSize = this.selfTuningWriter ?
@@ -133,6 +136,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
         GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
     this.orcFileWriterMaxRowsBetweenCheck = 
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
         GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
+    this.enableLimitBufferSizeOrcStripe = 
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE,
 false);
     // Create file-writer
     this.writerConfig = new Configuration();
     // Populate job Configurations into Conf as well so that configurations 
related to ORC writer can be tuned easily.
@@ -312,11 +316,18 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
       this.currentOrcWriterMaxUnderlyingMemory = 
Math.max(this.currentOrcWriterMaxUnderlyingMemory, 
orcFileWriter.estimateMemory());
     }
     long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory, 
prevOrcWriterMaxUnderlyingMemory);
-
     int newBatchSize = (int) ((this.availableMemory*1.0 / 
currentPartitionedWriters * this.rowBatchMemoryUsageFactor - 
maxMemoryInFileWriter
         - this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
+
+    if (this.enableLimitBufferSizeOrcStripe) {
+      // For large records, prevent the batch size from greatly exceeding the 
size of a stripe as the native ORC Writer will flush its buffer after a stripe 
is filled
+      int maxRecordsPerStripeSize = (int) 
Math.round(this.orcWriterStripeSizeBytes * 1.0 / averageSizePerRecord);
+      this.orcFileWriterMaxRowsBetweenCheck = 
Math.min(this.orcFileWriterMaxRowsBetweenCheck, maxRecordsPerStripeSize);
+      this.maxOrcBatchSize = Math.min(this.maxOrcBatchSize, 
maxRecordsPerStripeSize);
+    }
     // Handle scenarios where new batch size can be 0 or less due to 
overestimating memory used by other components
     newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);
+
     if (Math.abs(newBatchSize - this.batchSize) > 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY * 
this.batchSize) {
       // Add a factor when tuning up the batch size to prevent large sudden 
increases in memory usage
       if (newBatchSize > this.batchSize) {
@@ -337,7 +348,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   void initializeOrcFileWriter() {
     try {
       this.orcFileWriterRowsBetweenCheck = Math.max(
-          Math.min(this.batchSize * 
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, 
this.orcFileWriterMaxRowsBetweenCheck),
+          Math.min(this.batchSize * this.batchSizeRowCheckFactor, 
this.orcFileWriterMaxRowsBetweenCheck),
           this.orcFileWriterMinRowsBetweenCheck
       );
       this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), 
String.valueOf(this.orcFileWriterRowsBetweenCheck));
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index 89621d38c..fcd70a78b 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -39,6 +39,11 @@ public class GobblinOrcWriterConfigs {
    * Max buffer size of the Gobblin ORC Writer that it can be tuned to
    */
   public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE = 
ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
+  /**
+   * The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer 
size
+   */
+  public static final String ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 
"auto.selfTune.rowCheck.factor";
+
   /**
    * How often should the Gobblin ORC Writer check for tuning
    */
@@ -60,6 +65,12 @@ public class GobblinOrcWriterConfigs {
    */
   public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX + 
"max.rows.between.memory.checks";
 
+  /**
+   * Enable a maximum buffer size of both the native ORC writer and the 
Gobblin ORC writer by the size of a stripe divided by the estimated
+   * size of each record. This is to capture the case when records are 
extremely large and cause large buffer sizes to dominate the memory usage
+   */
+  public static final String ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE = 
ORC_WRITER_PREFIX + "auto.selfTune.max.buffer.orc.stripe";
+
   public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX + 
"instrumented";
 
   public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
@@ -70,10 +81,9 @@ public class GobblinOrcWriterConfigs {
    */
   public static final int DEFAULT_CONCURRENT_WRITERS = 3;
   public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR 
= 0.3;
-  /**
-   * The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer 
size
-   */
+
   public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
+
   public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE = 
DEFAULT_ORC_WRITER_BATCH_SIZE;
   public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
   /**
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index 63db334a7..ea24ef350 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -268,21 +268,23 @@ public class GobblinOrcWriterTest {
     // Force a larger initial batchSize that can be tuned down
     orcWriter.batchSize = 10;
     orcWriter.rowBatch.ensureSize(10);
+    // Given that the available memory is very high, the resulting batchsize 
should be maxed out
     orcWriter.availableMemory = 100000000;
-    // Given the amount of available memory and a low stripe size, and 
estimated rowBatchSize, the resulting batchsize should be maxed out
+    // Consider that the batch size incrementally increases based on the 
difference between target and current batchsize (10)
     orcWriter.tuneBatchSize(10);
-    System.out.println(orcWriter.batchSize);
-    // Take into account that increases in batchsize are multiplied by a 
factor to prevent large jumps in batchsize
-    Assert.assertTrue(orcWriter.batchSize == 
(GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
+    Assert.assertEquals(orcWriter.batchSize, 505);
+    orcWriter.tuneBatchSize(10);
+    Assert.assertEquals(orcWriter.batchSize, 752);
+
     orcWriter.availableMemory = 100;
     orcWriter.tuneBatchSize(10);
     // Given that the amount of available memory is low, the resulting 
batchsize should be 1
-    Assert.assertTrue(orcWriter.batchSize == 1);
+    Assert.assertEquals(orcWriter.batchSize,1);
     orcWriter.availableMemory = 10000;
     orcWriter.rowBatch.ensureSize(10000);
     // Since the rowBatch is large, the resulting batchsize should still be 1 
even with more memory
     orcWriter.tuneBatchSize(10);
-    Assert.assertTrue(orcWriter.batchSize == 1);
+    Assert.assertEquals(orcWriter.batchSize, 1);
   }
 
   @Test
@@ -322,4 +324,42 @@ public class GobblinOrcWriterTest {
     
Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
     Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testSelfTuneRowBatchCalculationWithStripeMax() throws Exception {
+    Schema schema =
+        new 
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), 
schema, "orc_writer_test/data_multi.json");
+
+    // Mock WriterBuilder, bunch of mocking behaviors to work-around 
precondition checks in writer builder
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) 
Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+
+    State dummyState = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH,  "selfTune");
+    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, 
"true");
+    dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
+    
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE,
 "true");
+    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+
+    // Having a closer to manage the life-cycle of the writer object.
+    Closer closer = Closer.create();
+    GobblinOrcWriter orcWriter = closer.register(new 
GobblinOrcWriter(mockBuilder, dummyState));
+    // Force a larger initial batchSize that can be tuned down
+    orcWriter.batchSize = 10;
+    orcWriter.rowBatch.ensureSize(10);
+    orcWriter.availableMemory = 100000000;
+    // Since the stripe size is 100, the resulting batchsize should be 10 
(100/10)
+    orcWriter.tuneBatchSize(10);
+    Assert.assertEquals(orcWriter.batchSize,10);
+
+    // Increasing the estimated record size should decrease the max batch size
+    orcWriter.tuneBatchSize(100);
+    Assert.assertEquals(orcWriter.batchSize,1);
+  }
+}

Reply via email to