This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e2ecf0  [GOBBLIN-908] Customized Progress to enable speculative 
execution
7e2ecf0 is described below

commit 7e2ecf0a6453083cbd94e9b8540d791f40f69cd4
Author: autumnust <[email protected]>
AuthorDate: Tue Oct 15 17:09:30 2019 -0700

    [GOBBLIN-908] Customized Progress to enable speculative execution
    
    Closes #2762 from autumnust/CustomizeProgress
---
 .../runtime/mapreduce/CustomizedProgresser.java    |  42 ++++++
 .../mapreduce/CustomizedProgresserBase.java        |  52 +++++++
 .../mapreduce/GobblinWorkUnitsInputFormat.java     |  25 +++-
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 159 ++++++++++++++-------
 .../org/apache/gobblin/test/TestExtractor.java     |   2 +-
 .../reflection/RestrictedFieldAccessingUtils.java  |  61 ++++++++
 .../apache/gobblin/util/reflection/BaseClass.java  |  38 +++++
 .../gobblin/util/reflection/DerivedClass.java      |  24 ++++
 .../gobblin/util/reflection/EnclosedClass.java     |  32 +++++
 .../RestrictedFieldAccessingUtilsTest.java         |  59 ++++++++
 10 files changed, 444 insertions(+), 50 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java
