This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7e2ecf0 [GOBBLIN-908] Customized Progress to enable speculative execution
7e2ecf0 is described below
commit 7e2ecf0a6453083cbd94e9b8540d791f40f69cd4
Author: autumnust <[email protected]>
AuthorDate: Tue Oct 15 17:09:30 2019 -0700
[GOBBLIN-908] Customized Progress to enable speculative execution
Closes #2762 from autumnust/CustomizeProgress
---
.../runtime/mapreduce/CustomizedProgresser.java | 42 ++++++
.../mapreduce/CustomizedProgresserBase.java | 52 +++++++
.../mapreduce/GobblinWorkUnitsInputFormat.java | 25 +++-
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 159 ++++++++++++++-------
.../org/apache/gobblin/test/TestExtractor.java | 2 +-
.../reflection/RestrictedFieldAccessingUtils.java | 61 ++++++++
.../apache/gobblin/util/reflection/BaseClass.java | 38 +++++
.../gobblin/util/reflection/DerivedClass.java | 24 ++++
.../gobblin/util/reflection/EnclosedClass.java | 32 +++++
.../RestrictedFieldAccessingUtilsTest.java | 59 ++++++++
10 files changed, 444 insertions(+), 50 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java
new file mode 100644
index 0000000..c664f12
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresser.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.mapreduce;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Supplies an application-specific progress value for a Gobblin mapper running in MR mode.
+ * A meaningful progress figure is what the MR framework relies on to decide whether to launch
+ * speculative attempts.
+ */
+public interface CustomizedProgresser {
+
+  /**
+   * Creates an instance of {@link CustomizedProgresser} from a Hadoop {@link Mapper.Context}, which carries
+   * both the MR-related configuration needed to measure real progress and the Gobblin job configuration.
+   */
+  interface Factory {
+    CustomizedProgresser createCustomizedProgresser(Mapper.Context mapperContext);
+  }
+
+  /**
+   * Calculates progress based on the application's requirements.
+   * For example, for Gobblin-Kafka in batch mode, the number of records written divided by the offset range
+   * computed during work-unit planning is a good metric for a mapper's progress.
+   *
+   * @return the progress value to report (presumably a fraction in [0, 1] -- TODO confirm with implementers)
+   */
+  float getCustomizedProgress();
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java
new file mode 100644
index 0000000..17d2d72
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/CustomizedProgresserBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.mapreduce;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * A dummy implementation of {@link CustomizedProgresser} that always reports a static, configurable value.
+ * It is still useful: it keeps the mapper from reporting 1.0f to the MR framework as soon as all work units
+ * have been deserialized.
+ *
+ * Application-specific implementations should extend this class rather than implement
+ * {@link CustomizedProgresser} directly, because the interface may change (e.g. if reducer progress is added).
+ */
+public class CustomizedProgresserBase implements CustomizedProgresser {
+
+  /** Configuration key for the value returned by {@link #getCustomizedProgress()}. */
+  private static final String STATIC_PROGRESS = "customizedProgress.staticProgressValue";
+  private static final float DEFAULT_STATIC_PROGRESS = 0.5f;
+
+  // Read once from the job configuration; never changes afterwards.
+  private final float staticProgress;
+
+  /** Default factory: builds a {@link CustomizedProgresserBase} from the mapper context. */
+  public static class BaseFactory implements CustomizedProgresser.Factory {
+    @Override
+    public CustomizedProgresser createCustomizedProgresser(Mapper.Context mapperContext) {
+      return new CustomizedProgresserBase(mapperContext);
+    }
+  }
+
+  /**
+   * @param mapperContext the mapper context whose configuration supplies the static progress value
+   *                      (key {@code customizedProgress.staticProgressValue}, default {@code 0.5f})
+   */
+  public CustomizedProgresserBase(Mapper.Context mapperContext) {
+    this.staticProgress = mapperContext.getConfiguration().getFloat(STATIC_PROGRESS, DEFAULT_STATIC_PROGRESS);
+  }
+
+  @Override
+  public float getCustomizedProgress() {
+    return this.staticProgress;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
index 90ad672..f809988 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinWorkUnitsInputFormat.java
@@ -23,7 +23,9 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -167,9 +169,23 @@ public class GobblinWorkUnitsInputFormat extends
InputFormat<LongWritable, Text>
* Returns records containing the name of the work unit / multi work unit
files to process.
*/
public static class GobblinRecordReader extends RecordReader<LongWritable,
Text> {
+
+ /**
+ * A factor value that would be used to multiply with "(float)
this.currentIdx / (float) this.totalPaths"
+ * to reflect progress of the whole job.
+ * We used to use bare "(float) this.currentIdx / (float) this.totalPaths"
value for progress, the problem of that is
+ * whenever deserialization of Gobblin-Workunit finished, the progress is
reported as 1.
+ * We could customize the progress in mapper, but we still want to measure
the progress of deserialization.
+ * The real progress multiplied with certain factor in (0,1) range could
hopefully better represent the progress.
+ */
+ private static final String READER_PROGRESS_FACTOR =
"mapper.readerProgressFactor" ;
+ private static final float DEFAULT_READER_PROGRESS_FACTOR = 0.1f;
+
private int currentIdx = -1;
private final List<String> paths;
private final int totalPaths;
+ private Properties properties;
+
public GobblinRecordReader(GobblinSplit split) {
this.paths = split.getPaths();
@@ -179,6 +195,7 @@ public class GobblinWorkUnitsInputFormat extends
InputFormat<LongWritable, Text>
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
+ this.properties =
HadoopUtils.getStateFromConf(context.getConfiguration()).getProperties();
}
@Override
@@ -203,7 +220,13 @@ public class GobblinWorkUnitsInputFormat extends
InputFormat<LongWritable, Text>
@Override
public float getProgress()
throws IOException, InterruptedException {
- return (float) this.currentIdx / (float) this.totalPaths;
+ if (MRJobLauncher.isCustomizedProgressReportEnabled(properties)) {
+ return 0.0f;
+ } else {
+ float factor = properties.containsKey(READER_PROGRESS_FACTOR) ?
+ Float.parseFloat(properties.getProperty(READER_PROGRESS_FACTOR)) :
DEFAULT_READER_PROGRESS_FACTOR;
+ return factor * ((float) this.currentIdx / (float) this.totalPaths);
+ }
}
@Override
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 0077d0f..ade35d1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -27,13 +27,48 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.fsm.FiniteStateMachine;
-import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
-import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.JobStateEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.Task;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
+import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.runtime.util.MetricGroup;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.SerializationUtils;
+import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -51,7 +86,9 @@ import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,42 +104,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.commit.CommitStep;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.DynamicConfigGenerator;
-import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.password.PasswordManager;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.runtime.JobLauncher;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.Task;
-import org.apache.gobblin.runtime.TaskExecutor;
-import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.TaskStateCollectorService;
-import org.apache.gobblin.runtime.TaskStateTracker;
-import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.JobFSMState;
-import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine.StateType;
-import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.runtime.util.MetricGroup;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.SerializationUtils;
-
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job as a
Hadoop MR job.
@@ -131,10 +132,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
private static final String FILES_DIR_NAME = "_files";
static final String INPUT_DIR_NAME = "input";
private static final String OUTPUT_DIR_NAME = "output";
- private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";
private static final String SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY =
"MRJobLauncher.serializePreviousWorkunitStates";
private static final boolean DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES =
true;
+ /**
+ * In MR-mode, it is necessary to enable customized progress if speculative
execution is required.
+ */
+ private static final String ENABLED_CUSTOMIZED_PROGRESS =
"MRJobLauncher.enabledCustomizedProgress";
+
// Configuration that make uploading of jar files more reliable,
// since multiple Gobblin Jobs are sharing the same jar directory.
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
@@ -288,6 +293,10 @@ public class MRJobLauncher extends AbstractJobLauncher {
if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY,
this.job.getTrackingURL());
}
+ /**
+ * Catch {@link UnallowedTransitionException} only, leaving other
failure while submitting MR jobs to catch
+ * block afterwards.
+ */
} catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
LOG.error("Cannot start MR job.", unallowed);
}
@@ -309,12 +318,20 @@ public class MRJobLauncher extends AbstractJobLauncher {
// Create a metrics set for this job run from the Hadoop counters.
// The metrics set is to be persisted to the metrics store later.
countersToMetrics(JobMetrics.get(jobName,
this.jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)));
+ } catch (Throwable t) {
+ throw new RuntimeException("The MR job cannot be submitted due to:", t);
} finally {
JobStateEventBuilder eventBuilder = new
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
- eventBuilder.jobTrackingURL = this.job.getTrackingURL();
- eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
- if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+
+ if (!hadoopJobSubmitted) {
+ eventBuilder.jobTrackingURL = "";
eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+ } else {
+ eventBuilder.jobTrackingURL = this.job.getTrackingURL();
+ eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
+ if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+ eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+ }
}
this.eventSubmitter.submit(eventBuilder);
@@ -465,6 +482,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
props.getProperty(JobContext.MAP_SPECULATIVE,
ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
}
+  /**
+   * Tells whether customized progress reporting was requested via {@code MRJobLauncher.enabledCustomizedProgress}.
+   * A key that is absent from {@code properties} itself counts as disabled.
+   */
+  static boolean isCustomizedProgressReportEnabled(Properties properties) {
+    if (!properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS)) {
+      return false;
+    }
+    String enabledValue = properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS);
+    return Boolean.parseBoolean(enabledValue);
+  }
+
@VisibleForTesting
static void serializeJobState(FileSystem fs, Path mrJobDir, Configuration
conf, JobState jobState, Job job)
throws IOException {
@@ -687,16 +709,30 @@ public class MRJobLauncher extends AbstractJobLauncher {
private ServiceManager serviceManager;
private Optional<JobMetrics> jobMetrics = Optional.absent();
private boolean isSpeculativeEnabled;
+ private boolean customizedProgressEnabled;
private final JobState jobState = new JobState();
+ private CustomizedProgresser customizedProgresser;
+
+ private static final String CUSTOMIZED_PROGRESSER_FACTORY_CLASS =
"customizedProgresser.factoryClass";
+ private static final String DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS =
+
"org.apache.gobblin.runtime.mapreduce.CustomizedProgresserBase$BaseFactory";
// A list of WorkUnits (flattened for MultiWorkUnits) to be run by this
mapper
private final List<WorkUnit> workUnits = Lists.newArrayList();
@Override
protected void setup(Context context) {
+ final State gobblinJobState =
HadoopUtils.getStateFromConf(context.getConfiguration());
try (Closer closer = Closer.create()) {
- this.isSpeculativeEnabled =
-
isSpeculativeExecutionEnabled(HadoopUtils.getStateFromConf(context.getConfiguration()).getProperties());
+ // Default for customizedProgressEnabled is false.
+ this.customizedProgressEnabled =
isCustomizedProgressReportEnabled(gobblinJobState.getProperties());
+ this.isSpeculativeEnabled =
isSpeculativeExecutionEnabled(gobblinJobState.getProperties());
+
+ String factoryClassName = gobblinJobState.getProperties().getProperty(
+ CUSTOMIZED_PROGRESSER_FACTORY_CLASS,
DEFAULT_CUSTOMIZED_PROGRESSER_FACTORY_CLASS);
+ this.customizedProgresser =
Class.forName(factoryClassName).asSubclass(CustomizedProgresser.Factory.class)
+ .newInstance().createCustomizedProgresser(context);
+
this.fs = FileSystem.get(context.getConfiguration());
this.taskStateStore =
new FsStateStore<>(this.fs,
FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
@@ -714,8 +750,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
if (!foundStateFile) {
throw new IOException("Job state file not found.");
}
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to setup the mapper task", ioe);
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to setup the mapper task", e);
}
@@ -747,8 +783,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
this.jobMetrics.get()
-
.startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
- context.getTaskAttemptID().toString());
+ .startMetricReportingWithFileSuffix(gobblinJobState,
context.getTaskAttemptID().toString());
}
}
@@ -768,6 +803,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
while (context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(),
context);
}
+
+ // org.apache.hadoop.util.Progress.complete will set the progress to
1.0f eventually so we don't have to
+ // set it in finally block.
+ if (customizedProgressEnabled) {
+ setProgressInMapper(customizedProgresser.getCustomizedProgress(),
context);
+ }
+
GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
isSpeculativeEnabled ?
GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
: GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;
@@ -849,5 +891,26 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.workUnits.add(workUnit);
}
}
+
+    /**
+     * Sets progress from within the {@link Mapper} implementation.
+     * In MR mode, Gobblin used to report progress only from {@link GobblinWorkUnitsInputFormat} while
+     * deserializing {@link WorkUnit}s. Progress therefore reached 1.0f as soon as work units were deserialized,
+     * even though they had not been executed yet. That implicitly disabled MR speculative execution, which
+     * looks at progress to decide whether to launch speculative attempts.
+     * Different Gobblin applications should supply their own logic for calculating progress.
+     *
+     * <p>The progress is written through fields the Hadoop context does not expose publicly, reached by
+     * reflection: {@code mapContext} on {@link WrappedMapper.Context}, then the {@code reporter} field declared
+     * in {@link MapContextImpl} or one of its superclasses.
+     *
+     * @param progress the progress value to report
+     * @param context the mapper context; expected to be a {@link WrappedMapper.Context}
+     * @throws IllegalStateException if the context or its internals are not of the expected Hadoop types
+     * @throws RuntimeException if the reflective field access fails
+     */
+    void setProgressInMapper(float progress, Context context) {
+      if (!(context instanceof WrappedMapper.Context)) {
+        throw new IllegalStateException("Cannot set progress: unexpected mapper context type "
+            + context.getClass().getName());
+      }
+      try {
+        // mapContext is declared directly on WrappedMapper.Context, so look it up there rather than on the
+        // runtime class, which could be a subclass that does not redeclare it.
+        Object mapContext = RestrictedFieldAccessingUtils.getRestrictedFieldByReflection(context, "mapContext",
+            WrappedMapper.Context.class);
+        if (!(mapContext instanceof MapContextImpl)) {
+          // Reading MapContextImpl's fields from any other object would fail with an opaque
+          // IllegalArgumentException inside Field.get.
+          throw new IllegalStateException("Cannot set progress: unexpected map context type "
+              + (mapContext == null ? "null" : mapContext.getClass().getName()));
+        }
+        Object reporter = RestrictedFieldAccessingUtils.getRestrictedFieldByReflectionRecursively(mapContext,
+            "reporter", MapContextImpl.class);
+        if (!(reporter instanceof org.apache.hadoop.mapred.Task.TaskReporter)) {
+          throw new IllegalStateException("Cannot set progress: unexpected status reporter type "
+              + (reporter == null ? "null" : reporter.getClass().getName()));
+        }
+        ((org.apache.hadoop.mapred.Task.TaskReporter) reporter).setProgress(progress);
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        throw new RuntimeException("Failed to set mapper progress to " + progress + " via reflection", e);
+      }
+    }
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
index 8843d67..79126f9 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestExtractor.java
@@ -71,7 +71,7 @@ public class TestExtractor implements Extractor<String,
String> {
FileSystem fs = FileSystem
.get(URI.create(workUnitState.getProp(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI)),
new Configuration());
- fs.makeQualified(sourceFile);
+ sourceFile = new Path(fs.makeQualified(sourceFile).toUri().getRawPath());
this.dataFileReader =
new DataFileReader<GenericRecord>(new FsInput(sourceFile, new
Configuration()), datumReader);
} catch (IOException ioe) {
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java
new file mode 100644
index 0000000..70e5715
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+
+/**
+ * Reflection helpers for reading fields whose access modifiers prevent application code from reaching them.
+ * These are hacks: they depend on private implementation details of the target classes and should only be used
+ * when no supported API exposes the value.
+ */
+public final class RestrictedFieldAccessingUtils {
+  private RestrictedFieldAccessingUtils() {
+  }
+
+  /**
+   * Reads a field declared directly in {@code clazz} (not inherited), regardless of its access modifier.
+   *
+   * @param containingObj the object to read the field from
+   * @param fieldName name of the field
+   * @param clazz the class that declares the field
+   * @return the field's value in {@code containingObj}
+   * @throws NoSuchFieldException if {@code clazz} does not declare a field named {@code fieldName}
+   * @throws IllegalAccessException if the field cannot be read
+   */
+  public static Object getRestrictedFieldByReflection(Object containingObj, String fieldName, Class<?> clazz)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field field = clazz.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field.get(containingObj);
+  }
+
+  /**
+   * Reads a field declared in {@code clazz} or one of its superclasses, regardless of its access modifier.
+   * The closest declaration wins; the search stops before {@link Object}.
+   *
+   * @param containingObj the object to read the field from
+   * @param fieldName name of the field
+   * @param clazz the class at which the search starts
+   * @return the field's value in {@code containingObj}
+   * @throws NoSuchFieldException if neither {@code clazz} nor any of its superclasses declares the field
+   * @throws IllegalAccessException if the field cannot be read
+   */
+  public static Object getRestrictedFieldByReflectionRecursively(Object containingObj, String fieldName,
+      Class<?> clazz)
+      throws NoSuchFieldException, IllegalAccessException {
+    // Compare against Object.class rather than by canonical name: local/anonymous classes have a null
+    // canonical name. The null check ends the walk for interfaces, whose superclass is null.
+    for (Class<?> current = clazz; current != null && current != Object.class;
+        current = current.getSuperclass()) {
+      if (Arrays.stream(current.getDeclaredFields()).anyMatch(field -> field.getName().equals(fieldName))) {
+        return getRestrictedFieldByReflection(containingObj, fieldName, current);
+      }
+    }
+    throw new NoSuchFieldException(
+        String.format("Field %s doesn't exist in specified class and its ancestors", fieldName));
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java
new file mode 100644
index 0000000..08690ae
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/BaseClass.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+/**
+ * A test fixture for {@link RestrictedFieldAccessingUtilsTest}. It holds a package-private {@code a} and a
+ * private {@link EnclosedClass} that tests reach via reflection.
+ */
+public class BaseClass {
+  private final EnclosedClass enclose;
+  final int a;
+
+  public BaseClass(int a) {
+    this.enclose = new EnclosedClass(a);
+    this.a = a;
+  }
+
+  /**
+   * Returns the value held by the private {@link EnclosedClass}. Tests use it to observe changes made to that
+   * object through a reflectively obtained reference.
+   */
+  public int getEnclosingValue() {
+    return this.enclose.getValue();
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java
new file mode 100644
index 0000000..b345206
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/DerivedClass.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+/**
+ * A test fixture for {@link RestrictedFieldAccessingUtilsTest}. It declares no fields of its own, so fields of
+ * {@link BaseClass} can only be found by searching superclasses.
+ */
+public class DerivedClass extends BaseClass {
+  public DerivedClass(int value) {
+    super(value);
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java
new file mode 100644
index 0000000..2a6bb52
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/EnclosedClass.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import lombok.Getter;
+import lombok.Setter;
+
+
+/**
+ * A test fixture holding a single mutable int, exposed through Lombok-generated accessors
+ * ({@code getValue()} / {@code setValue(int)}).
+ */
+public class EnclosedClass {
+  @Getter
+  @Setter
+  int value;
+
+  public EnclosedClass(int value) {
+    this.value = value;
+  }
+}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java
new file mode 100644
index 0000000..d3079b1
--- /dev/null
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/reflection/RestrictedFieldAccessingUtilsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.reflection;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class RestrictedFieldAccessingUtilsTest {
+
+  @Test
+  public void testGetRestrictedFieldByReflection()
+      throws Exception {
+    BaseClass baseClass = new BaseClass(5);
+    int a = (int) RestrictedFieldAccessingUtils.getRestrictedFieldByReflection(baseClass, "a",
+        baseClass.getClass());
+    Assert.assertEquals(a, 5);
+  }
+
+  /** {@code enclose} is declared in BaseClass, so a lookup starting at DerivedClass must walk up the hierarchy. */
+  @Test
+  public void testGetRestrictedFieldByReflectionRecursively()
+      throws Exception {
+    DerivedClass derivedClass = new DerivedClass(5);
+    Assert.assertEquals(derivedClass.getEnclosingValue(), 5);
+    EnclosedClass enclosed = (EnclosedClass) RestrictedFieldAccessingUtils
+        .getRestrictedFieldByReflectionRecursively(derivedClass, "enclose", derivedClass.getClass());
+    // The returned reference is the live object, so mutating it is visible through its owner.
+    enclosed.setValue(100);
+    Assert.assertEquals(derivedClass.getEnclosingValue(), 100);
+  }
+
+  @Test(expectedExceptions = NoSuchFieldException.class)
+  public void testNoSuchFieldException()
+      throws Exception {
+    DerivedClass derivedClass = new DerivedClass(5);
+    RestrictedFieldAccessingUtils
+        .getRestrictedFieldByReflectionRecursively(derivedClass, "non", derivedClass.getClass());
+  }
+}
\ No newline at end of file