This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d85dc7e  [GOBBLIN-976] Add dynamic config to the state before 
instantiating metrics reporter in MRJobLauncher[]
d85dc7e is described below

commit d85dc7e2b222770243afd08f73b26c53db14c165
Author: sv2000 <[email protected]>
AuthorDate: Thu Nov 21 20:20:13 2019 -0800

    [GOBBLIN-976] Add dynamic config to the state before instantiating metrics 
reporter in MRJobLauncher[]
    
    Closes #2823 from sv2000/sslConfigs
---
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 70 +++++++++++-----------
 1 file changed, 36 insertions(+), 34 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index ade35d1..53b5808 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,6 +27,41 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValue;
+
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
@@ -69,40 +104,6 @@ import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.SerializationUtils;
 import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.task.MapContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ServiceManager;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValue;
 
 
 /**
@@ -766,6 +767,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
       for (Map.Entry<String, ConfigValue> entry : dynamicConfig.entrySet()) {
         this.jobState.setProp(entry.getKey(), 
entry.getValue().unwrapped().toString());
         configuration.set(entry.getKey(), 
entry.getValue().unwrapped().toString());
+        gobblinJobState.setProp(entry.getKey(), 
entry.getValue().unwrapped().toString());
       }
 
       this.taskExecutor = new TaskExecutor(configuration);

Reply via email to