new file mode 100644
index 0000000..c664f12
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.mapreduce;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Interface for computing an application-defined progress value for a Gobblin mapper.
+ */
+public interface CustomizedProgresser {
+
+  /**
+   * Creates an instance of {@link CustomizedProgresser} from the Hadoop mapper's context, which contains both the
+   * MR-related configuration needed to measure real progress and the Gobblin job configuration.
+   */
+  interface Factory {
+    /**
+     * @param mapperContext context of the running mapper
+     * @return a new {@link CustomizedProgresser} for that mapper
+     */
+    CustomizedProgresser createCustomizedProgresser(Mapper.Context mapperContext);
+  }
+
+  /**
+   * Calculates progress based on the application's requirements.
+   * e.g. For Gobblin-Kafka in batch mode, the number of records written divided by the offsets calculated
+   * during work-unit planning is a good metric for measuring a mapper's progress.
+   *
+   * @return the progress value to report to the MR framework (presumably in the range [0, 1])
+   */
+  float getCustomizedProgress();
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java
new file mode 100644
index 0000000..17d2d72
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.mapreduce;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * A dummy implementation of {@link CustomizedProgresser} that reports a static, configurable value
+ * ({@code customizedProgress.staticProgressValue}, default 0.5). It is still useful because it prevents
+ * 1.0f from being reported directly to the MR framework.
+ *
+ * Application-specific implementations should extend this class instead of implementing
+ * {@link CustomizedProgresser} directly, as the interface may change if reducer progress is added as well.
+ */
+public class CustomizedProgresserBase implements CustomizedProgresser {
+
+  private static final String STATIC_PROGRESS = "customizedProgress.staticProgressValue";
+  private static final float DEFAULT_STATIC_PROGRESS = 0.5f;
+
+  // Read once at construction; the reported value never changes afterwards.
+  private final float staticProgress;
+
+  /**
+   * {@link CustomizedProgresser.Factory} that creates {@link CustomizedProgresserBase} instances.
+   */
+  public static class BaseFactory implements CustomizedProgresser.Factory {
+    @Override
+    public CustomizedProgresser createCustomizedProgresser(Mapper.Context mapperContext) {
+      return new CustomizedProgresserBase(mapperContext);
+    }
+  }
+
+  /**
+   * @param mapperContext mapper context whose configuration supplies {@code customizedProgress.staticProgressValue}
+   */
+  public CustomizedProgresserBase(Mapper.Context mapperContext) {
+    this.staticProgress = mapperContext.getConfiguration().getFloat(STATIC_PROGRESS, DEFAULT_STATIC_PROGRESS);
+  }
+
+  /**
+   * @return the configured static progress value
+   */
+  @Override
+  public float getCustomizedProgress() {
+    return staticProgress;
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
index 90ad672..f809988 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
@@ -23,7 +23,9 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -167,9 +169,23 @@ public class GobblinWorkUnitsInputFormat extends 
InputFormat<LongWritable, Text>
    * Returns records containing the name of the work unit / multi work unit 
files to process.
    */
   public static class GobblinRecordReader extends RecordReader<LongWritable, 
Text> {
+
+    /**
+     * Configuration key for the factor multiplied with "(float) this.currentIdx / (float) this.totalPaths" to
+     * derive the reader's share of the whole job's progress.
+     * The bare ratio used to be reported as progress, so progress reached 1 as soon as all Gobblin work units were
+     * deserialized. Progress can now be customized in the mapper, but deserialization progress is still worth
+     * measuring, so the real ratio is scaled by a factor in the (0, 1) range to better represent overall progress.
+     */
+    private static final String READER_PROGRESS_FACTOR = "mapper.readerProgressFactor";
+    private static final float DEFAULT_READER_PROGRESS_FACTOR = 0.1f;
+
     private int currentIdx = -1;
     private final List<String> paths;
     private final int totalPaths;
+    private Properties properties;
+
 
     public GobblinRecordReader(GobblinSplit split) {
       this.paths = split.getPaths();
@@ -179,6 +195,7 @@ public class GobblinWorkUnitsInputFormat extends 
InputFormat<LongWritable, Text>
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
         throws IOException, InterruptedException {
+      // Cache the Gobblin job properties from the Hadoop configuration; getProgress() reads them on every call.
+      this.properties = 
HadoopUtils.getStateFromConf(context.getConfiguration()).getProperties();
     }
 
     @Override
@@ -203,7 +220,13 @@ public class GobblinWorkUnitsInputFormat extends 
InputFormat<LongWritable, Text>
     @Override
     public float getProgress()
         throws IOException, InterruptedException {
-      return (float) this.currentIdx / (float) this.totalPaths;
+      // Reader progress is the fraction of work-unit files consumed, scaled by "mapper.readerProgressFactor".
+      // When customized progress is enabled, the mapper reports progress itself, so the reader contributes nothing.
+      if (MRJobLauncher.isCustomizedProgressReportEnabled(properties)) {
+        return 0.0f;
+      }
+      // With no paths to read, avoid dividing by zero (which used to yield -Infinity).
+      if (this.totalPaths <= 0) {
+        return 0.0f;
+      }
+      float factor = properties.containsKey(READER_PROGRESS_FACTOR)
+          ? Float.parseFloat(properties.getProperty(READER_PROGRESS_FACTOR)) : DEFAULT_READER_PROGRESS_FACTOR;
+      // currentIdx starts at -1 before the first record is read; never report negative progress.
+      return factor * ((float) Math.max(this.currentIdx, 0) / (float) this.totalPaths);
     }
 
     @Override
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 0077d0f..ade35d1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,13 +27,48 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.fsm.FiniteStateMachine;
-import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.CountEventBuilder;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.JobStateEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.Task;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.runtime.TaskStateTracker;
 import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.runtime.util.MetricGroup;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +86,9 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,42 +104,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValue;
 
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.DynamicConfigGenerator;
-import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.password.PasswordManager;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.Task;
-import org.apache.gobblin.runtime.TaskExecutor;
-import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.TaskStateCollectorService;
-import org.apache.gobblin.runtime.TaskStateTracker;
-import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
-import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
-import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.runtime.util.MetricGroup;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.SerializationUtils;
-
 
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job as a 
Hadoop MR job.
@@ -131,10 +132,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private static final String FILES_DIR_NAME = "_files";
   static final String INPUT_DIR_NAME = "input";
   private static final String OUTPUT_DIR_NAME = "output";
-  private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";
   private static final String SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY = 
"MRJobLauncher.serializePreviousWorkunitStates";
   private static final boolean DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES = 
true;
 
+  /**
+   * In MR-mode, it is necessary to enable customized progress if speculative 
execution is required.
+   */
+  private static final String ENABLED_CUSTOMIZED_PROGRESS = 
"MRJobLauncher.enabledCustomizedProgress";
+
   // Configuration that make uploading of jar files more reliable,
   // since multiple Gobblin Jobs are sharing the same jar directory.
   private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
@@ -288,6 +293,10 @@ public class MRJobLauncher extends AbstractJobLauncher {
         if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
           jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, 
this.job.getTrackingURL());
         }
+        /**
+         * Catch {@link UnallowedTransitionException} only, leaving other 
failure while submitting MR jobs to catch
+         * block afterwards.
+         */
       } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
         LOG.error("Cannot start MR job.", unallowed);
       }
@@ -309,12 +318,20 @@ public class MRJobLauncher extends AbstractJobLauncher {
       // Create a metrics set for this job run from the Hadoop counters.
       // The metrics set is to be persisted to the metrics store later.
       countersToMetrics(JobMetrics.get(jobName, 
this.jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)));
+    } catch (Throwable t) {
+      throw new RuntimeException("The MR job cannot be submitted due to:", t);
     } finally {
       JobStateEventBuilder eventBuilder = new 
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
-      eventBuilder.jobTrackingURL = this.job.getTrackingURL();
-      eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
-      if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+
+      if (!hadoopJobSubmitted) {
+        eventBuilder.jobTrackingURL = "";
         eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+      } else {
+        eventBuilder.jobTrackingURL = this.job.getTrackingURL();
+        eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
+        if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+          eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+        }
       }
       this.eventSubmitter.submit(eventBuilder);
 
@@ -465,6 +482,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
         props.getProperty(JobContext.MAP_SPECULATIVE, 
ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
   }
 
+  /**
+   * Checks whether customized progress reporting is turned on via "MRJobLauncher.enabledCustomizedProgress".
+   * The feature is off unless the key is present and its value parses as {@code true}.
+   */
+  static boolean isCustomizedProgressReportEnabled(Properties properties) {
+    if (!properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS)) {
+      return false;
+    }
+    return Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
+  }
+
   @VisibleForTesting
   static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration 
