This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d0c712a34 [GOBBLIN-1821] Let flow execution ID propagate to the Job ID
if it exists (#3684)
d0c712a34 is described below
commit d0c712a34f0ba573a5ecb349584d66eda068b206
Author: William Lo <[email protected]>
AuthorDate: Mon Apr 24 13:48:10 2023 -0400
[GOBBLIN-1821] Let flow execution ID propagate to the Job ID if it exists
(#3684)
* Let flow execution ID propagate to the Job ID if it exists
* Address comment
---
.../src/main/java/org/apache/gobblin/cluster/HelixUtils.java | 4 +++-
.../main/java/org/apache/gobblin/util/JobLauncherUtils.java | 10 ++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 54e5a8108..6efc6fff0 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -61,6 +61,7 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import static org.apache.helix.task.TaskState.STOPPED;
@@ -159,7 +160,8 @@ public class HelixUtils {
if (jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)) {
jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
} else {
- jobId =
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps));
+ jobId = JobLauncherUtils.newJobId(JobState.getJobNameFromProps(jobProps),
+ PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
jobProps.put(ConfigurationKeys.JOB_ID_KEY, jobId);
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 401d159be..42fae521e 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -68,6 +68,16 @@ public class JobLauncherUtils {
return Id.Job.create(jobName, System.currentTimeMillis()).toString();
}
+ /**
+ * Create a new job ID from a flow execution ID.
+ *
+ * @param jobName job name
+ * @return new job ID
+ */
+ public static String newJobId(String jobName, long executionId) {
+ return Id.Job.create(jobName, executionId).toString();
+ }
+
/**
* Create a new task ID for the job with the given job ID.
*