This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2eaad1d [GOBBLIN-1417] Make row batch size in ORC Mapreduce writer configurable
2eaad1d is described below
commit 2eaad1d5e95f9a1daf1ed58af2715a90235b41d2
Author: Sudarshan Vasudevan <[email protected]>
AuthorDate: Thu May 6 18:00:43 2021 -0700
[GOBBLIN-1417] Make row batch size in ORC Mapreduce writer configurable
Closes #3253 from sv2000/orcMRRowBatchSize
---
.../mapreduce/CompactionOrcJobConfigurator.java | 30 ++++++++++++++--------
.../mapreduce/orc/OrcKeyCompactorOutputFormat.java | 16 ++++++++----
.../apache/gobblin/writer/GobblinOrcWriter.java | 4 +--
gradle/scripts/dependencyDefinitions.gradle | 6 ++---
gradle/scripts/repositories.gradle | 3 +++
5 files changed, 39 insertions(+), 20 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index c417c60..d554a53 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -20,13 +20,6 @@ package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
-import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
-import
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
-import org.apache.gobblin.configuration.State;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.orc.OrcConf;
@@ -34,12 +27,20 @@ import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcValue;
-import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
-import static
org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyCompactorOutputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyComparator;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcKeyDedupReducer;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
+import
org.apache.gobblin.compaction.mapreduce.orc.OrcValueCombineFileInputFormat;
+import org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper;
+import org.apache.gobblin.configuration.State;
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
+import static
org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import static
org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE;
+import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_BATCH_SIZE;
public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
-
/**
* The key schema for the shuffle output.
*/
@@ -81,6 +82,14 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(),
schema.toString());
}
+ private int getWriterRowBatchSize() {
+ return this.state.getPropAsInt(ORC_WRITER_BATCH_SIZE,
DEFAULT_ORC_WRITER_BATCH_SIZE);
+ }
+
+ protected void setOrcWriterBatchSize(Job job) {
+ job.getConfiguration().setInt(ORC_WRITER_BATCH_SIZE,
getWriterRowBatchSize());
+ }
+
protected void configureMapper(Job job) {
job.setInputFormatClass(OrcValueCombineFileInputFormat.class);
job.setMapperClass(OrcValueMapper.class);
@@ -96,5 +105,6 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcValue.class);
setNumberOfReducers(job);
+ setOrcWriterBatchSize(job);
}
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
index 9c94224..c17f568 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyCompactorOutputFormat.java
@@ -18,8 +18,7 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
-import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -32,14 +31,19 @@ import org.apache.orc.Writer;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;
-import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter;
+import org.apache.gobblin.writer.GobblinOrcWriter;
+
+import static
org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
/**
* Extension of {@link OrcOutputFormat} for customized {@link
CompactorOutputCommitter}
*/
+@Slf4j
public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {
-
private FileOutputCommitter committer = null;
@Override
@@ -65,6 +69,8 @@ public class OrcKeyCompactorOutputFormat extends OrcOutputFormat {
Path filename = getDefaultWorkFile(taskAttemptContext, extension);
Writer writer = OrcFile.createWriter(filename,
org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
- return new OrcMapreduceRecordWriter(writer);
+ int rowBatchSize = conf.getInt(GobblinOrcWriter.ORC_WRITER_BATCH_SIZE,
GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE);
+ log.info("Creating OrcMapreduceRecordWriter with row batch size = {}",
rowBatchSize);
+ return new OrcMapreduceRecordWriter(writer, rowBatchSize);
}
}
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index c79eeda..57c19b6 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -59,8 +59,8 @@ import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
@Slf4j
public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
static final String ORC_WRITER_PREFIX = "orcWriter.";
- private static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
- private static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
+ public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX +
"batchSize";
+ public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX +
"jvm.memory.mbs";
private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 50b161d..8aaeb00 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -180,9 +180,9 @@ ext.externalDependency = [
/**
* Avoiding conflicts with Hive 1.x versions existed in the classpath
*/
- "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.5:nohive",
- "orcCore": "org.apache.orc:orc-core:1.6.5:nohive",
- "orcTools":"org.apache.orc:orc-tools:1.6.5",
+ "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.8-SNAPSHOT:nohive",
+ "orcCore": "org.apache.orc:orc-core:1.6.8-SNAPSHOT:nohive",
+ "orcTools":"org.apache.orc:orc-tools:1.6.8-SNAPSHOT",
'parquet': 'org.apache.parquet:parquet-hadoop:1.11.0',
'parquetAvro': 'org.apache.parquet:parquet-avro:1.11.0',
'parquetProto': 'org.apache.parquet:parquet-protobuf:1.11.0',
diff --git a/gradle/scripts/repositories.gradle b/gradle/scripts/repositories.gradle
index fa2c2b8..a135565 100644
--- a/gradle/scripts/repositories.gradle
+++ b/gradle/scripts/repositories.gradle
@@ -27,6 +27,9 @@ repositories {
url "https://linkedin.bintray.com/maven"
}
maven {
+ url "https://repository.apache.org/content/repositories/snapshots"
+ }
+ maven {
url "https://linkedin.jfrog.com/artifactory/"
}
jcenter()