conf, JobState jobState, Job job)
       throws IOException {
@@ -687,16 +709,30 @@ public class MRJobLauncher extends AbstractJobLauncher {
     private ServiceManager serviceManager;
     private Optional<JobMetrics> jobMetrics = Optional.absent();
     private boolean isSpeculativeEnabled;
+    private boolean customizedProgressEnabled;
     private final JobState jobState = new JobState();
+    private CustomizedProgresser customizedProgresser;
+
+    private static final String CUSTOMIZED_PROGRESSER_FACTORY_CLASS = 
"customizedProgresser.factoryClass";
+    private static final String DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS =
+        
"org.apache.gobblin.runtime.mapreduce.CustomizedProgresserBase$BaseFactory";
 
     // A list of WorkUnits (flattened for MultiWorkUnits) to be run by this 
mapper
     private final List<WorkUnit> workUnits = Lists.newArrayList();
 
     @Override
     protected void setup(Context context) {
+      final State gobblinJobState = 
HadoopUtils.getStateFromConf(context.getConfiguration());
       try (Closer closer = Closer.create()) {
-        this.isSpeculativeEnabled =
-            
isSpeculativeExecutionEnabled(HadoopUtils.getStateFromConf(context.getConfiguration()).getProperties());
+        // Default for customizedProgressEnabled is false.
+        this.customizedProgressEnabled = 
isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
+        this.isSpeculativeEnabled = 
isSpeculativeExecutionEnabled(gobblinJobState.getProperties());
+
+        String factoryClassName = gobblinJobState.getProperties().getProperty(
+            CUSTOMIZED_PROGRESSER_FACTORY_CLASS, 
DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
+        this.customizedProgresser = 
Class.forName(factoryClassName).asSubclass(CustomizedProgresser.Factory.class)
+            .newInstance().createCustomizedProgresser(context);
+
         this.fs = FileSystem.get(context.getConfiguration());
         this.taskStateStore =
             new FsStateStore<>(this.fs, 
FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
@@ -714,8 +750,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
         if (!foundStateFile) {
           throw new IOException("Job state file not found.");
         }
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to setup the mapper task", ioe);
+      } catch (IOException | ReflectiveOperationException e) {
+        throw new RuntimeException("Failed to setup the mapper task", e);
       }
 
 
@@ -747,8 +783,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, 
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
         this.jobMetrics.get()
-            
.startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
-                context.getTaskAttemptID().toString());
+            .startMetricReportingWithFileSuffix(gobblinJobState, 
context.getTaskAttemptID().toString());
       }
     }
 
@@ -768,6 +803,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
         while (context.nextKeyValue()) {
           this.map(context.getCurrentKey(), context.getCurrentValue(), 
context);
         }
+
+        // org.apache.hadoop.util.Progress.complete will set the progress to 
1.0f eventually so we don't have to
+        // set it in finally block.
+        if (customizedProgressEnabled) {
+          setProgressInMapper(customizedProgresser.getCustomizedProgress(), 
context);
+        }
+
         GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
             isSpeculativeEnabled ? 
GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
                 : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;
@@ -849,5 +891,26 @@ public class MRJobLauncher extends AbstractJobLauncher {
         this.workUnits.add(workUnit);
       }
     }
+
+    /**
+     * Sets the progress of this mapper task directly on the underlying task reporter.
+     *
+     * Gobblin (when running in MR mode) used to report progress only from {@link GobblinWorkUnitsInputFormat}
+     * while deserializing {@link WorkUnit}s, so progress reached 1.0f as soon as the work units were deserialized,
+     * before they were executed. That implicitly disables the MR framework's speculative execution, which looks at
+     * progress to decide whether a speculative attempt is needed. Different Gobblin applications should therefore
+     * plug in their own logic for calculating progress.
+     *
+     * The reporter is not reachable through the public {@link Mapper.Context} API, so it is obtained via reflection:
+     * the {@code mapContext} field of {@link WrappedMapper.Context}, then the {@code reporter} field found on
+     * {@link MapContextImpl} or one of its superclasses.
+     *
+     * @param progress the progress value to report
+     * @param context the mapper context; must be a {@link WrappedMapper.Context}
+     * @throws IllegalStateException if {@code context} is not a {@link WrappedMapper.Context}
+     * @throws RuntimeException if the expected fields cannot be read reflectively
+     */
+    void setProgressInMapper(float progress, Context context) {
+      if (!(context instanceof WrappedMapper.Context)) {
+        throw new IllegalStateException("Cannot set mapper progress: unsupported context type "
+            + (context == null ? "null" : context.getClass().getName()));
+      }
+      try {
+        // Search the class hierarchy so that subclasses of WrappedMapper.Context are handled as well.
+        Object contextImpl = RestrictedFieldAccessingUtils
+            .getRestrictedFieldByReflectionRecursively(context, "mapContext", context.getClass());
+        Object reporter = RestrictedFieldAccessingUtils
+            .getRestrictedFieldByReflectionRecursively(contextImpl, "reporter", MapContextImpl.class);
+        ((org.apache.hadoop.mapred.Task.TaskReporter) reporter).setProgress(progress);
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        throw new RuntimeException("Failed to set progress in mapper", e);
+      }
+    }
   }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java 
