This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0187e03 [GOBBLIN-1290] Auto-tune ORC writer parameters
0187e03 is described below
commit 0187e03a6fb213edf7c941e4a43bfed62534559c
Author: Lei Sun <[email protected]>
AuthorDate: Thu Nov 19 12:03:37 2020 -0800
[GOBBLIN-1290] Auto-tune ORC writer parameters
Auto-tune ORC writer parameters based on record
size
Format log a bit
Update git ignore
Add simple unit test
update gitignore for pegasus migration
Address reviewer's comments
Closes #3128 from autumnust/autotune
---
.gitignore | 2 +
.../gobblin/configuration/ConfigurationKeys.java | 4 ++
gobblin-cluster/build.gradle | 14 ++--
.../extractor/extract/kafka/KafkaSource.java | 1 -
.../source/extractor/extract/kafka/KafkaUtils.java | 6 +-
gobblin-modules/gobblin-orc/build.gradle | 1 +
.../apache/gobblin/writer/GobblinOrcWriter.java | 78 +++++++++++++++++++++-
.../gobblin/writer/GobblinOrcWriterTest.java | 42 ++++++++++++
8 files changed, 135 insertions(+), 13 deletions(-)
diff --git a/.gitignore b/.gitignore
index a6075f2..ac60457 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@
**/*.iws
**/*.ipr
.shelf/
+.ideaDataSources/
# VS Code related
.vscode
@@ -49,6 +50,7 @@ out/
*/bin/
**/mainGeneratedDataTemplate
**/mainGeneratedRest
+**/main/snapshot
gobblin-dist/
gobblin-distribution-*.tar.gz
gobblin.tar.gz
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 004be99..f93bcdc 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -502,6 +502,10 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_SCHEMA_IN_SOURCE_DIR = false;
public static final String SCHEMA_FILENAME = "schema.filename";
public static final String DEFAULT_SCHEMA_FILENAME = "metadata.json";
+ // An optional configuration for extractor's specific implementation to set,
which helps the data writer
+ // tune some parameters that are relevant to the record size.
+ // See the reference GobblinOrcWriter as an example.
+ public static final String AVG_RECORD_SIZE = "avg.record.size";
// Comma-separated source entity names
public static final String SOURCE_ENTITIES = "source.entities";
diff --git a/gobblin-cluster/build.gradle b/gobblin-cluster/build.gradle
index 8e92618..f8e8324 100644
--- a/gobblin-cluster/build.gradle
+++ b/gobblin-cluster/build.gradle
@@ -85,14 +85,12 @@ configurations {
}
}
// Generates a maven test artifact (gobblin-cluster-test)
-project.publishing {
- publications {
- testsPub(MavenPublication) {
- artifactId "${project.archivesBaseName}-test"
- artifacts = [testJar]
- pom pomAttributes
- pom.withXml addRuntimeDependenciesToPom
- }
+project.publishing.publications {
+ testsPub(MavenPublication) {
+ artifactId "${project.archivesBaseName}-test"
+ artifacts = [testJar]
+ pom pomAttributes
+ pom.withXml addRuntimeDependenciesToPom
}
}
addPublicationToBintray("testsPub")
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 6e5927b..352a167 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -105,7 +105,6 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
//A workunit property that contains the number of topic partitions for a
given topic. Useful for
//workunit size estimation to assign weights to a given topic partition.
public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
- public static final String AVG_RECORD_SIZE = "avg.record.size";
public static final String AVG_RECORD_MILLIS = "avg.record.millis";
public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 5472134..f5b52b7 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -110,7 +110,7 @@ public class KafkaUtils {
*/
public static boolean containsPartitionAvgRecordSize(State state,
KafkaPartition partition) {
return state.contains(
- getPartitionPropName(partition.getTopicName(), partition.getId()) +
"." + KafkaSource.AVG_RECORD_SIZE);
+ getPartitionPropName(partition.getTopicName(), partition.getId()) +
"." + ConfigurationKeys.AVG_RECORD_SIZE);
}
/**
@@ -119,7 +119,7 @@ public class KafkaUtils {
*/
public static long getPartitionAvgRecordSize(State state, KafkaPartition
partition) {
return state.getPropAsLong(
- getPartitionPropName(partition.getTopicName(), partition.getId()) +
"." + KafkaSource.AVG_RECORD_SIZE);
+ getPartitionPropName(partition.getTopicName(), partition.getId()) +
"." + ConfigurationKeys.AVG_RECORD_SIZE);
}
/**
@@ -127,7 +127,7 @@ public class KafkaUtils {
* "[topicname].[partitionid].avg.record.size".
*/
public static void setPartitionAvgRecordSize(State state, KafkaPartition
partition, long size) {
- state.setProp(getPartitionPropName(partition.getTopicName(),
partition.getId()) + "." + KafkaSource.AVG_RECORD_SIZE,
+ state.setProp(getPartitionPropName(partition.getTopicName(),
partition.getId()) + "." + ConfigurationKeys.AVG_RECORD_SIZE,
size);
}
diff --git a/gobblin-modules/gobblin-orc/build.gradle
b/gobblin-modules/gobblin-orc/build.gradle
index 68f7857..603446e 100644
--- a/gobblin-modules/gobblin-orc/build.gradle
+++ b/gobblin-modules/gobblin-orc/build.gradle
@@ -29,6 +29,7 @@ dependencies {
compile externalDependency.hiveStorageApi
compile externalDependency.orcCore
+
testCompile externalDependency.testng
testCompile externalDependency.mockito
testCompile externalDependency.hiveSerDe
diff --git
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index 96f0c8c..c79eeda 100644
---
a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++
b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
@@ -48,6 +49,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.state.ConstructState;
+import static
org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
@@ -60,6 +62,34 @@ public class GobblinOrcWriter extends
FsDataWriter<GenericRecord> {
private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX +
"jvm.memory.mbs";
+ private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
+
+ private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY =
ORC_WRITER_PREFIX + "container.jvmMemoryXmxRatio";
+ private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;
+
+ static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = ORC_WRITER_PREFIX +
"container.jvmMemoryOverheadMbs";
+ private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;
+
+ @VisibleForTesting
+ static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX +
"autoTuneEnabled";
+ private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+ private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+
+ /**
+ * This value gives an estimation of how many writers are buffering records
at the same time in a container.
+ * Since a time-based partition scheme is a commonly used practice, plus the
chances for late-arrival data,
+ * usually there would be 2-3 writers running during the hourly boundary. 3
is chosen here for being conservative.
+ */
+ private static final int ESTIMATED_PARALLELISM_WRITERS = 3;
+
+ // The serialized record size passed from AVG_RECORD_SIZE is smaller than
the actual in-memory representation
+ // of a record. This is just a number that represents how many times the
actual buffer storing a record is larger
+ // than the serialized size passed down from upstream constructs.
+ @VisibleForTesting
+ static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
+ static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;
+
/**
* Check comment of {@link #deepCleanRowBatch} for the usage of this
configuration.
*/
@@ -77,10 +107,56 @@ public class GobblinOrcWriter extends
FsDataWriter<GenericRecord> {
private final int batchSize;
private final Schema avroSchema;
+ /**
+ * There are a couple of parameters in the ORC writer that require manual
tuning based on record size, given that the executor
+ * for running these ORC writers has limited heap space. This helper
function wraps them and has a side effect on the
+ * argument {@param properties}.
+ *
+ * Assumption for current implementation:
+ * The extractor or source class should set {@link
org.apache.gobblin.configuration.ConfigurationKeys#AVG_RECORD_SIZE}
+ */
+ protected void autoTunedOrcWriterParams(State properties) {
+ double writerRatio =
properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double)
OrcConf.MEMORY_POOL.getDefaultValue());
+ long availableHeapPerWriter = Math.round(availableHeapSize(properties) *
writerRatio / ESTIMATED_PARALLELISM_WRITERS);
+
+ // Upstream constructs will need to set this value properly
+ long estimatedRecordSize = getEstimatedRecordSize(properties);
+ long rowsBetweenCheck = availableHeapPerWriter * 1024 /
estimatedRecordSize;
+ properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+ Math.min(rowsBetweenCheck, (int)
OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
+ // Row batch size should be smaller than row_between_check, 4 is just a
magic number picked here.
+ long batchSize = Math.min(rowsBetweenCheck / 4,
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
+ log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + "
to be:" + rowsBetweenCheck + ","
+ + ORC_WRITER_BATCH_SIZE + " to be:" + batchSize);
+ }
+
+ /**
+ * Calculate the heap size in MB available for ORC writers.
+ */
+ protected long availableHeapSize(State Properties) {
+ // Calculate the recommended size as the threshold for memory check
+ long physicalMem =
Math.round(Properties.getPropAsLong(CONTAINER_MEMORY_MBS,
DEFAULT_CONTAINER_MEMORY_MBS)
+ * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
+ long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS,
DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
+ return physicalMem - nonHeap;
+ }
+
+ /**
+ * Calculate the estimated record size in bytes.
+ */
+ protected long getEstimatedRecordSize(State properties) {
+ long estimatedRecordSizeScale =
properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR,
DEFAULT_RECORD_SIZE_SCALE_FACTOR);
+ return (properties.contains(AVG_RECORD_SIZE) ?
properties.getPropAsLong(AVG_RECORD_SIZE)
+ : EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
+ }
+
public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder,
State properties)
throws IOException {
super(builder, properties);
-
+ if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED,
ORC_WRITER_AUTO_TUNE_DEFAULT)) {
+ autoTunedOrcWriterParams(properties);
+ }
// Create value-writer which is essentially a record-by-record-converter
with buffering in batch.
this.avroSchema = builder.getSchema();
diff --git
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index f3bd2dc..e6b5f3d 100644
---
a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcConf;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.mockito.Mockito;
@@ -44,10 +45,19 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.WorkUnit;
+import static
org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
import static
org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
+import static
org.apache.gobblin.writer.GobblinOrcWriter.CONTAINER_JVM_MEMORY_OVERHEAD_MBS;
+import static
org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_RECORD_SIZE_SCALE_FACTOR;
+import static
org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_AUTO_TUNE_ENABLED;
+import static
org.apache.gobblin.writer.GobblinOrcWriter.RECORD_SIZE_SCALE_FACTOR;
import static org.mockito.Mockito.*;
+/**
+ * For running these tests in an IDE, make sure all ORC libraries that exist
in the external library folder are specified
+ * with the "nohive" classifier where applicable (e.g. orc-core)
+ */
public class GobblinOrcWriterTest {
public static final List<GenericRecord> deserializeAvroRecords(Class clazz,
Schema schema, String schemaPath)
@@ -72,6 +82,38 @@ public class GobblinOrcWriterTest {
}
@Test
+ public void testAutoTuned() throws Exception {
+ Closer closer = Closer.create();
+ Schema schema =
+ new
Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+
+ FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+ (FsDataWriterBuilder<Schema, GenericRecord>)
Mockito.mock(FsDataWriterBuilder.class);
+ when(mockBuilder.getSchema()).thenReturn(schema);
+ State properties = new WorkUnit();
+ String stagingDir = Files.createTempDir().getAbsolutePath();
+ String outputDir = Files.createTempDir().getAbsolutePath();
+ properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
+ properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+ when(mockBuilder.getFileName(properties)).thenReturn("file");
+
+ properties.setProp(ORC_WRITER_AUTO_TUNE_ENABLED, true);
+ properties.setProp(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, 2048);
+ closer.register(new GobblinOrcWriter(mockBuilder, properties));
+ // Verify the side effect within the properties object.
+
Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()),
+ Math.round((4096 - 2048) * 0.5 * 1024 / 3) / (1024 *
properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR,
DEFAULT_RECORD_SIZE_SCALE_FACTOR)));
+
+ // Will get to 5000
+ properties.setProp(AVG_RECORD_SIZE, 10);
+ closer.register(new GobblinOrcWriter(mockBuilder, properties));
+
Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()),
5000);
+
+ closer.close();
+ }
+
+ @Test
public void testRowBatchDeepClean() throws Exception {
Schema schema = new Schema.Parser().parse(
this.getClass().getClassLoader().getResourceAsStream("orc_writer_list_test/schema.avsc"));