This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fe825c5  [GOBBLIN-911] Make profiling of HiveWritableHdfsDataWriter easier by injecting jobConf
fe825c5 is described below

commit fe825c524e5d1b556358827cc13a4c4c3d450d74
Author: autumnust <[email protected]>
AuthorDate: Wed Oct 16 16:43:54 2019 -0700

    [GOBBLIN-911] Make profiling of HiveWritableHdfsDataWriter easier by injecting jobConf
    
    Closes #2764 from autumnust/optimizeProfileProcess
---
 .../source/extractor/hadoop/AvroFileExtractor.java        | 15 ++++++++++++---
 .../apache/gobblin/writer/HiveWritableHdfsDataWriter.java |  8 +++++++-
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
index b610107..0a669b1 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileExtractor.java
@@ -35,13 +35,15 @@ import 
org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
  * A custom type of {@link FileBasedExtractor}s for extracting data from Avro 
files.
  */
 public class AvroFileExtractor extends FileBasedExtractor<Schema, 
GenericRecord> {
+  private Schema extractorCachedSchema;
 
   public AvroFileExtractor(WorkUnitState workUnitState) {
     super(workUnitState, new AvroFsHelper(workUnitState));
   }
 
   @Override
-  public Iterator<GenericRecord> downloadFile(String file) throws IOException {
+  public Iterator<GenericRecord> downloadFile(String file)
+      throws IOException {
     try {
       return this.closer.register(((AvroFsHelper) 
this.fsHelper).getAvroFile(file));
     } catch (FileBasedHelperException e) {
@@ -51,7 +53,8 @@ public class AvroFileExtractor extends 
FileBasedExtractor<Schema, GenericRecord>
   }
 
   /**
-   * Assumption is that all files in the input directory have the same schema
+   * Assumption is that all files in the input directory have the same schema.
+   * This method is being invoked in 
org.apache.gobblin.runtime.Task#runSynchronousModel()
    */
   @Override
   public Schema getSchema() {
@@ -59,15 +62,21 @@ public class AvroFileExtractor extends 
FileBasedExtractor<Schema, GenericRecord>
       return new 
Schema.Parser().parse(this.workUnit.getProp(ConfigurationKeys.SOURCE_SCHEMA));
     }
 
+    if (extractorCachedSchema != null) {
+      return extractorCachedSchema;
+    }
+
     AvroFsHelper hfsHelper = (AvroFsHelper) this.fsHelper;
     if (this.filesToPull.isEmpty()) {
       return null;
     }
     try {
-      return hfsHelper.getAvroSchema(this.filesToPull.get(0));
+      extractorCachedSchema = hfsHelper.getAvroSchema(this.filesToPull.get(0));
     } catch (FileBasedHelperException e) {
       Throwables.propagate(e);
       return null;
     }
+
+    return extractorCachedSchema;
   }
 }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index 47a596e..a83dc05 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -64,7 +64,13 @@ public class HiveWritableHdfsDataWriter extends 
FsDataWriter<Writable> {
       Class<? extends Writable> writableClass = (Class<? extends Writable>) 
Class
           
.forName(this.properties.getProp(HiveWritableHdfsDataWriterBuilder.WRITER_WRITABLE_CLASS));
 
-      return outputFormat.getHiveRecordWriter(new JobConf(), this.stagingFile, 
writableClass, true,
+      // Merging Job Properties into JobConf for easy tuning
+      JobConf loadedJobConf = new JobConf();
+      for (Object key : this.properties.getProperties().keySet()) {
+        loadedJobConf.set((String)key, this.properties.getProp((String)key));
+      }
+
+      return outputFormat.getHiveRecordWriter(loadedJobConf, this.stagingFile, 
writableClass, true,
           this.properties.getProperties(), null);
     } catch (Throwable t) {
       throw new IOException(String.format("Failed to create writer"), t);

Reply via email to