b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
index 8843d67..79126f9 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
@@ -71,7 +71,7 @@ public class TestExtractor implements Extractor<String, 
String> {
       FileSystem fs = FileSystem
           .get(URI.create(workUnitState.getProp(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI)),
               new Configuration());
-      fs.makeQualified(sourceFile);
+      sourceFile = new Path(fs.makeQualified(sourceFile).toUri().getRawPath());
       this.dataFileReader =
           new DataFileReader<GenericRecord>(new FsInput(sourceFile, new 
Configuration()), datumReader);
     } catch (IOException ioe) {
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java
new file mode 100644
index 0000000..70e5715
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+
+/**
+ * Hacky helpers for reading fields whose access modifiers prevent application code from reaching them.
+ * Use these only when there is no supported API that exposes the value.
+ */
+public class RestrictedFieldAccessingUtils {
+  private RestrictedFieldAccessingUtils() {
+  }
+
+  /**
+   * Reads a field declared directly in {@code clazz} from {@code containingObj}, bypassing access checks.
+   *
+   * @param containingObj the instance to read the field from
+   * @param fieldName name of the field
+   * @param clazz the class that declares the field
+   * @return the field's value (primitive values are boxed)
+   * @throws NoSuchFieldException if {@code clazz} does not declare {@code fieldName}
+   * @throws IllegalAccessException as declared by {@link Field#get(Object)}
+   */
+  public static Object getRestrictedFieldByReflection(Object containingObj, String fieldName, Class<?> clazz)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field field = clazz.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field.get(containingObj);
+  }
+
+  /**
+   * Reads a field declared in {@code clazz} or in the nearest superclass that declares it, bypassing access checks.
+   *
+   * @param containingObj the instance to read the field from
+   * @param fieldName name of the field
+   * @param clazz the class at which to start searching upwards
+   * @return the field's value (primitive values are boxed)
+   * @throws NoSuchFieldException if neither {@code clazz} nor any ancestor below {@link Object} declares the field
+   * @throws IllegalAccessException as declared by {@link Field#get(Object)}
+   */
+  public static Object getRestrictedFieldByReflectionRecursively(Object containingObj, String fieldName,
+      Class<?> clazz)
+      throws NoSuchFieldException, IllegalAccessException {
+    // Stop at Object, or when the hierarchy ends (interfaces and primitives have a null superclass).
+    // Comparing class objects (not canonical names) also works for local/anonymous classes, whose
+    // getCanonicalName() is null.
+    for (Class<?> current = clazz; current != null && current != Object.class; current = current.getSuperclass()) {
+      if (Arrays.stream(current.getDeclaredFields()).anyMatch(f -> f.getName().equals(fieldName))) {
+        return getRestrictedFieldByReflection(containingObj, fieldName, current);
+      }
+    }
+    throw new NoSuchFieldException(
+        String.format("Field %s doesn't exist in specified class and its ancestors", fieldName));
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java
new file mode 100644
index 0000000..08690ae
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+/**
+ * Test fixture for {@link RestrictedFieldAccessingUtilsTest}: holds a private {@link EnclosedClass} field
+ * ({@code enclose}) and a package-private int field ({@code a}) that the test reads via reflection.
+ */
+public class BaseClass {
+  private EnclosedClass enclose;
+  int a;
+
+  public BaseClass(int a) {
+    this.a = a;
+    this.enclose = new EnclosedClass(a);
+  }
+
+  /**
+   * Returns the value held by the private {@code enclose} object, so tests can observe changes made to that
+   * object through a reflectively obtained reference.
+   */
+  public int getEnclosingValue() {
+    return this.enclose.getValue();
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java
new file mode 100644
index 0000000..b345206
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+/**
+ * Test fixture that declares no fields of its own; every field is inherited from {@link BaseClass}, which makes it
+ * suitable for exercising recursive field lookup.
+ */
+public class DerivedClass extends BaseClass {
+  public DerivedClass(int value) {
+    super(value);
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java
new file mode 100644
index 0000000..2a6bb52
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import lombok.Getter;
+import lombok.Setter;
+
+
+/**
+ * Test fixture: a mutable int holder (Lombok-generated getter and setter) that {@link BaseClass} keeps in a
+ * private field.
+ */
+public class EnclosedClass {
+  @Getter
+  @Setter
+  int value;
+
+  public EnclosedClass(int value) {
+    this.value = value;
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java
new file mode 100644
index 0000000..d3079b1
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link RestrictedFieldAccessingUtils}.
+ */
+public class RestrictedFieldAccessingUtilsTest {
+
+  /**
+   * A package-private field declared directly in the given class can be read.
+   */
+  @Test
+  public void testGetRestrictedFieldByReflection()
+      throws Exception {
+    BaseClass baseClass = new BaseClass(5);
+    int a = (int) RestrictedFieldAccessingUtils.getRestrictedFieldByReflection(baseClass, "a", baseClass.getClass());
+    Assert.assertEquals(a, 5);
+  }
+
+  /**
+   * The private field lives in {@link BaseClass}, so looking it up from {@link DerivedClass} must walk the
+   * hierarchy, and the returned object must be the live instance (mutations are visible through the getter).
+   */
+  @Test
+  public void testGetRestrictedFieldByReflectionRecursively()
+      throws Exception {
+    DerivedClass derivedClass = new DerivedClass(5);
+    Assert.assertEquals(derivedClass.getEnclosingValue(), 5);
+    ((EnclosedClass) RestrictedFieldAccessingUtils
+        .getRestrictedFieldByReflectionRecursively(derivedClass, "enclose", derivedClass.getClass())).setValue(100);
+    Assert.assertEquals(derivedClass.getEnclosingValue(), 100);
+  }
+
+  /**
+   * A field that no class in the hierarchy declares must surface as {@link NoSuchFieldException}.
+   */
+  @Test(expectedExceptions = NoSuchFieldException.class)
+  public void testNoSuchFieldException()
+      throws Exception {
+    DerivedClass derivedClass = new DerivedClass(5);
+    RestrictedFieldAccessingUtils.getRestrictedFieldByReflectionRecursively(derivedClass, "non",
+        derivedClass.getClass());
+  }
+}
\ No newline at end of file

Reply via email to