This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8b890759a [GOBBLIN-1957] GobblinOrcWriter improvements for large
records (#3828)
8b890759a is described below
commit 8b890759ac1758b5d18777d61a2e5df528f614ab
Author: William Lo <[email protected]>
AuthorDate: Tue Nov 21 11:17:57 2023 -0500
[GOBBLIN-1957] GobblinOrcWriter improvements for large records (#3828)
* WIP
* Optimization to limit batchsize based on large record sizes
* Address review
---
.../gobblin/writer/GobblinBaseOrcWriter.java | 15 +++++-
.../gobblin/writer/GobblinOrcWriterConfigs.java | 16 +++++--
.../gobblin/writer/GobblinOrcWriterTest.java | 54 +++++++++++++++++++---
3 files changed, 73 insertions(+), 12 deletions(-)
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 2ed31b93b..73a53ada9 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -91,6 +91,8 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
private int orcFileWriterRowsBetweenCheck;
private long orcStripeSize;
private int maxOrcBatchSize;
+ private int batchSizeRowCheckFactor;
+ private boolean enableLimitBufferSizeOrcStripe;
private int concurrentWriterTasks;
private long orcWriterStripeSizeBytes;
@@ -109,6 +111,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
this.typeDescription = getOrcSchema();
this.selfTuningWriter =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
false);
this.validateORCAfterClose =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE,
false);
+ this.batchSizeRowCheckFactor =
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR,
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR);
this.maxOrcBatchSize =
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
@@ -133,6 +136,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
this.orcFileWriterMaxRowsBetweenCheck =
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
+ this.enableLimitBufferSizeOrcStripe =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE,
false);
// Create file-writer
this.writerConfig = new Configuration();
// Populate job Configurations into Conf as well so that configurations
related to ORC writer can be tuned easily.
@@ -312,11 +316,18 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
this.currentOrcWriterMaxUnderlyingMemory =
Math.max(this.currentOrcWriterMaxUnderlyingMemory,
orcFileWriter.estimateMemory());
}
long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory,
prevOrcWriterMaxUnderlyingMemory);
-
int newBatchSize = (int) ((this.availableMemory*1.0 /
currentPartitionedWriters * this.rowBatchMemoryUsageFactor -
maxMemoryInFileWriter
- this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
+
+ if (this.enableLimitBufferSizeOrcStripe) {
+ // For large records, prevent the batch size from greatly exceeding the
size of a stripe as the native ORC Writer will flush its buffer after a stripe
is filled
+ int maxRecordsPerStripeSize = (int)
Math.round(this.orcWriterStripeSizeBytes * 1.0 / averageSizePerRecord);
+ this.orcFileWriterMaxRowsBetweenCheck =
Math.min(this.orcFileWriterMaxRowsBetweenCheck, maxRecordsPerStripeSize);
+ this.maxOrcBatchSize = Math.min(this.maxOrcBatchSize,
maxRecordsPerStripeSize);
+ }
// Handle scenarios where new batch size can be 0 or less due to
overestimating memory used by other components
newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);
+
if (Math.abs(newBatchSize - this.batchSize) >
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY *
this.batchSize) {
// Add a factor when tuning up the batch size to prevent large sudden
increases in memory usage
if (newBatchSize > this.batchSize) {
@@ -337,7 +348,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
void initializeOrcFileWriter() {
try {
this.orcFileWriterRowsBetweenCheck = Math.max(
- Math.min(this.batchSize *
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR,
this.orcFileWriterMaxRowsBetweenCheck),
+ Math.min(this.batchSize * this.batchSizeRowCheckFactor,
this.orcFileWriterMaxRowsBetweenCheck),
this.orcFileWriterMinRowsBetweenCheck
);
this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
String.valueOf(this.orcFileWriterRowsBetweenCheck));
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index 89621d38c..fcd70a78b 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -39,6 +39,11 @@ public class GobblinOrcWriterConfigs {
* Max buffer size of the Gobblin ORC Writer that it can be tuned to
*/
public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE =
ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
+ /**
+ * The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer
size
+ */
+ public static final String ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR =
"auto.selfTune.rowCheck.factor";
+
/**
* How often should the Gobblin ORC Writer check for tuning
*/
@@ -60,6 +65,12 @@ public class GobblinOrcWriterConfigs {
*/
public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX +
"max.rows.between.memory.checks";
+ /**
+ * Enable limiting the maximum buffer size of both the native ORC writer and
the Gobblin ORC writer to the size of a stripe divided by the estimated
+ * size of each record. This handles the case where records are extremely
large and oversized buffers would otherwise dominate memory usage
+ public static final String ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE =
ORC_WRITER_PREFIX + "auto.selfTune.max.buffer.orc.stripe";
+
public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX +
"instrumented";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
@@ -70,10 +81,9 @@ public class GobblinOrcWriterConfigs {
*/
public static final int DEFAULT_CONCURRENT_WRITERS = 3;
public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR
= 0.3;
- /**
- * The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer
size
- */
+
public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;
+
public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE =
DEFAULT_ORC_WRITER_BATCH_SIZE;
public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
/**
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index 63db334a7..ea24ef350 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -268,21 +268,23 @@ public class GobblinOrcWriterTest {
// Force a larger initial batchSize that can be tuned down
orcWriter.batchSize = 10;
orcWriter.rowBatch.ensureSize(10);
+ // Given that the available memory is very high, the resulting batchsize
should be maxed out
orcWriter.availableMemory = 100000000;
- // Given the amount of available memory and a low stripe size, and
estimated rowBatchSize, the resulting batchsize should be maxed out
+ // Consider that the batch size incrementally increases based on the
difference between target and current batchsize (10)
orcWriter.tuneBatchSize(10);
- System.out.println(orcWriter.batchSize);
- // Take into account that increases in batchsize are multiplied by a
factor to prevent large jumps in batchsize
- Assert.assertTrue(orcWriter.batchSize ==
(GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
+ Assert.assertEquals(orcWriter.batchSize, 505);
+ orcWriter.tuneBatchSize(10);
+ Assert.assertEquals(orcWriter.batchSize, 752);
+
orcWriter.availableMemory = 100;
orcWriter.tuneBatchSize(10);
// Given that the amount of available memory is low, the resulting
batchsize should be 1
- Assert.assertTrue(orcWriter.batchSize == 1);
+ Assert.assertEquals(orcWriter.batchSize,1);
orcWriter.availableMemory = 10000;
orcWriter.rowBatch.ensureSize(10000);
// Since the rowBatch is large, the resulting batchsize should still be 1
even with more memory
orcWriter.tuneBatchSize(10);
- Assert.assertTrue(orcWriter.batchSize == 1);
+ Assert.assertEquals(orcWriter.batchSize, 1);
}
@Test
@@ -322,4 +324,42 @@ public class GobblinOrcWriterTest {
Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
}
-}
\ No newline at end of file
+
+ @Test
+ public void testSelfTuneRowBatchCalculationWithStripeMax() throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+ List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(),
schema, "orc_writer_test/data_multi.json");
+
+ // Mock WriterBuilder, bunch of mocking behaviors to work-around
precondition checks in writer builder
+ FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+ (FsDataWriterBuilder<Schema, GenericRecord>)
Mockito.mock(FsDataWriterBuilder.class);
+ when(mockBuilder.getSchema()).thenReturn(schema);
+
+ State dummyState = new WorkUnit();
+ String stagingDir = Files.createTempDir().getAbsolutePath();
+ String outputDir = Files.createTempDir().getAbsolutePath();
+ dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+ dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune");
+ dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
"true");
+ dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
+
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE,
"true");
+ when(mockBuilder.getFileName(dummyState)).thenReturn("file");
+
+ // Having a closer to manage the life-cycle of the writer object.
+ Closer closer = Closer.create();
+ GobblinOrcWriter orcWriter = closer.register(new
GobblinOrcWriter(mockBuilder, dummyState));
+ // Force a larger initial batchSize that can be tuned down
+ orcWriter.batchSize = 10;
+ orcWriter.rowBatch.ensureSize(10);
+ orcWriter.availableMemory = 100000000;
+ // Since the stripe size is 100, the resulting batchsize should be 10
(100/10)
+ orcWriter.tuneBatchSize(10);
+ Assert.assertEquals(orcWriter.batchSize,10);
+
+ // Increasing the estimated record size should decrease the max batch size
+ orcWriter.tuneBatchSize(100);
+ Assert.assertEquals(orcWriter.batchSize,1);
+ }
+}