ZihanLi58 commented on code in PR #3751:
URL: https://github.com/apache/gobblin/pull/3751#discussion_r1309346102
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
+ if (this.selfTuningWriter) {
+ properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE,
String.valueOf(estimatedRecordSizeBytes));
+
properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+ properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+ }
}
/**
+ * Modifies the size of the writer buffer based on the average size of the
records written so far, the amount of available memory during initialization,
and the number of concurrent writers.
+ * The new batch size is calculated as follows:
+ * 1. Memory available = (available memory during startup)/(concurrent
writers) - (memory used by ORCFile writer)
+ * 2. Average file size, estimated during Avro -> ORC conversion
+ * 3. Estimate of memory used by the converter lists, as during resize the
internal buffer size can grow large
+ * 4. New batch size = (Memory available - Estimated memory used by
converter lists) / Average file size * memory buffer
+ * Generally in this writer, the memory the converter uses for large arrays
is the leading cause of OOM in streaming, along with the records stored in the
rowBatch
+ * Another potential approach is to also check the memory available before
resizing the converter lists, and to flush the batch whenever a resize is
needed.
+ */
+ void tuneBatchSize(long averageSizePerRecord, int
orcFileWriterRowsBetweenCheck) throws IOException {
+ this.estimatedBytesAllocatedConverterMemory =
Math.max(this.estimatedBytesAllocatedConverterMemory,
this.converterMemoryManager.getConverterBufferTotalSize());
+ int currentConcurrentWriters =
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
CONCURRENT_WRITERS_DEFAULT);
+ // In the native ORC writer implementation, it will flush the writer if
the internal memory exceeds the size of a stripe after rows between check
+ // So worst case the most memory the writer can hold is the size of a
stripe plus size of records * number of records between checks
+ // Note that this is an overestimate as the native ORC file writer should
have some compression ratio
+ long maxMemoryInFileWriter = this.estimatedRecordSizeBytes *
orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+ int newBatchSize = (int)
Math.round(((this.availableMemory/currentConcurrentWriters -
maxMemoryInFileWriter
+ - this.estimatedBytesAllocatedConverterMemory) *
this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+ // Handle scenarios where new batch size can be 0 or less due to
overestimating memory used by other components
+ newBatchSize = Math.min(Math.max(1, newBatchSize),
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ // TODO: Consider using a more sophisticated check to determine if the
batch size should be changed
+ if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+ log.info("Tuning ORC writer batch size from {} to {} based on average
byte size per record: {} with available memory {} and {} bytes of allocated
memory in converter buffers, with {} partitioned writers",
+ batchSize, newBatchSize, averageSizePerRecord, availableMemory,
estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+ this.batchSize = newBatchSize;
+ // We only initialize the native ORC file writer once to avoid creating
too many small files, as reconfiguring rows between memory check
+ // requires one to close the writer and start a new file
+ if (this.orcFileWriter == null) {
+ initializeOrcFileWriter();
+ }
+ this.flush();
Review Comment:
We don't need to flush when we have only just initialized the writer, do we?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
+ if (this.selfTuningWriter) {
+ properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE,
String.valueOf(estimatedRecordSizeBytes));
+
properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+ properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+ }
}
/**
+ * Modifies the size of the writer buffer based on the average size of the
records written so far, the amount of available memory during initialization,
and the number of concurrent writers.
+ * The new batch size is calculated as follows:
+ * 1. Memory available = (available memory during startup)/(concurrent
writers) - (memory used by ORCFile writer)
+ * 2. Average file size, estimated during Avro -> ORC conversion
+ * 3. Estimate of memory used by the converter lists, as during resize the
internal buffer size can grow large
+ * 4. New batch size = (Memory available - Estimated memory used by
converter lists) / Average file size * memory buffer
+ * Generally in this writer, the memory the converter uses for large arrays
is the leading cause of OOM in streaming, along with the records stored in the
rowBatch
+ * Another potential approach is to also check the memory available before
resizing the converter lists, and to flush the batch whenever a resize is
needed.
+ */
+ void tuneBatchSize(long averageSizePerRecord, int
orcFileWriterRowsBetweenCheck) throws IOException {
+ this.estimatedBytesAllocatedConverterMemory =
Math.max(this.estimatedBytesAllocatedConverterMemory,
this.converterMemoryManager.getConverterBufferTotalSize());
+ int currentConcurrentWriters =
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
CONCURRENT_WRITERS_DEFAULT);
+ // In the native ORC writer implementation, it will flush the writer if
the internal memory exceeds the size of a stripe after rows between check
+ // So worst case the most memory the writer can hold is the size of a
stripe plus size of records * number of records between checks
+ // Note that this is an overestimate as the native ORC file writer should
have some compression ratio
+ long maxMemoryInFileWriter = this.estimatedRecordSizeBytes *
orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+ int newBatchSize = (int)
Math.round(((this.availableMemory/currentConcurrentWriters -
maxMemoryInFileWriter
+ - this.estimatedBytesAllocatedConverterMemory) *
this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+ // Handle scenarios where new batch size can be 0 or less due to
overestimating memory used by other components
+ newBatchSize = Math.min(Math.max(1, newBatchSize),
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ // TODO: Consider using a more sophisticated check to determine if the
batch size should be changed
+ if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
Review Comment:
If we only flush when the new batch size is smaller, will that cause any issues?
I just want to see if we can reduce the number of calls to the flush() method.
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+
+/**
+ * A helper class to calculate the size of array buffers in a {@link
VectorizedRowBatch}.
+ * This estimate is mainly based on the maximum size of each variable length
column, which can be resized
+ * Since the resizing algorithm for each column can balloon, this can affect
likelihood of OOM
+ */
+public class OrcConverterMemoryManager {
+
+ private VectorizedRowBatch rowBatch;
+ long converterBufferTotalSize;
+
+ // TODO: Consider moving the resize algorithm from the converter to this
class
+ OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+ this.rowBatch = rowBatch;
+ }
+
+ // TODO: consider performing this calculation live whenever a resize is done
+ private void calculateResizeSpaceOfArrayBuffers() {
+ ColumnVector[] cols = this.rowBatch.cols;
+ for (int i = 0; i < cols.length; i++) {
+ calculateSizeOfColHelper(cols[i]);
+ }
+ }
+
+ public void calculateSizeOfColHelper(ColumnVector col) {
Review Comment:
It seems we only calculate the size for null values. Can you add Javadoc to
explain the algorithm here?
##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -183,9 +241,75 @@ public void commit()
throws IOException {
closeInternal();
super.commit();
+ if (this.selfTuningWriter) {
+ properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE,
String.valueOf(estimatedRecordSizeBytes));
+
properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize()));
+ properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
String.valueOf(this.batchSize * calculateOrcFileWriterRowsBetweenCheck()));
+ }
}
/**
+ * Modifies the size of the writer buffer based on the average size of the
records written so far, the amount of available memory during initialization,
and the number of concurrent writers.
+ * The new batch size is calculated as follows:
+ * 1. Memory available = (available memory during startup)/(concurrent
writers) - (memory used by ORCFile writer)
+ * 2. Average file size, estimated during Avro -> ORC conversion
+ * 3. Estimate of memory used by the converter lists, as during resize the
internal buffer size can grow large
+ * 4. New batch size = (Memory available - Estimated memory used by
converter lists) / Average file size * memory buffer
+ * Generally in this writer, the memory the converter uses for large arrays
is the leading cause of OOM in streaming, along with the records stored in the
rowBatch
+ * Another potential approach is to also check the memory available before
resizing the converter lists, and to flush the batch whenever a resize is
needed.
+ */
+ void tuneBatchSize(long averageSizePerRecord, int
orcFileWriterRowsBetweenCheck) throws IOException {
+ this.estimatedBytesAllocatedConverterMemory =
Math.max(this.estimatedBytesAllocatedConverterMemory,
this.converterMemoryManager.getConverterBufferTotalSize());
+ int currentConcurrentWriters =
this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER,
CONCURRENT_WRITERS_DEFAULT);
+ // In the native ORC writer implementation, it will flush the writer if
the internal memory exceeds the size of a stripe after rows between check
+ // So worst case the most memory the writer can hold is the size of a
stripe plus size of records * number of records between checks
+ // Note that this is an overestimate as the native ORC file writer should
have some compression ratio
+ long maxMemoryInFileWriter = this.estimatedRecordSizeBytes *
orcFileWriterRowsBetweenCheck + DEFAULT_ORC_WRITER_STRIPE_SIZE;
+
+ int newBatchSize = (int)
Math.round(((this.availableMemory/currentConcurrentWriters -
maxMemoryInFileWriter
+ - this.estimatedBytesAllocatedConverterMemory) *
this.batchSizeMemoryUsageFactor) / averageSizePerRecord);
+ // Handle scenarios where new batch size can be 0 or less due to
overestimating memory used by other components
+ newBatchSize = Math.min(Math.max(1, newBatchSize),
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ // TODO: Consider using a more sophisticated check to determine if the
batch size should be changed
+ if (Math.abs(newBatchSize - this.batchSize) > 0.2 * this.batchSize) {
+ log.info("Tuning ORC writer batch size from {} to {} based on average
byte size per record: {} with available memory {} and {} bytes of allocated
memory in converter buffers, with {} partitioned writers",
+ batchSize, newBatchSize, averageSizePerRecord, availableMemory,
estimatedBytesAllocatedConverterMemory, currentConcurrentWriters);
+ this.batchSize = newBatchSize;
+ // We only initialize the native ORC file writer once to avoid creating
too many small files, as reconfiguring rows between memory check
+ // requires one to close the writer and start a new file
+ if (this.orcFileWriter == null) {
+ initializeOrcFileWriter();
+ }
+ this.flush();
+ this.rowBatch.ensureSize(this.batchSize);
+ }
+ if (this.orcFileWriter == null) {
Review Comment:
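Is this duplicate code? The same `this.orcFileWriter == null` check already appears inside the if block just above.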
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]