This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 208742617 Fix for job id tracking where configuration is unnecessary (#3729)
208742617 is described below
commit 208742617cce8d6927145ce770028176d1b02bee
Author: William Lo <[email protected]>
AuthorDate: Wed Aug 2 16:31:12 2023 -0400
Fix for job id tracking where configuration is unnecessary (#3729)
---
.../java/org/apache/gobblin/cluster/HelixJobsMapping.java | 13 ++++++-------
.../gobblin/cluster/GobblinHelixJobMappingTest.java | 15 +++++++--------
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index b8fb436e4..935b18d0c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -36,7 +36,6 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -96,17 +95,17 @@ public class HelixJobsMapping {
}
public static String createPlanningJobId (Properties jobPlanningProps) {
- long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps,
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
- System.currentTimeMillis() :
PropertiesUtils.getPropAsLong(jobPlanningProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
+ String flowExecIdSuffix =
jobPlanningProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ?
+ "_" +
jobPlanningProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) : "";
return
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobPlanningProps), planningJobId);
+ + JobState.getJobNameFromProps(jobPlanningProps) +
flowExecIdSuffix);
}
public static String createActualJobId (Properties jobProps) {
- long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps,
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
- System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
+ String flowExecIdSuffix =
jobProps.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) ?
+ "_" + jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) :
"";
return
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobProps), actualJobId);
+ + JobState.getJobNameFromProps(jobProps) + flowExecIdSuffix);
}
@Nullable
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
index 022a9fb8a..2070422b0 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
@@ -34,20 +34,19 @@ public class GobblinHelixJobMappingTest {
props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
String planningJobId = HelixJobsMapping.createPlanningJobId(props);
String actualJobId = HelixJobsMapping.createActualJobId(props);
- Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234");
- Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234");
+ // The jobID contains the system timestamp that we need to parse out
+ Assert.assertEquals(planningJobId.substring(0,
planningJobId.lastIndexOf("_")), "job_PlanningJobjob1_1234");
+ Assert.assertEquals(actualJobId.substring(0,
actualJobId.lastIndexOf("_")), "job_ActualJobjob1_1234");
}
@Test
- void testMapJobNameWithOverride() {
+ void testMapJobNameWithoutFlowExecutionId() {
Properties props = new Properties();
-
props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS,
"true");
- props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
String planningJobId = HelixJobsMapping.createPlanningJobId(props);
String actualJobId = HelixJobsMapping.createActualJobId(props);
- // The jobID will be the system timestamp instead of the flow execution ID
- Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234");
- Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234");
+ // The jobID contains the system timestamp that we need to parse out
+ Assert.assertEquals(planningJobId.substring(0,
planningJobId.lastIndexOf("_")), "job_PlanningJobjob1");
+ Assert.assertEquals(actualJobId.substring(0,
actualJobId.lastIndexOf("_")), "job_ActualJobjob1");
}
}