This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dce9e2520 [GOBBLIN-1919] Simplify a few elements of MR-related job
exec before reusing code in Temporal-based execution (#3784)
dce9e2520 is described below
commit dce9e2520a144f37c63293776a5b02a8349be299
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Oct 26 17:12:39 2023 -0700
[GOBBLIN-1919] Simplify a few elements of MR-related job exec before
reusing code in Temporal-based execution (#3784)
* Simplify a few elements of MR-related job exec before reusing code in
Temporal-based execution
* Add JSON-ification to several foundational config-state representations,
plus encapsulated convenience method `JobState.getJobIdFromProps`
* Update javadoc comments
* Encapsulate check for whether a path has the extension of a
multi-work-unit
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +-
.../gobblin/source/workunit/MultiWorkUnit.java | 5 +
.../apache/gobblin/source/workunit/WorkUnit.java | 58 +++++++++
.../gobblin/cluster/GobblinHelixJobLauncher.java | 20 +--
.../org/apache/gobblin/cluster/SingleTask.java | 3 +-
.../gobblin/data/management/copy/CopyableFile.java | 50 +++++++-
.../gobblin/runtime/AbstractJobLauncher.java | 3 -
.../org/apache/gobblin/runtime/JobContext.java | 6 +-
.../java/org/apache/gobblin/runtime/JobState.java | 142 +++++----------------
.../java/org/apache/gobblin/runtime/TaskState.java | 138 +++++---------------
.../runtime/mapreduce/GobblinOutputCommitter.java | 34 ++---
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 68 +++++-----
.../org/apache/gobblin/util/JobLauncherUtils.java | 33 ++++-
13 files changed, 263 insertions(+), 299 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6d36d9ff3..a50ba8c75 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -725,7 +725,7 @@ public class ConfigurationKeys {
public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL =
false;
- public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
+ public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false;
/**
* Configuration properties used by the distributed job launcher.
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index d254de065..1392fa97f 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -55,6 +55,11 @@ public class MultiWorkUnit extends WorkUnit {
super();
}
+ @Override
+ public boolean isMultiWorkUnit() {
+ return true;
+ }
+
/**
* Get an immutable list of {@link WorkUnit}s wrapped by this {@link
MultiWorkUnit}.
*
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index bf38c3525..139e60c01 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -24,10 +24,12 @@ import org.apache.gobblin.configuration.State;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.StringWriter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonWriter;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.Watermark;
@@ -134,6 +136,11 @@ public class WorkUnit extends State {
this.extract = other.getExtract();
}
+ /** @return whether a multi-work-unit (or else a singular one) */
+ public boolean isMultiWorkUnit() {
+ return false; // more efficient than `this instanceof MultiWorkUnit` plus
no circular dependency
+ }
+
/**
* Factory method.
*
@@ -365,6 +372,57 @@ public class WorkUnit extends State {
return result;
}
+ /** @return pretty-printed JSON, including all properties */
+ public String toJsonString() {
+ StringWriter stringWriter = new StringWriter();
+ try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+ jsonWriter.setIndent("\t");
+ this.toJson(jsonWriter);
+ } catch (IOException ioe) {
+ // Ignored
+ }
+ return stringWriter.toString();
+ }
+
+ public void toJson(JsonWriter jsonWriter) throws IOException {
+ jsonWriter.beginObject();
+
+ jsonWriter.name("id").value(this.getId());
+ jsonWriter.name("properties");
+ jsonWriter.beginObject();
+ for (String key : this.getPropertyNames()) {
+ jsonWriter.name(key).value(this.getProp(key));
+ }
+ jsonWriter.endObject();
+
+ jsonWriter.name("extract");
+ jsonWriter.beginObject();
+ jsonWriter.name("extractId").value(this.getExtract().getId());
+ jsonWriter.name("extractProperties");
+ jsonWriter.beginObject();
+ for (String key : this.getExtract().getPropertyNames()) {
+ jsonWriter.name(key).value(this.getExtract().getProp(key));
+ }
+ jsonWriter.endObject();
+
+ State prevTableState = this.getExtract().getPreviousTableState();
+ if (prevTableState != null) {
+ jsonWriter.name("extractPrevTableState");
+ jsonWriter.beginObject();
+ jsonWriter.name("prevStateId").value(prevTableState.getId());
+ jsonWriter.name("prevStateProperties");
+ jsonWriter.beginObject();
+ for (String key : prevTableState.getPropertyNames()) {
+ jsonWriter.name(key).value(prevTableState.getProp(key));
+ }
+ jsonWriter.endObject();
+ jsonWriter.endObject();
+ }
+ jsonWriter.endObject();
+
+ jsonWriter.endObject();
+ }
+
public String getOutputFilePath() {
// Search for the properties in the workunit.
// This search for the property first in State and then in the Extract of
this workunit.
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index cf324b443..f0fcc258f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -71,7 +71,6 @@ import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -110,8 +109,6 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinHelixJobLauncher.class);
- private static final String WORK_UNIT_FILE_EXTENSION = ".wu";
-
private final HelixManager helixManager;
private final TaskDriver helixTaskDriver;
private final String helixWorkFlowName;
@@ -345,7 +342,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
try (ParallelRunner stateSerDeRunner = new
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
int multiTaskIdSequence = 0;
for (WorkUnit workUnit : workUnits) {
- if (workUnit instanceof MultiWorkUnit) {
+ if (workUnit.isMultiWorkUnit()) {
workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(),
multiTaskIdSequence++));
}
addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap);
@@ -535,15 +532,12 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner
stateSerDeRunner) {
String workUnitFilePath =
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
- final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
- if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
- stateStore = stateStores.getMwuStateStore();
- } else {
- stateStore = stateStores.getWuStateStore();
- }
+ final StateStore stateStore =
JobLauncherUtils.hasMultiWorkUnitExtension(workUnitFile)
+ ? stateStores.getMwuStateStore()
+ : stateStores.getWuStateStore();
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -561,11 +555,11 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
final StateStore stateStore;
String workUnitFileName = workUnit.getId();
- if (workUnit instanceof MultiWorkUnit) {
- workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
+ if (workUnit.isMultiWorkUnit()) {
+ workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getMwuStateStore();
} else {
- workUnitFileName += WORK_UNIT_FILE_EXTENSION;
+ workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
stateStore = stateStores.getWuStateStore();
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 93caab41c..5d67c9f51 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -39,7 +39,6 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.StateStores;
@@ -184,7 +183,7 @@ public class SingleTask {
WorkUnit workUnit;
try {
- if
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
{
+ if (JobLauncherUtils.hasMultiWorkUnitExtension(_workUnitFilePath)) {
workUnit = _stateStores.getMwuStateStore().getAll(storeName,
fileName).get(0);
} else {
workUnit = _stateStores.getWuStateStore().getAll(storeName,
fileName).get(0);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 85fa80f0f..9e9af05e3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,8 +17,8 @@
package org.apache.gobblin.data.management.copy;
-import com.google.common.cache.Cache;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.List;
import java.util.Map;
@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.gson.stream.JsonWriter;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
@@ -132,6 +134,52 @@ public class CopyableFile extends CopyEntity implements
File {
this.datasetOutputPath = datasetOutputPath;
}
+ /** @return pretty-printed JSON, including all metadata */
+ public String toJsonString() {
+ return toJsonString(true);
+ }
+
+ /** @return pretty-printed JSON, optionally including metadata */
+ public String toJsonString(boolean includeMetadata) {
+ StringWriter stringWriter = new StringWriter();
+ try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+ jsonWriter.setIndent("\t");
+ this.toJson(jsonWriter, includeMetadata);
+ } catch (IOException ioe) {
+ // Ignored
+ }
+ return stringWriter.toString();
+ }
+
+ public void toJson(JsonWriter jsonWriter, boolean includeMetadata) throws
IOException {
+ jsonWriter.beginObject();
+
+ jsonWriter
+ .name("file set").value(this.getFileSet())
+ .name("origin").value(this.getOrigin().toString())
+ .name("destination").value(this.getDestination().toString())
+
.name("destinationOwnerAndPermission").value(this.getDestinationOwnerAndPermission().toString())
+ // TODO:
+ // this.ancestorsOwnerAndPermission
+ // this.checksum
+ // this.preserve
+ // this.dataFileVersionStrategy
+ // this.originTimestamp
+ // this.upstreamTimestamp
+
.name("datasetOutputPath").value(this.getDatasetOutputPath().toString());
+
+ if (includeMetadata && this.getAdditionalMetadata() != null) {
+ jsonWriter.name("metadata");
+ jsonWriter.beginObject();
+ for (Map.Entry<String, String> entry :
this.getAdditionalMetadata().entrySet()) {
+ jsonWriter.name(entry.getKey()).value(entry.getValue());
+ }
+ jsonWriter.endObject();
+ }
+
+ jsonWriter.endObject();
+ }
+
/**
* Set file system based source and destination dataset for this {@link
CopyableFile}
*
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 3f2570c68..bf86e628e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -129,9 +129,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
public static final String JOB_STATE_FILE_NAME = "job.state";
- public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
- public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
-
public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
public static final String NUM_WORKUNITS = "numWorkUnits";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 5c6eb3eae..658b308b7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -74,7 +74,6 @@ import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
@@ -140,10 +139,9 @@ public class JobContext implements Closeable {
"A job must have a job name specified by job.name");
this.jobName = JobState.getJobNameFromProps(jobProps);
- this.jobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY) ?
jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)
- : JobLauncherUtils.newJobId(this.jobName);
+ this.jobId = JobState.getJobIdFromProps(jobProps);
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId); // in case
not yet directly defined as such
this.jobSequence = Long.toString(Id.Job.parse(this.jobId).getSequence());
- jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId);
this.jobBroker = instanceBroker.newSubscopedBuilder(new
JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 02cf3c9df..5f8783fcc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.job.JobProgress;
@@ -64,6 +67,7 @@ import org.apache.gobblin.runtime.util.MetricGroup;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ImmutableProperties;
+import org.apache.gobblin.util.JobLauncherUtils;
/**
@@ -127,12 +131,22 @@ public class JobState extends SourceState implements
JobProgress {
}
}
+ @Getter @Setter
private String jobName;
+ @Getter @Setter
private String jobId;
+ /** job start time in milliseconds */
+ @Getter @Setter
private long startTime = 0;
+ /** job end time in milliseconds */
+ @Getter @Setter
private long endTime = 0;
+ /** job duration in milliseconds */
+ @Getter @Setter
private long duration = 0;
private RunningState state = RunningState.PENDING;
+ /** the number of tasks this job consists of */
+ @Getter @Setter
private int taskCount = 0;
private final Map<String, TaskState> taskStates = Maps.newLinkedHashMap();
// Skipped task states shouldn't be exposed to publisher, but they need to
be in JobState and DatasetState so that they can be written to StateStore.
@@ -149,7 +163,7 @@ public class JobState extends SourceState implements
JobProgress {
this.setId(jobId);
}
- public JobState(State properties,String jobName, String jobId) {
+ public JobState(State properties, String jobName, String jobId) {
super(properties);
this.jobName = jobName;
this.jobId = jobId;
@@ -172,6 +186,11 @@ public class JobState extends SourceState implements
JobProgress {
return props.getProperty(ConfigurationKeys.JOB_NAME_KEY);
}
+ public static String getJobIdFromProps(Properties props) {
+ return props.containsKey(ConfigurationKeys.JOB_ID_KEY) ?
props.getProperty(ConfigurationKeys.JOB_ID_KEY)
+ : JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
+ }
+
public static String getJobGroupFromState(State state) {
return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
}
@@ -188,69 +207,6 @@ public class JobState extends SourceState implements
JobProgress {
return props.getProperty(ConfigurationKeys.JOB_DESCRIPTION_KEY);
}
- /**
- * Get job name.
- *
- * @return job name
- */
- public String getJobName() {
- return this.jobName;
- }
-
- /**
- * Set job name.
- *
- * @param jobName job name
- */
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- /**
- * Get job ID.
- *
- * @return job ID
- */
- public String getJobId() {
- return this.jobId;
- }
-
- /**
- * Set job ID.
- *
- * @param jobId job ID
- */
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- /**
- * Get job start time.
- *
- * @return job start time
- */
- public long getStartTime() {
- return this.startTime;
- }
-
- /**
- * Set job start time.
- *
- * @param startTime job start time
- */
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- /**
- * Get job end time.
- *
- * @return job end time
- */
- public long getEndTime() {
- return this.endTime;
- }
-
/**
* Get the currently elapsed time for this job.
* @return
@@ -265,33 +221,6 @@ public class JobState extends SourceState implements
JobProgress {
return 0;
}
- /**
- * Set job end time.
- *
- * @param endTime job end time
- */
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- /**
- * Get job duration in milliseconds.
- *
- * @return job duration in milliseconds
- */
- public long getDuration() {
- return this.duration;
- }
-
- /**
- * Set job duration in milliseconds.
- *
- * @param duration job duration in milliseconds
- */
- public void setDuration(long duration) {
- this.duration = duration;
- }
-
/**
* Get job running state of type {@link RunningState}.
*
@@ -310,24 +239,6 @@ public class JobState extends SourceState implements
JobProgress {
this.state = state;
}
- /**
- * Get the number of tasks this job consists of.
- *
- * @return number of tasks this job consists of
- */
- public int getTaskCount() {
- return this.taskCount;
- }
-
- /**
- * Set the number of tasks this job consists of.
- *
- * @param taskCount number of tasks this job consists of
- */
- public void setTaskCount(int taskCount) {
- this.taskCount = taskCount;
- }
-
/**
* If not already present, set the {@link
ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY} to a {@link String}
* representation of the given {@link Throwable}.
@@ -682,12 +593,23 @@ public class JobState extends SourceState implements
JobProgress {
return result;
}
+ /** @return pretty-printed JSON, without including properties */
@Override
public String toString() {
+ return toJsonString(false);
+ }
+
+ /** @return pretty-printed JSON, including all properties */
+ public String toJsonString() {
+ return toJsonString(true);
+ }
+
+ /** @return pretty-printed JSON, optionally including properties */
+ public String toJsonString(boolean includeProperties) {
StringWriter stringWriter = new StringWriter();
try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
jsonWriter.setIndent("\t");
- this.toJson(jsonWriter, false);
+ this.toJson(jsonWriter, includeProperties);
} catch (IOException ioe) {
// Ignored
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index 903d94b18..60c475203 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.StringWriter;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
@@ -40,6 +41,7 @@ import com.linkedin.data.template.StringMap;
import javax.annotation.Nullable;
import lombok.Getter;
+import lombok.Setter;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -89,15 +91,27 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
*/
private static final String BYTES_PER_SECOND = "bytesPerSec";
+ /** ID of the job this {@link TaskState} is for */
+ @Getter @Setter
private String jobId;
+ /** ID of the task this {@link TaskState} is for */
+ @Getter @Setter
private String taskId;
+ /** sequence number of the task this {@link TaskState} is for */
+ @Getter
private String taskKey;
@Getter
private Optional<String> taskAttemptId;
+ /** task start time in milliseconds */
+ @Getter @Setter
private long startTime = 0;
+ /** task end time in milliseconds */
+ @Getter @Setter
private long endTime = 0;
- private long duration;
+ /** task duration in milliseconds */
+ @Getter @Setter
+ private long taskDuration;
// Needed for serialization/deserialization
public TaskState() {}
@@ -123,105 +137,6 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
this.setId(this.taskId);
}
- /**
- * Get the ID of the job this {@link TaskState} is for.
- *
- * @return ID of the job this {@link TaskState} is for
- */
- public String getJobId() {
- return this.jobId;
- }
-
- /**
- * Set the ID of the job this {@link TaskState} is for.
- *
- * @param jobId ID of the job this {@link TaskState} is for
- */
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- /**
- * Get the sequence number of the task this {@link TaskState} is for.
- *
- * @return Sequence number of the task this {@link TaskState} is for
- */
- public String getTaskKey() {
- return this.taskKey;
- }
-
- /**
- * Get the ID of the task this {@link TaskState} is for.
- *
- * @return ID of the task this {@link TaskState} is for
- */
- public String getTaskId() {
- return this.taskId;
- }
-
- /**
- * Set the ID of the task this {@link TaskState} is for.
- *
- * @param taskId ID of the task this {@link TaskState} is for
- */
- public void setTaskId(String taskId) {
- this.taskId = taskId;
- }
-
- /**
- * Get task start time in milliseconds.
- *
- * @return task start time in milliseconds
- */
- public long getStartTime() {
- return this.startTime;
- }
-
- /**
- * Set task start time in milliseconds.
- *
- * @param startTime task start time in milliseconds
- */
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- /**
- * Get task end time in milliseconds.
- *
- * @return task end time in milliseconds
- */
- public long getEndTime() {
- return this.endTime;
- }
-
- /**
- * set task end time in milliseconds.
- *
- * @param endTime task end time in milliseconds
- */
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- /**
- * Get task duration in milliseconds.
- *
- * @return task duration in milliseconds
- */
- public long getTaskDuration() {
- return this.duration;
- }
-
- /**
- * Set task duration in milliseconds.
- *
- * @param duration task duration in milliseconds
- */
- public void setTaskDuration(long duration) {
- this.duration = duration;
- }
-
/**
* Get the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} if it
exists, else return {@link Optional#absent()}.
*/
@@ -348,7 +263,7 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
this.setId(this.taskId);
this.startTime = in.readLong();
this.endTime = in.readLong();
- this.duration = in.readLong();
+ this.taskDuration = in.readLong();
super.readFields(in);
}
@@ -361,7 +276,7 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
text.write(out);
out.writeLong(this.startTime);
out.writeLong(this.endTime);
- out.writeLong(this.duration);
+ out.writeLong(this.taskDuration);
super.write(out);
}
@@ -384,6 +299,23 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
return result;
}
+ /** @return pretty-printed JSON, including all properties */
+ public String toJsonString() {
+ return toJsonString(true);
+ }
+
+ /** @return pretty-printed JSON, optionally including properties */
+ public String toJsonString(boolean includeProperties) {
+ StringWriter stringWriter = new StringWriter();
+ try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+ jsonWriter.setIndent("\t");
+ this.toJson(jsonWriter, includeProperties);
+ } catch (IOException ioe) {
+ // Ignored
+ }
+ return stringWriter.toString();
+ }
+
/**
* Convert this {@link TaskState} to a json document.
*
@@ -432,7 +364,7 @@ public class TaskState extends WorkUnitState implements
TaskProgress {
if (this.endTime > 0) {
taskExecutionInfo.setEndTime(this.endTime);
}
- taskExecutionInfo.setDuration(this.duration);
+ taskExecutionInfo.setDuration(this.taskDuration);
taskExecutionInfo.setState(TaskStateEnum.valueOf(getWorkingState().name()));
if (this.contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) {
taskExecutionInfo.setFailureException(this.getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
index 65401c03d..57ba0936d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
@@ -40,7 +40,6 @@ import com.google.common.io.Closer;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -89,28 +88,18 @@ public class GobblinOutputCommitter extends OutputCommitter
{
Closer workUnitFileCloser = Closer.create();
- // If the file ends with ".wu" de-serialize it into a WorkUnit
- if
(status.getPath().getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION))
{
- WorkUnit wu = WorkUnit.createEmpty();
- try {
- wu.readFields(workUnitFileCloser.register(new
DataInputStream(fs.open(status.getPath()))));
- } finally {
- workUnitFileCloser.close();
- }
- JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
+ WorkUnit wu =
JobLauncherUtils.createEmptyWorkUnitPerExtension(status.getPath());
+ try {
+ wu.readFields(workUnitFileCloser.register(new
DataInputStream(fs.open(status.getPath()))));
+ } finally {
+ workUnitFileCloser.close();
}
-
- // If the file ends with ".mwu" de-serialize it into a MultiWorkUnit
- if
(status.getPath().getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
{
- MultiWorkUnit mwu = MultiWorkUnit.createEmpty();
- try {
- mwu.readFields(workUnitFileCloser.register(new
DataInputStream(fs.open(status.getPath()))));
- } finally {
- workUnitFileCloser.close();
- }
- for (WorkUnit wu : mwu.getWorkUnits()) {
- JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
+ if (wu instanceof MultiWorkUnit) {
+ for (WorkUnit eachWU : ((MultiWorkUnit) wu).getWorkUnits()) {
+ JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(eachWU),
LOG);
}
+ } else {
+ JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
}
}
} finally {
@@ -173,8 +162,7 @@ public class GobblinOutputCommitter extends OutputCommitter
{
private static class WorkUnitFilter implements PathFilter {
@Override
public boolean accept(Path path) {
- return
path.getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION)
- ||
path.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION);
+ return JobLauncherUtils.hasAnyWorkUnitExtension(path);
}
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index cd34fe1b5..1893cebd7 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -25,6 +25,7 @@ import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -56,7 +57,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -523,19 +523,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
mrJobSetupTimer.stop();
}
- static boolean isSpeculativeExecutionEnabled(Properties props) {
- return Boolean.valueOf(
- props.getProperty(JobContext.MAP_SPECULATIVE,
ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
+ static boolean isBooleanPropEnabled(Properties props, String propKey,
Optional<Boolean> optDefault) {
+ return (props.containsKey(propKey) &&
Boolean.parseBoolean(props.getProperty(propKey)))
+ || (optDefault.isPresent() && optDefault.get());
}
- static boolean isCustomizedProgressReportEnabled(Properties properties) {
- return properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS)
- &&
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
+ static boolean isSpeculativeExecutionEnabled(Properties props) {
+ return isBooleanPropEnabled(props, JobContext.MAP_SPECULATIVE,
+
Optional.of(ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
}
- static boolean isBooleanPropEnabled(Properties props, String propKey,
Optional<Boolean> optDefault) {
- return (props.containsKey(propKey) &&
Boolean.parseBoolean(props.getProperty(propKey)))
- || (optDefault.isPresent() && optDefault.get());
+ static boolean isCustomizedProgressReportEnabled(Properties properties) {
+ return isBooleanPropEnabled(properties, ENABLED_CUSTOMIZED_PROGRESS,
Optional.empty());
}
static boolean isMapperFailureFatalEnabled(Properties props) {
@@ -688,11 +687,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
for (WorkUnit workUnit : workUnits) {
String workUnitFileName;
- if (workUnit instanceof MultiWorkUnit) {
+ if (workUnit.isMultiWorkUnit()) {
workUnitFileName =
JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(),
multiTaskIdSequence++)
- + MULTI_WORK_UNIT_FILE_EXTENSION;
+ + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
} else {
- workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) +
WORK_UNIT_FILE_EXTENSION;
+ workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
}
Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
LOG.debug("Writing work unit file " + workUnitFileName);
@@ -731,7 +730,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
*/
@VisibleForTesting
void countersToMetrics(GobblinMetrics metrics) throws IOException {
- Optional<Counters> counters =
Optional.fromNullable(this.job.getCounters());
+ Optional<Counters> counters = Optional.ofNullable(this.job.getCounters());
if (counters.isPresent()) {
// Write job-level counters
@@ -772,7 +771,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
private TaskExecutor taskExecutor;
private TaskStateTracker taskStateTracker;
private ServiceManager serviceManager;
- private Optional<JobMetrics> jobMetrics = Optional.absent();
+ private Optional<JobMetrics> jobMetrics = Optional.empty();
private boolean isSpeculativeEnabled;
private boolean customizedProgressEnabled;
private final JobState jobState = new JobState();
@@ -809,25 +808,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
this.fs = FileSystem.get(context.getConfiguration());
this.taskStateStore =
new FsStateStore<>(this.fs,
FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
-
String jobStateFileName =
context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
- boolean foundStateFile = false;
- for (Path dcPath :
DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
- if (dcPath.getName().equals(jobStateFileName)) {
- SerializationUtils.deserializeStateFromInputStream(
- closer.register(new
FileInputStream(dcPath.toUri().getPath())), this.jobState);
- foundStateFile = true;
- break;
- }
- }
- if (!foundStateFile) {
- throw new IOException("Job state file not found.");
+ Optional<URI> jobStateFileUri =
getStateFileUriForJob(context.getConfiguration(), jobStateFileName);
+ if (jobStateFileUri.isPresent()) {
+ SerializationUtils.deserializeStateFromInputStream(
+ closer.register(new
FileInputStream(jobStateFileUri.get().getPath())), this.jobState);
+ } else {
+ throw new IOException("Job state file not found: '" +
jobStateFileName + "'.");
}
} catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException("Failed to setup the mapper task", e);
}
-
// load dynamic configuration to add to the job configuration
Configuration configuration = context.getConfiguration();
Config jobStateAsConfig =
ConfigUtils.propertiesToConfig(this.jobState.getProperties());
@@ -989,17 +981,17 @@ public class MRJobLauncher extends AbstractJobLauncher {
@Override
public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
- WorkUnit workUnit =
(value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ?
MultiWorkUnit.createEmpty()
- : WorkUnit.createEmpty());
- SerializationUtils.deserializeState(this.fs, new Path(value.toString()),
workUnit);
-
- if (workUnit instanceof MultiWorkUnit) {
- List<WorkUnit> flattenedWorkUnits =
- JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit)
workUnit).getWorkUnits());
- this.workUnits.addAll(flattenedWorkUnits);
- } else {
- this.workUnits.add(workUnit);
+ this.workUnits.addAll(JobLauncherUtils.loadFlattenedWorkUnits(this.fs,
new Path(value.toString())));
+ }
+
+ /** @return {@link URI} if a distributed cache file matches
`jobStateFileName` */
+ protected Optional<URI> getStateFileUriForJob(Configuration conf, String
jobStateFileName) throws IOException {
+ for (Path dcPath : DistributedCache.getLocalCacheFiles(conf)) {
+ if (dcPath.getName().equals(jobStateFileName)) {
+ return Optional.of(dcPath.toUri());
+ }
}
+ return Optional.empty();
}
/**
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 42fae521e..1f20b1b73 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -55,6 +55,9 @@ import org.apache.gobblin.source.workunit.WorkUnit;
@Slf4j
public class JobLauncherUtils {
+ public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
+ public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
+
// A cache for proxied FileSystems by owners
private static Cache<String, FileSystem> fileSystemCacheByOwners =
CacheBuilder.newBuilder().build();
@@ -114,7 +117,7 @@ public class JobLauncherUtils {
public static List<WorkUnit> flattenWorkUnits(Collection<WorkUnit>
workUnits) {
List<WorkUnit> flattenedWorkUnits = Lists.newArrayList();
for (WorkUnit workUnit : workUnits) {
- if (workUnit instanceof MultiWorkUnit) {
+ if (workUnit.isMultiWorkUnit()) {
flattenedWorkUnits.addAll(flattenWorkUnits(((MultiWorkUnit)
workUnit).getWorkUnits()));
} else {
flattenedWorkUnits.add(workUnit);
@@ -123,6 +126,34 @@ public class JobLauncherUtils {
return flattenedWorkUnits;
}
+ /** @return flattened list of {@link WorkUnit}s loaded from `path`, which
may possibly hold a multi-work unit */
+ public static List<WorkUnit> loadFlattenedWorkUnits(FileSystem fs, Path
path) throws IOException {
+ WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path);
+ SerializationUtils.deserializeState(fs, path, workUnit);
+
+ if (workUnit.isMultiWorkUnit()) {
+ return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit)
workUnit).getWorkUnits());
+ } else {
+ return Lists.newArrayList(workUnit);
+ }
+ }
+
+ /** @return an empty {@link WorkUnit}, potentially an empty {@link
MultiWorkUnit}, based on the {@link Path} extension */
+ public static WorkUnit createEmptyWorkUnitPerExtension(Path p) {
+ return JobLauncherUtils.hasMultiWorkUnitExtension(p) ?
MultiWorkUnit.createEmpty() : WorkUnit.createEmpty();
+ }
+
+ /** @return whether {@link Path} ends with {@link
JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} */
+ public static boolean hasMultiWorkUnitExtension(Path p) {
+ return
p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION);
+ }
+
+ /** @return whether {@link Path} ends with {@link
JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} or {@link
JobLauncherUtils#WORK_UNIT_FILE_EXTENSION} */
+ public static boolean hasAnyWorkUnitExtension(Path p) {
+ return
p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION)
+ || p.getName().endsWith(JobLauncherUtils.WORK_UNIT_FILE_EXTENSION);
+ }
+
/**
* Cleanup the staging data for a list of Gobblin tasks. This method calls
the
* {@link #cleanTaskStagingData(State, Logger)} method.