[ 
https://issues.apache.org/jira/browse/GOBBLIN-1891?focusedWorklogId=878944&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-878944
 ]

ASF GitHub Bot logged work on GOBBLIN-1891:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Aug/23 23:32
            Start Date: 29/Aug/23 23:32
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1309449383


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
       throws IOException {
     closeInternal();
     super.commit();
+    if (this.selfTuningWriter) {
+      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(estimatedRecordSizeBytes));
+      properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+    }
   }
 
   /**
+   * Modifies the size of the writer buffer based on the average size of the records written so far, the amount of available memory during initialization, and the number of concurrent writers.
+   * The new batch size is calculated as follows:
+   * 1. Memory available = (available memory during startup)/(concurrent writers) - (memory used by ORCFile writer)
+   * 2. Average file size, estimated during Avro -> ORC conversion
+   * 3. Estimate of memory used by the converter lists, as during resize the internal buffer size can grow large
+   * 4. New batch size = (Memory available - Estimated memory used by converter lists) / Average file size * memory buffer
+   * Generally in this writer, the memory the converter uses for large arrays is the leading cause of OOM in streaming, along with the records stored in the rowBatch
+   * Another potential approach is to also check the memory available before resizing the converter lists, and to flush the batch whenever a resize is needed.
+   */
+  void tuneBatchSize(long averageSizePerRecord, int orcFileWriterRowsBetweenCheck) throws IOException {
+    this.estimatedBytesAllocatedConverterMemory = Math.max(this.estimatedBytesAllocatedConverterMemory, this.converterMemoryManager.getConverterBufferTotalSize());
+    int currentConcurrentWriters = this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER, CONCURRENT_WRITERS_DEFAULT);
+    // In the native ORC writer implementation, it will flush the writer if the internal memory exceeds the size of a stripe after rows between check
+    // So worst case the most memory the writer can hold is the size of a stripe plus size of records * number of records between checks
+    // Note that this is an overestimate as the native ORC file writer should have some compression ratio
+    long maxMemoryInFileWriter = this.estimatedRecordSizeBytes * orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+    int newBatchSize = (int) Math.round(((this.availableMemory/currentConcurrentWriters - maxMemoryInFileWriter
+        - this.estimatedBytesAllocatedConverterMemory) * this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+    // Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components
+    newBatchSize = Math.min(Math.max(1, newBatchSize), DEFAULT_ORC_WRITER_BATCH_SIZE);
+    // TODO: Consider using a more sophisticated check to determine if the batch size should be changed
+    if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+      log.info("Tuning ORC writer batch size from {} to {} based on average byte size per record: {} with available memory {} and {} bytes of allocated memory in converter buffers, with {} partitioned writers",
+          batchSize, newBatchSize, averageSizePerRecord, availableMemory, estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+      this.batchSize = newBatchSize;
+      // We only initialize the native ORC file writer once to avoid creating too many small files, as reconfiguring rows between memory check
+      // requires one to close the writer and start a new file
+      if (this.orcFileWriter == null) {
+        initializeOrcFileWriter();
+      }
+      this.flush();
+      this.rowBatch.ensureSize(this.batchSize);
+    }
+    if (this.orcFileWriter == null) {

Review Comment:
   The issue is that I don't lazily initialize the writer during flush, so I 
have to initialize it during the first tune. Essentially, the writer needs to 
be initialized before the first flush, so maybe this check makes more sense in 
the flush function.
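
A minimal sketch of the lazy-initialization idea raised in the comment above: 
the file writer is created on the first flush, so tuning never needs its own 
null check. `LazyInitWriterSketch`, `StubFileWriter`, `initCount`, and 
`rowsWritten` are hypothetical names used only for illustration; they are not 
part of Gobblin or the ORC library, and the real writer does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch; not the GobblinBaseOrcWriter code. */
final class LazyInitWriterSketch {
  /** Stand-in for the native ORC file writer (assumption for illustration). */
  static final class StubFileWriter {
    final List<String> rows = new ArrayList<>();
  }

  private StubFileWriter fileWriter;               // created lazily on first flush
  private final List<String> batch = new ArrayList<>();
  int initCount = 0;                               // counts writer initializations

  /** Buffers a record in the in-memory batch. */
  void write(String record) {
    batch.add(record);
  }

  /** Initializes the file writer if needed, then drains the batch into it. */
  void flush() {
    if (fileWriter == null) {
      fileWriter = new StubFileWriter();
      initCount++;
    }
    fileWriter.rows.addAll(batch);
    batch.clear();
  }

  /** Tuning only flushes; initialization is handled inside flush(). */
  void tuneBatchSize() {
    flush();
    // A real writer would resize its row batch here.
  }

  /** Number of rows handed to the file writer so far. */
  int rowsWritten() {
    return fileWriter == null ? 0 : fileWriter.rows.size();
  }
}
```

With this shape, whichever of `tuneBatchSize()` or `flush()` runs first 
creates the writer, and it is created only once, which matches the stated goal 
of avoiding many small files.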





Issue Time Tracking
-------------------

    Worklog Id:     (was: 878944)
    Time Spent: 20m  (was: 10m)

> Create self-tuning ORC Writer
> -----------------------------
>
>                 Key: GOBBLIN-1891
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1891
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In Gobblin streaming, the Avro to ORC converter and writer constantly face 
> OOM issues when the record sizes are large due to large arrays or maps.
> Since streaming pipelines are run indefinitely, static configurations are 
> usually insufficient to handle varying sizes of data, the converter buffers, 
> increases in partitions, etc. This causes pipelines to often stall and make 
> no progress if the incoming data size is increased beyond the memory limits 
> of the container.
> We want to implement a bufferedORCWriter, which utilizes many of the same 
> components as the current ORC Writer, except that the batchSize adapts to 
> larger record sizes and takes into account the memory available to the JVM 
> (to avoid OOM issues), the memory the converter uses, and the number of 
> partitioned writers. This should be enabled only through configuration, and 
> have knobs available so that one can adjust the sensitivity and the 
> performance of this writer.
> Future improvements include reducing the unused memory the converter 
> allocates on each resize, and more accurate estimates of memory usage in 
> the ORC writer.
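
The batch-size adaptation described above can be sketched as plain 
arithmetic. This is a minimal illustration under stated assumptions, not the 
code from the PR: `BatchSizeSketch`, `computeBatchSize`, `shouldResize`, and 
the `MAX_BATCH_SIZE` value of 1000 are hypothetical, and the inputs are passed 
in explicitly instead of being read from writer state. Unlike the diff, the 
sketch clamps the value while it is still a `long` before narrowing to `int`.

```java
/** Hypothetical stand-alone sketch; not the GobblinBaseOrcWriter code. */
final class BatchSizeSketch {
  // Assumed upper bound, standing in for DEFAULT_ORC_WRITER_BATCH_SIZE.
  static final int MAX_BATCH_SIZE = 1000;

  /**
   * Derives a batch size from the per-writer memory budget.
   * Assumes concurrentWriters > 0 and averageSizePerRecord > 0.
   */
  static int computeBatchSize(long availableMemory, int concurrentWriters,
      long maxMemoryInFileWriter, long converterBufferBytes,
      double memoryUsageFactor, long averageSizePerRecord) {
    // Memory left for the row batch after the file writer and converter buffers.
    long budget = availableMemory / concurrentWriters
        - maxMemoryInFileWriter - converterBufferBytes;
    long raw = Math.round(budget * memoryUsageFactor / averageSizePerRecord);
    // Clamp to [1, MAX_BATCH_SIZE]; the budget can be negative when the
    // other components are overestimated.
    return (int) Math.min(Math.max(1L, raw), (long) MAX_BATCH_SIZE);
  }

  /** Resizes only when the change exceeds 20% of the current batch size. */
  static boolean shouldResize(int currentBatchSize, int newBatchSize) {
    return Math.abs(newBatchSize - currentBatchSize) > 0.2 * currentBatchSize;
  }
}
```

For example, with 1,000,000,000 bytes available, 4 writers, 100,000,000 bytes 
reserved for the file writer, 50,000,000 bytes in converter buffers, a usage 
factor of 0.5, and 100,000-byte records, `computeBatchSize` yields 500.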



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
