This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2eaad1d  [GOBBLIN-1417] Make row batch size in ORC Mapreduce writer configurable
2eaad1d is described below

commit 2eaad1d5e95f9a1daf1ed58af2715a90235b41d2
Author: Sudarshan Vasudevan <[email protected]>
AuthorDate: Thu May 6 18:00:43 2021 -0700

    [GOBBLIN-1417] Make row batch size in ORC Mapreduce writer configurable
    
    Closes #3253 from sv2000/orcMRRowBatchSize
---
 .../mapreduce/CompactionOrcJobConfigurator.java    | 30 ++++++++++++++--------
 .../mapreduce/orc/OrcKeyCompactorOutputFormat.java | 16 ++++++++----
 .../apache/gobblin/writer/GobblinOrcWriter.java    |  4 +--
 gradle/scripts/dependencyDefinitions.gradle        |  6 ++---
 gradle/scripts/repositories.gradle                 |  3 +++
 5 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index c417c60..d554a53 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -20,13 +20,6 @@ package org.apache.gobblin.compaction.mapreduce;
 import java.io.IOException;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
-import 
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
-import org.apache.gobblin.configuration.State;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.orc.OrcConf;
@@ -34,12 +27,20 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcKey;
 import org.apache.orc.mapred.OrcValue;
 
-import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
-import static 
org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import 
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
+import org.apache.gobblin.configuration.State;
 
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
+import static 
org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import static 
org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE;
+import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_BATCH_SIZE;
 
 public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
-
   /**
    * The key schema for the shuffle output.
    */
@@ -81,6 +82,14 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
     job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(), 
schema.toString());
   }
 
+  private int getWriterRowBatchSize() {
+    return this.state.getPropAsInt(ORC_WRITER_BATCH_SIZE, 
DEFAULT_ORC_WRITER_BATCH_SIZE);
+  }
+
+  protected void setOrcWriterBatchSize(Job job) {
+    job.getConfiguration().setInt(ORC_WRITER_BATCH_SIZE, 
getWriterRowBatchSize());
+  }
+
   protected void configureMapper(Job job) {
     job.setInputFormatClass(OrcValueCombineFileInputFormat.class);
     job.setMapperClass(OrcValueMapper.class);
@@ -96,5 +105,6 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
     job.setOutputKeyClass(NullWritable.class);
     job.setOutputValueClass(OrcValue.class);
     setNumberOfReducers(job);
+    setOrcWriterBatchSize(job);
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
index 9c94224..c17f568 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
@@ -18,8 +18,7 @@
 package org.apache.gobblin.compaction.mapreduce.orc;
 
 import java.io.IOException;
-import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -32,14 +31,19 @@ import org.apache.orc.Writer;
 import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
 import org.apache.orc.mapreduce.OrcOutputFormat;
 
-import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
+import org.apache.gobblin.writer.GobblinOrcWriter;
+
+import static 
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
 
 
 /**
  * Extension of {@link OrcOutputFormat} for customized {@link 
CompactorOutputCommitter}
  */
+@Slf4j
 public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {
-
   private FileOutputCommitter committer = null;
 
   @Override
@@ -65,6 +69,8 @@ public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {
     Path filename = getDefaultWorkFile(taskAttemptContext, extension);
     Writer writer = OrcFile.createWriter(filename,
         org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
-    return new OrcMapreduceRecordWriter(writer);
+    int rowBatchSize = conf.getInt(GobblinOrcWriter.ORC_WRITER_BATCH_SIZE, 
GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE);
+    log.info("Creating OrcMapreduceRecordWriter with row batch size = {}", 
rowBatchSize);
+    return new OrcMapreduceRecordWriter(writer, rowBatchSize);
   }
 }
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index c79eeda..57c19b6 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -59,8 +59,8 @@ import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
 @Slf4j
 public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
   static final String ORC_WRITER_PREFIX = "orcWriter.";
-  private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
-  private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+  public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + 
"batchSize";
+  public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
 
   private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX + 
"jvm.memory.mbs";
   private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 50b161d..8aaeb00 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -180,9 +180,9 @@ ext.externalDependency = [
     /**
      * Avoiding conflicts with Hive 1.x versions existed in the classpath
      */
-    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.5:nohive",
-    "orcCore": "org.apache.orc:orc-core:1.6.5:nohive",
-    "orcTools":"org.apache.orc:orc-tools:1.6.5",
+    "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.8-SNAPSHOT:nohive",
+    "orcCore": "org.apache.orc:orc-core:1.6.8-SNAPSHOT:nohive",
+    "orcTools":"org.apache.orc:orc-tools:1.6.8-SNAPSHOT",
     'parquet': 'org.apache.parquet:parquet-hadoop:1.11.0',
     'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
     'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
diff --git a/gradle/scripts/repositories.gradle b/gradle/scripts/repositories.gradle
index fa2c2b8..a135565 100644
--- a/gradle/scripts/repositories.gradle
+++ b/gradle/scripts/repositories.gradle
@@ -27,6 +27,9 @@ repositories {
     url "https://linkedin.bintray.com/maven";
   }
   maven {
+    url "https://repository.apache.org/content/repositories/snapshots";
+  }
+  maven {
     url "https://linkedin.jfrog.com/artifactory/";
   }
   jcenter()

Reply via email to