[
https://issues.apache.org/jira/browse/GOBBLIN-1891?focusedWorklogId=878945&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-878945
]
ASF GitHub Bot logged work on GOBBLIN-1891:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 29/Aug/23 23:40
Start Date: 29/Aug/23 23:40
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1309453008
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
       throws IOException {
     closeInternal();
     super.commit();
+    if (this.selfTuningWriter) {
+      properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(estimatedRecordSizeBytes));
+      properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+      properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+    }
   }
   /**
+   * Modifies the size of the writer buffer based on the average size of the records written so far, the amount of available memory during initialization, and the number of concurrent writers.
+   * The new batch size is calculated as follows:
+   * 1. Memory available = (available memory during startup) / (concurrent writers) - (memory used by the ORC file writer)
+   * 2. Average record size, estimated during the Avro -> ORC conversion
+   * 3. Estimate of memory used by the converter lists, since their internal buffers can grow large during a resize
+   * 4. New batch size = (Memory available - Estimated memory used by converter lists) / Average record size * memory usage factor
+   * Generally in this writer, the memory the converter uses for large arrays is the leading cause of OOM in streaming, along with the records stored in the rowBatch.
+   * Another potential approach is to also check the memory available before resizing the converter lists, and to flush the batch whenever a resize is needed.
+   */
+  void tuneBatchSize(long averageSizePerRecord, int orcFileWriterRowsBetweenCheck) throws IOException {
+    this.estimatedBytesAllocatedConverterMemory = Math.max(this.estimatedBytesAllocatedConverterMemory, this.converterMemoryManager.getConverterBufferTotalSize());
+    int currentConcurrentWriters = this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER, CONCURRENT_WRITERS_DEFAULT);
+    // The native ORC writer implementation flushes itself if its internal memory exceeds the stripe size after the rows-between-checks interval.
+    // So in the worst case, the most memory the file writer can hold is one stripe plus (record size * number of records between checks).
+    // Note that this is an overestimate, as the native ORC file writer should achieve some compression ratio.
+    long maxMemoryInFileWriter = this.estimatedRecordSizeBytes * orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+    int newBatchSize = (int) Math.round(((this.availableMemory / currentConcurrentWriters - maxMemoryInFileWriter
+        - this.estimatedBytesAllocatedConverterMemory) * this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+    // Handle scenarios where the new batch size can be 0 or less due to overestimating the memory used by other components
+    newBatchSize = Math.min(Math.max(1, newBatchSize), DEFAULT_ORC_WRITER_BATCH_SIZE);
+    // TODO: Consider using a more sophisticated check to determine if the batch size should be changed
+    if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+      log.info("Tuning ORC writer batch size from {} to {} based on average byte size per record: {} with available memory {} and {} bytes of allocated memory in converter buffers, with {} partitioned writers",
+          batchSize, newBatchSize, averageSizePerRecord, availableMemory, estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+      this.batchSize = newBatchSize;
+      // We only initialize the native ORC file writer once to avoid creating too many small files, as reconfiguring the rows-between-memory-checks interval requires closing the writer and starting a new file
+      if (this.orcFileWriter == null) {
+        initializeOrcFileWriter();
+      }
+      this.flush();
Review Comment:
If the rowBatch is empty, then flushing is skipped; this is mostly because I reuse tuneBatchSize both during the first initialization and before each tune.
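To make the sizing arithmetic in the tuneBatchSize Javadoc above concrete, here is a minimal, standalone sketch of the same calculation. The class name, constants, and example values below are illustrative assumptions and are not taken from the PR:

// A standalone sketch of the batch-size estimate; ASSUMED_* constants are illustrative, not the PR's defaults.
public class BatchSizeEstimate {
  private static final long ASSUMED_STRIPE_SIZE_BYTES = 64L * 1024 * 1024; // assumed ORC stripe size
  private static final int ASSUMED_MAX_BATCH_SIZE = 1024;                  // assumed upper bound on batch size

  static int estimateBatchSize(long availableMemory, int concurrentWriters, long estimatedRecordSizeBytes,
      int rowsBetweenChecks, long converterBufferBytes, double memoryUsageFactor, long averageSizePerRecord) {
    // Worst-case memory held by the native ORC file writer: one stripe plus the records buffered between checks.
    long maxMemoryInFileWriter = estimatedRecordSizeBytes * rowsBetweenChecks + ASSUMED_STRIPE_SIZE_BYTES;
    // Memory left for the row batch after subtracting the file writer and converter buffers, scaled by a safety factor.
    long memoryForBatch = availableMemory / concurrentWriters - maxMemoryInFileWriter - converterBufferBytes;
    int newBatchSize = (int) Math.round(memoryForBatch * memoryUsageFactor / averageSizePerRecord);
    // Clamp to [1, max] so an overestimate of the other components never yields a zero or negative batch size.
    return Math.min(Math.max(1, newBatchSize), ASSUMED_MAX_BATCH_SIZE);
  }

  public static void main(String[] args) {
    // Example: 1 GiB of heap, 8 concurrent writers, 50 KiB records, 1000 rows between checks,
    // 10 MiB of converter buffers, and a 0.5 memory usage factor.
    int batch = estimateBatchSize(1L * 1024 * 1024 * 1024, 8, 51_200, 1_000, 10L * 1024 * 1024, 0.5, 51_200);
    System.out.println("Estimated batch size: " + batch);
  }
}

The clamp at the end mirrors the guard in the PR against a zero or negative batch size when the memory used by other components is overestimated.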
Issue Time Tracking
-------------------
Worklog Id: (was: 878945)
Time Spent: 0.5h (was: 20m)
> Create self-tuning ORC Writer
> -----------------------------
>
> Key: GOBBLIN-1891
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1891
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> In Gobblin streaming, the Avro to ORC converter and writer constantly face
> OOM issues when record sizes are large due to large arrays or maps.
> Since streaming pipelines run indefinitely, static configurations are
> usually insufficient to handle varying data sizes, converter buffer growth,
> increases in partitions, etc. Pipelines often stall and make no progress
> when the incoming data size grows beyond the memory limits of the
> container.
> We want to implement a bufferedORCWriter, which reuses many of the same
> components as the current ORC writer, except that the batchSize adapts to
> larger record sizes and takes into account the memory available to the JVM,
> the memory the converter uses, and the number of partitioned writers, in
> order to avoid OOM issues. This should be enabled only by a configuration,
> with knobs available so that one can tune the sensitivity and performance
> of this writer.
> Future improvements include making the converter retain less unused memory
> on each resize, and more accurate estimation of memory usage in the ORC
> writer.
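The self-tuning behavior also relies on the estimates the writer persists at commit time (see the commit() hunk in the review above), so the next writer instance does not start cold. A minimal sketch of that idea, using plain java.util.Properties rather than Gobblin's State API; the key strings, class, and method names here are hypothetical:

import java.util.Properties;

// Hypothetical key names for illustration only; the PR defines its own constants
// (ORC_WRITER_ESTIMATED_RECORD_SIZE, etc.) whose string values are not shown in this excerpt.
public class TuningStateExample {
  static final String KEY_ESTIMATED_RECORD_SIZE = "orc.writer.estimated.record.size";
  static final String KEY_CONVERTER_MEMORY = "orc.writer.estimated.converter.memory";

  // On commit, persist what was learned during this run.
  static void saveEstimates(Properties props, long estimatedRecordSizeBytes, long converterBufferBytes) {
    props.setProperty(KEY_ESTIMATED_RECORD_SIZE, Long.toString(estimatedRecordSizeBytes));
    props.setProperty(KEY_CONVERTER_MEMORY, Long.toString(converterBufferBytes));
  }

  // On the next startup, seed the tuner with the persisted estimate instead of starting cold.
  static long loadEstimatedRecordSize(Properties props, long defaultBytes) {
    return Long.parseLong(props.getProperty(KEY_ESTIMATED_RECORD_SIZE, Long.toString(defaultBytes)));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    saveEstimates(props, 10_240L, 32L * 1024 * 1024);
    System.out.println("Seed record size for next run: " + loadEstimatedRecordSize(props, 1_024L));
  }
}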
--
This message was sent by Atlassian Jira
(v8.20.10#820010)