This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0187e03  [GOBBLIN-1290] Auto-tune ORC writer parameters
0187e03 is described below

commit 0187e03a6fb213edf7c941e4a43bfed62534559c
Author: Lei Sun <[email protected]>
AuthorDate: Thu Nov 19 12:03:37 2020 -0800

    [GOBBLIN-1290] Auto-tune ORC writer parameters
    
    Auto-tune ORC writer parameters based on record
    size
    
    Format log a bit
    
    Update git ignore
    
    Add simple unit test
    
    update gitignore for pegasus migration
    
    Address reviewer's comments
    
    Closes #3128 from autumnust/autotune
---
 .gitignore                                         |  2 +
 .../gobblin/configuration/ConfigurationKeys.java   |  4 ++
 gobblin-cluster/build.gradle                       | 14 ++--
 .../extractor/extract/kafka/KafkaSource.java       |  1 -
 .../source/extractor/extract/kafka/KafkaUtils.java |  6 +-
 gobblin-modules/gobblin-orc/build.gradle           |  1 +
 .../apache/gobblin/writer/GobblinOrcWriter.java    | 78 +++++++++++++++++++++-
 .../gobblin/writer/GobblinOrcWriterTest.java       | 42 ++++++++++++
 8 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/.gitignore b/.gitignore
index a6075f2..ac60457 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@
 **/*.iws
 **/*.ipr
 .shelf/
+.ideaDataSources/
 
 # VS Code related 
 .vscode
@@ -49,6 +50,7 @@ out/
 */bin/
 **/mainGeneratedDataTemplate
 **/mainGeneratedRest
+**/main/snapshot
 gobblin-dist/
 gobblin-distribution-*.tar.gz
 gobblin.tar.gz
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 004be99..f93bcdc 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -502,6 +502,10 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_SCHEMA_IN_SOURCE_DIR = false;
   public static final String SCHEMA_FILENAME = "schema.filename";
   public static final String DEFAULT_SCHEMA_FILENAME = "metadata.json";
+  // An optional configuration for the extractor's specific implementation to set, which helps the data writer
+  // tune some parameters that are relevant to the record size.
+  // See GobblinOrcWriter for a reference example.
+  public static final String AVG_RECORD_SIZE = "avg.record.size";
 
   // Comma-separated source entity names
   public static final String SOURCE_ENTITIES = "source.entities";
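
The new key is only a hint: an upstream source or extractor is expected to publish it on the work-unit state so that a size-sensitive writer can pick it up. A minimal sketch of that contract, assuming a hypothetical helper class (not part of this commit):

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.configuration.State;

    public class AvgRecordSizeHint {
      // Hypothetical helper: publish the observed average serialized record
      // size (in bytes) so that writers such as GobblinOrcWriter can auto-tune.
      public static void publish(State workUnitState, long totalBytes, long recordCount) {
        if (recordCount > 0) {
          workUnitState.setProp(ConfigurationKeys.AVG_RECORD_SIZE, totalBytes / recordCount);
        }
      }
    }
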
diff --git a/gobblin-cluster/build.gradle b/gobblin-cluster/build.gradle
index 8e92618..f8e8324 100644
--- a/gobblin-cluster/build.gradle
+++ b/gobblin-cluster/build.gradle
@@ -85,14 +85,12 @@ configurations {
   }
 }
 // Generates a maven test artifact (gobblin-cluster-test)
-project.publishing {
-  publications {
-    testsPub(MavenPublication) {
-      artifactId "${project.archivesBaseName}-test"
-      artifacts = [testJar]
-      pom pomAttributes
-      pom.withXml addRuntimeDependenciesToPom
-    }
+project.publishing.publications {
+  testsPub(MavenPublication) {
+    artifactId "${project.archivesBaseName}-test"
+    artifacts = [testJar]
+    pom pomAttributes
+    pom.withXml addRuntimeDependenciesToPom
   }
 }
 addPublicationToBintray("testsPub")
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 6e5927b..352a167 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -105,7 +105,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   //A workunit property that contains the number of topic partitions for a given topic. Useful for
   //workunit size estimation to assign weights to a given topic partition.
   public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
-  public static final String AVG_RECORD_SIZE = "avg.record.size";
   public static final String AVG_RECORD_MILLIS = "avg.record.millis";
   public static final String START_FETCH_EPOCH_TIME = "startFetchEpochTime";
   public static final String STOP_FETCH_EPOCH_TIME = "stopFetchEpochTime";
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index 5472134..f5b52b7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -110,7 +110,7 @@ public class KafkaUtils {
    */
   public static boolean containsPartitionAvgRecordSize(State state, KafkaPartition partition) {
     return state.contains(
-        getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_SIZE);
+        getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + ConfigurationKeys.AVG_RECORD_SIZE);
   }
 
   /**
@@ -119,7 +119,7 @@ public class KafkaUtils {
    */
   public static long getPartitionAvgRecordSize(State state, KafkaPartition partition) {
     return state.getPropAsLong(
-        getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_SIZE);
+        getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + ConfigurationKeys.AVG_RECORD_SIZE);
   }
 
   /**
@@ -127,7 +127,7 @@ public class KafkaUtils {
    * "[topicname].[partitionid].avg.record.size".
    */
   public static void setPartitionAvgRecordSize(State state, KafkaPartition partition, long size) {
-    state.setProp(getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + KafkaSource.AVG_RECORD_SIZE,
+    state.setProp(getPartitionPropName(partition.getTopicName(), partition.getId()) + "." + ConfigurationKeys.AVG_RECORD_SIZE,
         size);
   }
 
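These helpers keep the per-partition property naming ("[topicname].[partitionid].avg.record.size") in one place; only the constant's home moves from KafkaSource to ConfigurationKeys. A round-trip sketch under that naming contract (the wrapper class and the -1 sentinel are illustrative, not from the patch):

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
    import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;

    public class PartitionAvgSizeRoundTrip {
      // Store, then read back, the average record size for one partition;
      // the backing property key is "<topic>.<partition-id>.avg.record.size".
      static long roundTrip(State state, KafkaPartition partition, long avgBytes) {
        KafkaUtils.setPartitionAvgRecordSize(state, partition, avgBytes);
        return KafkaUtils.containsPartitionAvgRecordSize(state, partition)
            ? KafkaUtils.getPartitionAvgRecordSize(state, partition)
            : -1L;
      }
    }
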
diff --git a/gobblin-modules/gobblin-orc/build.gradle b/gobblin-modules/gobblin-orc/build.gradle
index 68f7857..603446e 100644
--- a/gobblin-modules/gobblin-orc/build.gradle
+++ b/gobblin-modules/gobblin-orc/build.gradle
@@ -29,6 +29,7 @@ dependencies {
   compile externalDependency.hiveStorageApi
   compile externalDependency.orcCore
 
+
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
   testCompile externalDependency.hiveSerDe
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index 96f0c8c..c79eeda 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
@@ -48,6 +49,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.state.ConstructState;
 
+import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
 import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
 
 
@@ -60,6 +62,34 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
   private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
   private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
 
+  private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX + "jvm.memory.mbs";
+  private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
+
+  private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = ORC_WRITER_PREFIX + "container.jvmMemoryXmxRatio";
+  private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;
+
+  static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = ORC_WRITER_PREFIX + "container.jvmMemoryOverheadMbs";
+  private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;
+
+  @VisibleForTesting
+  static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
+  private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
+  private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
+
+  /**
+   * An estimate of how many writers are buffering records at the same time in a container.
+   * Since time-based partitioning is common practice, and late-arriving data is likely,
+   * there are usually 2-3 writers running around an hourly boundary. 3 is chosen here to be conservative.
+   */
+  private static final int ESTIMATED_PARALLELISM_WRITERS = 3;
+
+  // The serialized record size passed via AVG_RECORD_SIZE is smaller than a record's actual in-memory
+  // representation. This factor estimates how many times larger the in-memory buffer holding a record is
+  // than the serialized size passed down from upstream constructs.
+  @VisibleForTesting
+  static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
+  static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;
+
   /**
    * Check comment of {@link #deepCleanRowBatch} for the usage of this configuration.
    */
@@ -77,10 +107,56 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
   private final int batchSize;
   private final Schema avroSchema;
 
+  /**
+   * A couple of parameters in the ORC writer require manual tuning based on record size, given that the
+   * executor running these ORC writers has limited heap space. This helper function wraps that tuning and
+   * has a side effect on the argument {@code properties}.
+   *
+   * Assumption for the current implementation:
+   * the extractor or source class should set {@link org.apache.gobblin.configuration.ConfigurationKeys#AVG_RECORD_SIZE}.
+   */
+  protected void autoTunedOrcWriterParams(State properties) {
+    double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
+    long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / ESTIMATED_PARALLELISM_WRITERS);
+
+    // Upstream constructs will need to set this value properly
+    long estimatedRecordSize = getEstimatedRecordSize(properties);
+    long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
+    properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+        Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
+    // The row batch size should be smaller than rows-between-checks; 4 is just a magic number picked here.
+    long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
+    properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
+    log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be: " + rowsBetweenCheck + ", "
+        + ORC_WRITER_BATCH_SIZE + " to be: " + batchSize);
+  }
+
+  /**
+   * Calculate the heap size in MB available for ORC writers.
+   */
+  protected long availableHeapSize(State properties) {
+    // Calculate the recommended size as the threshold for memory check
+    long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
+        * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
+    long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
+    return physicalMem - nonHeap;
+  }
+
+  /**
+   * Calculate the estimated record size in bytes.
+   */
+  protected long getEstimatedRecordSize(State properties) {
+    long estimatedRecordSizeScale = properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR);
+    return (properties.contains(AVG_RECORD_SIZE) ? properties.getPropAsLong(AVG_RECORD_SIZE)
+        : EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
+  }
+
   public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties)
       throws IOException {
     super(builder, properties);
-
+    if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
+      autoTunedOrcWriterParams(properties);
+    }
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.avroSchema = builder.getSchema();
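
To make the tuning arithmetic above concrete, the same math can be re-derived standalone with this commit's defaults (4096 MB container, Xmx ratio 1.0, zero JVM overhead, ORC memory pool 0.5, 1024-byte fallback record size, scale factor 6, 3 concurrent writers). This is an illustration of the formula, not code from the patch:

    public class OrcAutoTuneMath {
      public static void main(String[] args) {
        long containerMb = 4096;   // DEFAULT_CONTAINER_MEMORY_MBS
        double xmxRatio = 1.0;     // default Xmx ratio
        long overheadMb = 0;       // default JVM memory overhead
        double memoryPool = 0.5;   // OrcConf.MEMORY_POOL default
        long recordBytes = 1024;   // EXEMPLIFIED_RECORD_SIZE_IN_BYTES
        int scaleFactor = 6;       // DEFAULT_RECORD_SIZE_SCALE_FACTOR
        int writers = 3;           // ESTIMATED_PARALLELISM_WRITERS

        long heapMb = Math.round(containerMb * xmxRatio) - overheadMb;     // 4096
        long perWriterMb = Math.round(heapMb * memoryPool / writers);      // 683
        long recordSize = recordBytes * scaleFactor;                       // 6144
        long rowsBetweenChecks = perWriterMb * 1024 / recordSize;          // 113
        long batchSize = Math.min(rowsBetweenChecks / 4, 1000);            // 28

        System.out.println("rowsBetweenChecks=" + Math.min(rowsBetweenChecks, 5000)
            + ", batchSize=" + batchSize);
      }
    }

Note that the committed formula multiplies per-writer megabytes by 1024 (yielding kilobytes) before dividing by a byte count, which appears to build in an additional 1024x conservative factor on top of the scale factor.
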
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index f3bd2dc..e6b5f3d 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.orc.OrcConf;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
 import org.mockito.Mockito;
@@ -44,10 +45,19 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
+import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
 import static org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
+import static org.apache.gobblin.writer.GobblinOrcWriter.CONTAINER_JVM_MEMORY_OVERHEAD_MBS;
+import static org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_RECORD_SIZE_SCALE_FACTOR;
+import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_AUTO_TUNE_ENABLED;
+import static org.apache.gobblin.writer.GobblinOrcWriter.RECORD_SIZE_SCALE_FACTOR;
 import static org.mockito.Mockito.*;
 
 
+/**
+ * To run these tests in an IDE, make sure that ORC libraries in the external library folder which have a
+ * "nohive" variant (e.g. orc-core) are resolved with the "nohive" classifier.
+ */
 public class GobblinOrcWriterTest {
 
   public static final List<GenericRecord> deserializeAvroRecords(Class clazz, Schema schema, String schemaPath)
@@ -72,6 +82,38 @@ public class GobblinOrcWriterTest {
   }
 
   @Test
+  public void testAutoTuned() throws Exception {
+    Closer closer = Closer.create();
+    Schema schema =
+        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
+
+    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
+        (FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
+    when(mockBuilder.getSchema()).thenReturn(schema);
+    State properties = new WorkUnit();
+    String stagingDir = Files.createTempDir().getAbsolutePath();
+    String outputDir = Files.createTempDir().getAbsolutePath();
+    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
+    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
+    when(mockBuilder.getFileName(properties)).thenReturn("file");
+
+    properties.setProp(ORC_WRITER_AUTO_TUNE_ENABLED, true);
+    properties.setProp(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, 2048);
+    closer.register(new GobblinOrcWriter(mockBuilder, properties));
+    // Verify the side effect within the properties object.
+    Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()),
+        Math.round((4096 - 2048) * 0.5 * 1024 / 3) / (1024 * properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR)));
+
+    // With avg.record.size set to 10, the tuned value exceeds the default, so it is capped at 5000
+    properties.setProp(AVG_RECORD_SIZE, 10);
+    closer.register(new GobblinOrcWriter(mockBuilder, properties));
+    Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()), 5000);
+
+    closer.close();
+  }
+
+  @Test
   public void testRowBatchDeepClean() throws Exception {
     Schema schema = new Schema.Parser().parse(
         this.getClass().getClassLoader().getResourceAsStream("orc_writer_list_test/schema.avsc"));

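For completeness, opting in from a job mirrors the unit test's setup. A same-package sketch (class name hypothetical; the keys and constants are the ones added by this commit):

    package org.apache.gobblin.writer;

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.configuration.State;

    public class OrcAutoTuneOptIn {
      // Enable auto-tuning and describe the container; the GobblinOrcWriter
      // constructor then rewrites ROWS_BETWEEN_CHECKS and the writer batch
      // size on this same State instance as a side effect.
      static void configure(State props) {
        props.setProp(GobblinOrcWriter.ORC_WRITER_AUTO_TUNE_ENABLED, true);
        props.setProp(GobblinOrcWriter.CONTAINER_JVM_MEMORY_OVERHEAD_MBS, 2048);
        props.setProp(ConfigurationKeys.AVG_RECORD_SIZE, 10);
      }
    }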