This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a392ebd29 [GOBBLIN-1918] Use Exponential Decay for RowBatch resizing
(#3787)
a392ebd29 is described below
commit a392ebd2988f52f7109dbb95bf9966ecac0d947f
Author: William Lo <[email protected]>
AuthorDate: Mon Sep 25 18:24:21 2023 -0400
[GOBBLIN-1918] Use Exponential Decay for RowBatch resizing (#3787)
* WIP
* Add configurable decay alg for buffer resizing orc converter
* Add configs and tests
* Move configs to shared class
* Fix test after moving configs
* Fix more tests
* Fix bug with getting resize counts
---
.../writer/GenericRecordToOrcValueWriter.java | 43 +-------
.../gobblin/writer/GobblinBaseOrcWriter.java | 9 +-
.../apache/gobblin/writer/GobblinOrcWriter.java | 4 +-
.../gobblin/writer/GobblinOrcWriterConfigs.java | 9 ++
.../gobblin/writer/OrcConverterMemoryManager.java | 121 ++++++++++++++++-----
.../writer/GenericRecordToOrcValueWriterTest.java | 15 ++-
.../writer/OrcConverterMemoryManagerTest.java | 60 +++++++++-
7 files changed, 183 insertions(+), 78 deletions(-)
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index 4e1a49512..5c4c878ee 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -44,12 +44,9 @@ import
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import com.google.common.annotations.VisibleForTesting;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
@@ -60,17 +57,10 @@ import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
*/
@Slf4j
public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericRecord> {
- private static final String ENABLE_SMART_ARRAY_ENLARGE =
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX +
"enabledMulValueColumnVectorSmartSizing";
- private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
- private static final String ENLARGE_FACTOR_KEY =
GobblinOrcWriterConfigs.ORC_WRITER_PREFIX + "enlargeFactor";
- private static final int DEFAULT_ENLARGE_FACTOR = 3;
private boolean enabledSmartSizing;
private int enlargeFactor;
-
- // A rough measure of how many times resize is triggered, helping on
debugging and testing.
- @VisibleForTesting
- public int resizeCount = 0;
+ private OrcConverterMemoryManager memoryManager;
@Getter
long totalBytesConverted = 0;
@@ -93,17 +83,9 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
private final Converter[] converters;
- public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema
avroSchema) {
+ public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema
avroSchema, OrcConverterMemoryManager memoryManager) {
converters = buildConverters(typeDescription, avroSchema);
- this.enabledSmartSizing = DEFAULT_ENABLE_SMART_ARRAY_ENLARGE;
- this.enlargeFactor = DEFAULT_ENLARGE_FACTOR;
- }
-
- public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema
avroSchema, State state) {
- this(typeDescription, avroSchema);
- this.enabledSmartSizing =
state.getPropAsBoolean(ENABLE_SMART_ARRAY_ENLARGE,
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
- this.enlargeFactor = state.getPropAsInt(ENLARGE_FACTOR_KEY,
DEFAULT_ENLARGE_FACTOR);
- log.info("enabledSmartSizing: {}, enlargeFactor: {}", enabledSmartSizing,
enlargeFactor);
+ this.memoryManager = memoryManager;
}
/** Converts a record from the GenericRecord to the ORC ColumnVectors.
@@ -344,7 +326,7 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
// make sure the child is big enough
// If seeing child array being saturated, will need to expand with a
reasonable amount.
if (cv.childCount > cv.child.isNull.length) {
- int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+ int resizedLength = memoryManager.resize(rowsAdded, cv.childCount);
log.info("Column vector: {}, resizing to: {}, child count: {}",
cv.child, resizedLength, cv.childCount);
cv.child.ensureSize(resizedLength, true);
}
@@ -390,7 +372,7 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
cv.childCount += cv.lengths[rowId];
// make sure the child is big enough
if (cv.childCount > cv.keys.isNull.length) {
- int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
+ int resizedLength = memoryManager.resize(rowsAdded, cv.childCount);
log.info("Column vector: {}, resizing to: {}, child count: {}",
cv.keys, resizedLength, cv.childCount);
cv.keys.ensureSize(resizedLength, true);
log.info("Column vector: {}, resizing to: {}, child count: {}",
cv.values, resizedLength, cv.childCount);
@@ -423,19 +405,6 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
}
}
- /**
- * Resize the child-array size based on configuration.
- * If smart-sizing is enabled, it will using the avg size of container and
expand the whole child array to
- * delta(avgSizeOfContainer * numberOfContainer(batchSize)) the first time
this is called.
- * If there's further resize requested, it will add delta again to be
conservative, but chances of adding delta
- * for multiple times should be low, unless the container size is
fluctuating too much.
- */
- private int resize(int rowsAdded, int batchSize, int requestedSize) {
- resizeCount += 1;
- log.info(String.format("It has been resized %s times in current writer",
resizeCount));
- return enabledSmartSizing ? requestedSize + (requestedSize / rowsAdded +
1) * batchSize : enlargeFactor * requestedSize;
- }
-
private Converter buildConverter(TypeDescription schema, Schema avroSchema) {
switch (schema.getCategory()) {
case BOOLEAN:
@@ -486,6 +455,6 @@ public class GenericRecordToOrcValueWriter implements
OrcValueWriter<GenericReco
}
public int getResizeCount() {
- return resizeCount;
+ return memoryManager.getResizeCount();
}
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 739b84374..d8a2a353e 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -82,7 +82,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
private long orcWriterStripeSizeBytes;
// Holds the maximum size of the previous run's maximum buffer or the max of
the current run's maximum buffer
private long estimatedBytesAllocatedConverterMemory = -1;
- private OrcConverterMemoryManager converterMemoryManager;
+ protected OrcConverterMemoryManager converterMemoryManager;
Configuration writerConfig;
@@ -93,7 +93,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
// Create value-writer which is essentially a record-by-record-converter
with buffering in batch.
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
- this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema,
properties);
this.selfTuningWriter =
properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED,
false);
this.maxOrcBatchSize =
properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
@@ -107,7 +106,10 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
this.rowBatchMemoryUsageFactor =
properties.getPropAsDouble(GobblinOrcWriterConfigs.ORC_WRITER_ROWBATCH_MEMORY_USAGE_FACTOR,
GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR);
this.rowBatch = enableRowBatchPool ?
rowBatchPool.getRowBatch(typeDescription, batchSize) :
typeDescription.createRowBatch(batchSize);
- this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch);
+ this.orcWriterStripeSizeBytes =
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long)
OrcConf.STRIPE_SIZE.getDefaultValue());
+ this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch,
properties);
+ this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema,
properties);
+
// Track the number of other writer tasks from different datasets
ingesting on the same container
this.concurrentWriterTasks =
properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS,
1);
this.orcStripeSize =
properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long)
OrcConf.STRIPE_SIZE.getDefaultValue());
@@ -121,7 +123,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends
FsDataWriter<D> {
JobConfigurationUtils.putStateIntoConfiguration(properties,
this.writerConfig);
OrcFile.WriterOptions options =
OrcFile.writerOptions(properties.getProperties(), this.writerConfig);
options.setSchema(typeDescription);
-
// Get the amount of allocated and future space available
this.availableMemory = (Runtime.getRuntime().maxMemory() -
(Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()))/this.concurrentWriterTasks;
log.info("Available memory for ORC writer: {}", this.availableMemory);
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index dba1f2fc4..d84125525 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -23,7 +23,6 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
@@ -32,6 +31,7 @@ import org.apache.orc.TypeDescription;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
/**
* A wrapper for ORC-core writer without dependency on Hive SerDe library.
@@ -50,7 +50,7 @@ public class GobblinOrcWriter extends
GobblinBaseOrcWriter<Schema, GenericRecord
@Override
protected OrcValueWriter<GenericRecord> getOrcValueWriter(TypeDescription
typeDescription, Schema inputSchema,
State state) {
- return new GenericRecordToOrcValueWriter(typeDescription,
this.inputSchema, this.properties);
+ return new GenericRecordToOrcValueWriter(typeDescription,
this.inputSchema, this.converterMemoryManager);
}
@Override
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
index 50b51ae81..b0b859f93 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterConfigs.java
@@ -78,6 +78,15 @@ public class GobblinOrcWriterConfigs {
public static final int DEFAULT_MIN_ORC_WRITER_ROWCHECK = 150;
public static final int DEFAULT_MAX_ORC_WRITER_ROWCHECK = 5000;
+ /**
+ * Avro to ORC converter configs
+ */
+ public static final String ENABLE_SMART_ARRAY_ENLARGE = ORC_WRITER_PREFIX +
"smartArrayEnlargement.enabled";
+ public static final String SMART_ARRAY_ENLARGE_FACTOR_MAX =
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.max";
+ public static final String SMART_ARRAY_ENLARGE_FACTOR_MIN =
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.min";
+ public static final String SMART_ARRAY_ENLARGE_DECAY_FACTOR =
ORC_WRITER_PREFIX + "smartArrayEnlargement.factor.decay";
+ public static final String ENLARGE_FACTOR_KEY = ORC_WRITER_PREFIX +
"enlargeFactor";
+
public static class RuntimeStateConfigs {
public static final String ORC_WRITER_ESTIMATED_RECORD_SIZE =
ORC_WRITER_PREFIX + "estimated.recordSize";
public static final String ORC_WRITER_NATIVE_WRITER_MEMORY =
ORC_WRITER_PREFIX + "estimated.native.writer.memory";
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
index dcd6250dc..7a74f56fd 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java
@@ -28,19 +28,57 @@ import
org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import com.google.common.base.Preconditions;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
/**
* A helper class to calculate the size of array buffers in a {@link
VectorizedRowBatch}.
* This estimate is mainly based on the maximum size of each variable length
column, which can be resized
* Since the resizing algorithm for each column can balloon, this can affect
likelihood of OOM
*/
+@Slf4j
public class OrcConverterMemoryManager {
+ private static final boolean DEFAULT_ENABLE_SMART_ARRAY_ENLARGE = false;
+ private static final int DEFAULT_ENLARGE_FACTOR = 3;
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX = 5.0;
+ // Needs to be greater than 1.0
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN = 1.2;
+ // Given the defaults it will take around 500 records to reach the min
enlarge factor - given that the default
+ // batch size is 1000 this is a fairly conservative default
+ private static final double DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR = 0.003;
+
private VectorizedRowBatch rowBatch;
+ @Getter
+ private int resizeCount = 0;
+ private double smartArrayEnlargeFactorMax;
+ private double smartArrayEnlargeFactorMin;
+ private double smartArrayEnlargeDecayFactor;
+ private boolean enabledSmartSizing = false;
+ int enlargeFactor = 0;
- // TODO: Consider moving the resize algorithm from the converter to this
class
- OrcConverterMemoryManager(VectorizedRowBatch rowBatch) {
+ OrcConverterMemoryManager(VectorizedRowBatch rowBatch, State state) {
this.rowBatch = rowBatch;
+ this.enabledSmartSizing =
state.getPropAsBoolean(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
DEFAULT_ENABLE_SMART_ARRAY_ENLARGE);
+ this.enlargeFactor =
state.getPropAsInt(GobblinOrcWriterConfigs.ENLARGE_FACTOR_KEY,
DEFAULT_ENLARGE_FACTOR);
+ this.smartArrayEnlargeFactorMax =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MAX);
+ this.smartArrayEnlargeFactorMin =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
DEFAULT_SMART_ARRAY_ENLARGE_FACTOR_MIN);
+ this.smartArrayEnlargeDecayFactor =
state.getPropAsDouble(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
DEFAULT_SMART_ARRAY_ENLARGE_DECAY_FACTOR);
+ if (enabledSmartSizing) {
Preconditions.checkArgument(this.smartArrayEnlargeFactorMax >= 1,
+        String.format("Smart array enlarge factor needs to be at least
1.0, provided value %s", this.smartArrayEnlargeFactorMax));
+      Preconditions.checkArgument(this.smartArrayEnlargeFactorMin >= 1,
+        String.format("Smart array enlarge factor needs to be at least
1.0, provided value %s", this.smartArrayEnlargeFactorMin));
+ Preconditions.checkArgument(this.smartArrayEnlargeDecayFactor > 0 &&
this.smartArrayEnlargeDecayFactor < 1,
+ String.format("Smart array enlarge decay factor needs to be between
0 and 1, provided value %s", this.smartArrayEnlargeDecayFactor));
+ log.info("Enabled smart resizing for rowBatch -
smartArrayEnlargeFactorMax: {}, smartArrayEnlargeFactorMin: {},
smartArrayEnlargeDecayFactor: {}",
+ smartArrayEnlargeFactorMax, smartArrayEnlargeFactorMin,
smartArrayEnlargeDecayFactor);
+ }
+ log.info("Static enlargeFactor for rowBatch: {}", enlargeFactor);
}
/**
@@ -48,35 +86,51 @@ public class OrcConverterMemoryManager {
* This takes into account the default null values of different ORC
ColumnVectors and approximates their sizes
 * Although it's a rough calculation, it can accurately depict the weight of
resizes in a column for very large arrays and maps
* Which tend to dominate the size of the buffer overall
- * TODO: Clean this method up considerably and refactor resize logic here
* @param col
* @return
*/
public long calculateSizeOfColHelper(ColumnVector col) {
long converterBufferColSize = 0;
- if (col instanceof ListColumnVector) {
- ListColumnVector listColumnVector = (ListColumnVector) col;
- converterBufferColSize +=
calculateSizeOfColHelper(listColumnVector.child);
- } else if (col instanceof MapColumnVector) {
- MapColumnVector mapColumnVector = (MapColumnVector) col;
- converterBufferColSize += calculateSizeOfColHelper(mapColumnVector.keys);
- converterBufferColSize +=
calculateSizeOfColHelper(mapColumnVector.values);
- } else if (col instanceof StructColumnVector) {
- StructColumnVector structColumnVector = (StructColumnVector) col;
- for (int j = 0; j < structColumnVector.fields.length; j++) {
- converterBufferColSize +=
calculateSizeOfColHelper(structColumnVector.fields[j]);
- }
- } else if (col instanceof UnionColumnVector) {
- UnionColumnVector unionColumnVector = (UnionColumnVector) col;
- for (int j = 0; j < unionColumnVector.fields.length; j++) {
- converterBufferColSize +=
calculateSizeOfColHelper(unionColumnVector.fields[j]);
- }
- } else if (col instanceof LongColumnVector || col instanceof
DoubleColumnVector || col instanceof DecimalColumnVector) {
- // Memory space in bytes of native type
- converterBufferColSize += col.isNull.length * 8;
- } else if (col instanceof BytesColumnVector) {
- // Contains two integer list references of size vector for tracking so
will use that as null size
- converterBufferColSize += ((BytesColumnVector) col).vector.length * 8;
+ switch (col.type) {
+ case LIST:
+ ListColumnVector listColumnVector = (ListColumnVector) col;
+ converterBufferColSize +=
calculateSizeOfColHelper(listColumnVector.child);
+ break;
+ case MAP:
+ MapColumnVector mapColumnVector = (MapColumnVector) col;
+ converterBufferColSize +=
calculateSizeOfColHelper(mapColumnVector.keys);
+ converterBufferColSize +=
calculateSizeOfColHelper(mapColumnVector.values);
+ break;
+ case STRUCT:
+ StructColumnVector structColumnVector = (StructColumnVector) col;
+ for (int j = 0; j < structColumnVector.fields.length; j++) {
+ converterBufferColSize +=
calculateSizeOfColHelper(structColumnVector.fields[j]);
+ }
+ break;
+ case UNION:
+ UnionColumnVector unionColumnVector = (UnionColumnVector) col;
+ for (int j = 0; j < unionColumnVector.fields.length; j++) {
+ converterBufferColSize +=
calculateSizeOfColHelper(unionColumnVector.fields[j]);
+ }
+ break;
+ case BYTES:
+ BytesColumnVector bytesColumnVector = (BytesColumnVector) col;
+ converterBufferColSize += bytesColumnVector.vector.length * 8;
+ break;
+ case DECIMAL:
+ DecimalColumnVector decimalColumnVector = (DecimalColumnVector) col;
+ converterBufferColSize += decimalColumnVector.precision + 2;
+ break;
+ case DOUBLE:
+ DoubleColumnVector doubleColumnVector = (DoubleColumnVector) col;
+ converterBufferColSize += doubleColumnVector.vector.length * 8;
+ break;
+ case LONG:
+ LongColumnVector longColumnVector = (LongColumnVector) col;
+ converterBufferColSize += longColumnVector.vector.length * 8;
+ break;
+ default:
+ // Should never reach here given the types used in
GenericRecordToOrcValueWriter
}
// Calculate overhead of the column's own null reference
converterBufferColSize += col.isNull.length;
@@ -97,4 +151,19 @@ public class OrcConverterMemoryManager {
return converterBufferTotalSize;
}
+ /**
+ * Resize the child-array size based on configuration.
+ * If smart resizing is enabled, it will use an exponential decay algorithm
where it would resize the array by a smaller amount
+ * the more records the converter has processed, as the fluctuation in
record size becomes less likely to differ significantly by then
+ * Since the writer is closed and reset periodically, this is generally a
safe assumption that should prevent large empty array buffers
+ */
+ public int resize(int rowsAdded, int requestedSize) {
+ resizeCount += 1;
+ log.info(String.format("It has been resized %s times in current writer",
resizeCount));
+ if (enabledSmartSizing) {
+ double decayingEnlargeFactor = this.smartArrayEnlargeFactorMax *
Math.pow((1-this.smartArrayEnlargeDecayFactor), rowsAdded-1);
+ return (int) Math.round(requestedSize * Math.max(decayingEnlargeFactor,
this.smartArrayEnlargeFactorMin));
+ }
+ return enlargeFactor * requestedSize;
+ }
}
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index c22c62827..cfef8f7d0 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -47,6 +47,7 @@ import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue;
@@ -61,8 +62,9 @@ public class GenericRecordToOrcValueWriterTest {
new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("union_test/schema.avsc"));
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"union_test/data.json");
@@ -122,8 +124,9 @@ public class GenericRecordToOrcValueWriterTest {
new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("decimal_test/schema.avsc"));
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
VectorizedRowBatch rowBatch = orcSchema.createRowBatch();
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"decimal_test/data.json");
@@ -158,10 +161,11 @@ public class GenericRecordToOrcValueWriterTest {
new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
// Make the batch size very small so that the enlarge behavior would
easily be triggered.
// But this has to be more than the number of records that we deserialized
from data.json, as here we don't reset batch.
VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"list_map_test/data.json");
@@ -170,7 +174,7 @@ public class GenericRecordToOrcValueWriterTest {
valueWriter.write(record, rowBatch);
}
// Examining resize count, which should happen only once for map and list,
so 2 in total.
- Assert.assertEquals(valueWriter.resizeCount, 2);
+ Assert.assertEquals(valueWriter.getResizeCount(), 2);
}
@Test
@@ -180,10 +184,11 @@ public class GenericRecordToOrcValueWriterTest {
new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc"));
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
// Make the batch size very small so that the enlarge behavior would
easily be triggered.
// But this has to be more than the number of records that we deserialized
from data.json, as here we don't reset batch.
VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"list_map_test/data.json");
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
index c04778187..9775f0765 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java
@@ -26,6 +26,7 @@ import
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
@@ -39,8 +40,8 @@ public class OrcConverterMemoryManagerTest {
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
// Make batch size small so that the enlarge behavior would easily be
triggered.
VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10);
- OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"list_map_test/data.json");
@@ -62,8 +63,8 @@ public class OrcConverterMemoryManagerTest {
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
// Make batch such that only deeply nested list is resized
VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
- OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch);
- GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema);
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, new State());
+ GenericRecordToOrcValueWriter valueWriter = new
GenericRecordToOrcValueWriter(orcSchema, schema, memoryManager);
List<GenericRecord> recordList = GobblinOrcWriterTest
.deserializeAvroRecords(this.getClass(), schema,
"converter_memory_manager_nested_test/data.json");
@@ -77,4 +78,55 @@ public class OrcConverterMemoryManagerTest {
int expectedSize = 30*3*9 + 30*9 + 15*4; // Deeply nested list + maps +
other structure overhead
Assert.assertEquals(memoryManager.getConverterBufferTotalSize(),
expectedSize);
}
+
+ @Test
+ public void testBufferSmartResize() throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc"));
+ TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+ // Make batch such that only deeply nested list is resized
+ VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
+ State memoryManagerState = new State();
+
memoryManagerState.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
"true");
+
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
"0.5");
+
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
"10");
+
memoryManagerState.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
"1");
+ OrcConverterMemoryManager memoryManager = new
OrcConverterMemoryManager(rowBatch, memoryManagerState);
+
+ int result = memoryManager.resize(1, 1000);
+ Assert.assertEquals(result, 10000);
+
+ // Result is equal to requested size since the decay factor dominates the
resize
+ result = memoryManager.resize(100, 1000);
+ Assert.assertEquals(result, 1000);
+ }
+
+ @Test
+ public void testBufferSmartResizeParameters() throws Exception {
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc"));
+ TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema);
+ // Make batch such that only deeply nested list is resized
+ VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15);
+ State memoryManagerState0 = new State();
+
memoryManagerState0.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
"true");
+
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
"0.5");
+
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
"0");
+
memoryManagerState0.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
"1");
+ Assert.assertThrows(IllegalArgumentException.class, () -> new
OrcConverterMemoryManager(rowBatch, memoryManagerState0));
+
+ State memoryManagerState1 = new State();
+
memoryManagerState1.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
"true");
+
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
"0.5");
+
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
"1");
+
memoryManagerState1.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
"0");
+ Assert.assertThrows(IllegalArgumentException.class, () -> new
OrcConverterMemoryManager(rowBatch, memoryManagerState1));
+
+ State memoryManagerState2 = new State();
+
memoryManagerState2.setProp(GobblinOrcWriterConfigs.ENABLE_SMART_ARRAY_ENLARGE,
"true");
+
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_DECAY_FACTOR,
"1.5");
+
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MAX,
"1");
+
memoryManagerState2.setProp(GobblinOrcWriterConfigs.SMART_ARRAY_ENLARGE_FACTOR_MIN,
"1");
+ Assert.assertThrows(IllegalArgumentException.class, () -> new
OrcConverterMemoryManager(rowBatch, memoryManagerState2));
+ }
}