This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fe825c5 [GOBBLIN-911] Make profiling of HiveWritableHdfsDataWriter easier by injecting jobConf
fe825c5 is described below
commit fe825c524e5d1b556358827cc13a4c4c3d450d74
Author: autumnust <[email protected]>
AuthorDate: Wed Oct 16 16:43:54 2019 -0700
[GOBBLIN-911] Make profiling of HiveWritableHdfsDataWriter easier by injecting jobConf
Closes #2764 from autumnust/optimizeProfileProcess
---
.../source/extractor/hadoop/AvroFileExtractor.java | 15 ++++++++++++---
.../apache/gobblin/writer/HiveWritableHdfsDataWriter.java | 8 +++++++-
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
index b610107..0a669b1 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
@@ -35,13 +35,15 @@ import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
* A custom type of {@link FileBasedExtractor}s for extracting data from Avro
files.
*/
public class AvroFileExtractor extends FileBasedExtractor<Schema,
GenericRecord> {
+ private Schema extractorCachedSchema;
public AvroFileExtractor(WorkUnitState workUnitState) {
super(workUnitState, new AvroFsHelper(workUnitState));
}
@Override
- public Iterator<GenericRecord> downloadFile(String file) throws IOException {
+ public Iterator<GenericRecord> downloadFile(String file)
+ throws IOException {
try {
return this.closer.register(((AvroFsHelper)
this.fsHelper).getAvroFile(file));
} catch (FileBasedHelperException e) {
@@ -51,7 +53,8 @@ public class AvroFileExtractor extends FileBasedExtractor<Schema, GenericRecord>
}
/**
- * Assumption is that all files in the input directory have the same schema
+ * Assumption is that all files in the input directory have the same schema.
+ * This method is being invoked in
org.apache.gobblin.runtime.Task#runSynchronousModel()
*/
@Override
public Schema getSchema() {
@@ -59,15 +62,21 @@ public class AvroFileExtractor extends FileBasedExtractor<Schema, GenericRecord>
return new
Schema.Parser().parse(this.workUnit.getProp(ConfigurationKeys.SOURCE_SCHEMA));
}
+ if (extractorCachedSchema != null) {
+ return extractorCachedSchema;
+ }
+
AvroFsHelper hfsHelper = (AvroFsHelper) this.fsHelper;
if (this.filesToPull.isEmpty()) {
return null;
}
try {
- return hfsHelper.getAvroSchema(this.filesToPull.get(0));
+ extractorCachedSchema = hfsHelper.getAvroSchema(this.filesToPull.get(0));
} catch (FileBasedHelperException e) {
Throwables.propagate(e);
return null;
}
+
+ return extractorCachedSchema;
}
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index 47a596e..a83dc05 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -64,7 +64,13 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> {
Class<? extends Writable> writableClass = (Class<? extends Writable>)
Class
.forName(this.properties.getProp(HiveWritableHdfsDataWriterBuilder.WRITER_WRITABLE_CLASS));
- return outputFormat.getHiveRecordWriter(new JobConf(), this.stagingFile,
writableClass, true,
+ // Merging Job Properties into JobConf for easy tuning
+ JobConf loadedJobConf = new JobConf();
+ for (Object key : this.properties.getProperties().keySet()) {
+ loadedJobConf.set((String)key, this.properties.getProp((String)key));
+ }
+
+ return outputFormat.getHiveRecordWriter(loadedJobConf, this.stagingFile,
writableClass, true,
this.properties.getProperties(), null);
} catch (Throwable t) {
throw new IOException(String.format("Failed to create writer"), t);