This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bea009e60 [GOBBLIN-1857] Add override flag to force generate a job
execution id based on gobblin cluster system time (#3719)
bea009e60 is described below
commit bea009e6035937643bfbed73b7b63455a02d790f
Author: William Lo <[email protected]>
AuthorDate: Thu Jul 20 17:18:36 2023 -0400
[GOBBLIN-1857] Add override flag to force generate a job execution id based
on gobblin cluster system time (#3719)
* Add override flag to force generate a job execution id based on gobblin
cluster system time
* fix typo
---
.../cluster/GobblinClusterConfigurationKeys.java | 4 ++
.../apache/gobblin/cluster/HelixJobsMapping.java | 12 +++--
.../cluster/GobblinHelixJobMappingTest.java | 53 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 5 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ef83ab029..a2e8a4b56 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -180,6 +180,10 @@ public class GobblinClusterConfigurationKeys {
public static final String CANCEL_RUNNING_JOB_ON_DELETE =
GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
+ // Job Execution ID for Helix jobs is inferred from Flow Execution IDs, but
there are scenarios in earlyStop jobs where
+ // this behavior needs to be avoided due to concurrent planning and actual
jobs sharing the same execution ID
+ public static final String USE_GENERATED_JOBEXECUTION_IDS =
GOBBLIN_CLUSTER_PREFIX + "job.useGeneratedJobExecutionIds";
+
// By default we cancel job by calling helix stop API. In some cases, jobs
just hang in STOPPING state and preventing
// new job being launched. We provide this config to give an option to
cancel jobs by calling Delete API. Directly delete
// a Helix workflow should be safe in Gobblin world, as Gobblin job is
stateless for Helix since we implement our own state store
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 8128aa0aa..b8fb436e4 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -96,15 +96,17 @@ public class HelixJobsMapping {
}
public static String createPlanningJobId (Properties jobPlanningProps) {
+ long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps,
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
+ System.currentTimeMillis() :
PropertiesUtils.getPropAsLong(jobPlanningProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
return
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobPlanningProps),
- PropertiesUtils.getPropAsLong(jobPlanningProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
+ + JobState.getJobNameFromProps(jobPlanningProps), planningJobId);
}
public static String createActualJobId (Properties jobProps) {
- return
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobProps),
- PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
+ long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps,
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
+ System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
+ return
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+ + JobState.getJobNameFromProps(jobProps), actualJobId);
}
@Nullable
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
new file mode 100644
index 000000000..022a9fb8a
--- /dev/null
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+public class GobblinHelixJobMappingTest {
+
+ @Test
+ void testMapJobNameWithFlowExecutionId() {
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
+ props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+ String planningJobId = HelixJobsMapping.createPlanningJobId(props);
+ String actualJobId = HelixJobsMapping.createActualJobId(props);
+ Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234");
+ Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234");
+ }
+
+ @Test
+ void testMapJobNameWithOverride() {
+ Properties props = new Properties();
+
props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS,
"true");
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
+ props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+ String planningJobId = HelixJobsMapping.createPlanningJobId(props);
+ String actualJobId = HelixJobsMapping.createActualJobId(props);
+ // The jobID will be the system timestamp instead of the flow execution ID
+ Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234");
+ Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234");
+ }
+}