Will-Lo commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1309449383


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
       throws IOException {
     closeInternal();
     super.commit();
+    if (this.selfTuningWriter) {
+      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(estimatedRecordSizeBytes));
+      properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+    }
   }
 
   /**
+   * Modifies the size of the writer buffer based on the average size of the records written so far, the amount of available memory during initialization, and the number of concurrent writers.
+   * The new batch size is calculated as follows:
+   * 1. Memory available = (available memory during startup)/(concurrent writers) - (memory used by ORCFile writer)
+   * 2. Average file size, estimated during Avro -> ORC conversion
+   * 3. Estimate of memory used by the converter lists, as during resize the internal buffer size can grow large
+   * 4. New batch size = (Memory available - Estimated memory used by converter lists) / Average file size * memory buffer
+   * Generally in this writer, the memory the converter uses for large arrays is the leading cause of OOM in streaming, along with the records stored in the rowBatch
+   * Another potential approach is to also check the memory available before resizing the converter lists, and to flush the batch whenever a resize is needed.
+   */
+  void tuneBatchSize(long averageSizePerRecord, int orcFileWriterRowsBetweenCheck) throws IOException {
+    this.estimatedBytesAllocatedConverterMemory = Math.max(this.estimatedBytesAllocatedConverterMemory, this.converterMemoryManager.getConverterBufferTotalSize());
+    int currentConcurrentWriters = this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER, CONCURRENT_WRITERS_DEFAULT);
+    // In the native ORC writer implementation, it will flush the writer if the internal memory exceeds the size of a stripe after rows between check
+    // So worst case the most memory the writer can hold is the size of a stripe plus size of records * number of records between checks
+    // Note that this is an overestimate as the native ORC file writer should have some compression ratio
+    long maxMemoryInFileWriter = this.estimatedRecordSizeBytes * orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+    int newBatchSize = (int) Math.round(((this.availableMemory/currentConcurrentWriters - maxMemoryInFileWriter
+        - this.estimatedBytesAllocatedConverterMemory) * this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+    // Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components
+    newBatchSize = Math.min(Math.max(1, newBatchSize), DEFAULT_ORC_WRITER_BATCH_SIZE);
+    // TODO: Consider using a more sophisticated check to determine if the batch size should be changed
+    if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+      log.info("Tuning ORC writer batch size from {} to {} based on average byte size per record: {} with available memory {} and {} bytes of allocated memory in converter buffers, with {} partitioned writers",
+          batchSize, newBatchSize, averageSizePerRecord, availableMemory, estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+      this.batchSize = newBatchSize;
+      // We only initialize the native ORC file writer once to avoid creating too many small files, as reconfiguring rows between memory check
+      // requires one to close the writer and start a new file
+      if (this.orcFileWriter == null) {
+        initializeOrcFileWriter();
+      }
+      this.flush();
+      this.rowBatch.ensureSize(this.batchSize);
+    }
+    if (this.orcFileWriter == null) {

Review Comment:
   Because I don't lazily initialize the writer during flush, I need to
initialize it during the first tune. Essentially, the writer has to be
initialized before the first flush, so this check may make more sense inside
the flush function.
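
A rough sketch of what moving the initialization into flush could look like. This is only an illustration under assumptions: `LazyInitWriterSketch`, `initializeFileWriter`, and the `events` list are hypothetical stand-ins and not the actual GobblinBaseOrcWriter code. Only the null check and the 20% threshold mirror the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the writer; names are illustrative only.
class LazyInitWriterSketch {
  // Records the order of operations so the behaviour can be inspected.
  final List<String> events = new ArrayList<>();
  private Object fileWriter;      // stands in for orcFileWriter
  private int batchSize = 1000;   // arbitrary starting value for the sketch

  private void initializeFileWriter() {
    fileWriter = new Object();
    events.add("init");
  }

  void flush() {
    // Lazy init: the first flush creates the file writer, later flushes reuse it.
    if (fileWriter == null) {
      initializeFileWriter();
    }
    events.add("flush");
  }

  void tuneBatchSize(int newBatchSize) {
    // Same 20% change threshold as in the diff; no null check is needed here
    // because flush() takes care of initialization.
    if (Math.abs(newBatchSize - batchSize) > 0.2 * batchSize) {
      batchSize = newBatchSize;
      flush();
    }
  }

  int getBatchSize() {
    return batchSize;
  }
}
```

With this shape the writer is created exactly once, whichever of the two methods triggers the first flush, and `tuneBatchSize` no longer needs its own `orcFileWriter == null` branches.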

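For reference, the batch-size arithmetic described in the Javadoc of the diff above can be written as a standalone function. This is a minimal sketch, assuming all inputs are passed in explicitly; the class name, parameter names, and the numbers in the usage note are made up for illustration and are not Gobblin defaults.

```java
// Hypothetical helper that isolates the arithmetic from tuneBatchSize in the diff.
final class BatchSizeSketch {
  private BatchSizeSketch() {}

  static int tunedBatchSize(long availableMemory, int concurrentWriters,
      long maxMemoryInFileWriter, long converterMemory,
      double memoryUsageFactor, long averageSizePerRecord, int maxBatchSize) {
    // Per-writer share of memory, minus the worst-case file writer footprint
    // and the converter buffers, scaled by the usage factor.
    double budget = (availableMemory / concurrentWriters
        - maxMemoryInFileWriter - converterMemory) * memoryUsageFactor;
    int candidate = (int) Math.round(budget / averageSizePerRecord);
    // Clamp to [1, maxBatchSize]; the budget can go negative when the other
    // components are overestimated.
    return Math.min(Math.max(1, candidate), maxBatchSize);
  }
}
```

For example, with 1,000,000 bytes available, 2 writers, 100,000 bytes reserved for the file writer, 50,000 bytes of converter buffers, a usage factor of 0.5, and 1,000-byte records, the budget is (500,000 - 100,000 - 50,000) * 0.5 = 175,000 bytes, which gives a batch size of 175.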


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
