This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bea009e60 [GOBBLIN-1857] Add override flag to force generate a job 
execution id based on gobblin cluster system time (#3719)
bea009e60 is described below

commit bea009e6035937643bfbed73b7b63455a02d790f
Author: William Lo <[email protected]>
AuthorDate: Thu Jul 20 17:18:36 2023 -0400

    [GOBBLIN-1857] Add override flag to force generate a job execution id based 
on gobblin cluster system time (#3719)
    
    * Add override flag to force generate a job execution id based on gobblin 
cluster system time
    
    * fix typo
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  4 ++
 .../apache/gobblin/cluster/HelixJobsMapping.java   | 12 +++--
 .../cluster/GobblinHelixJobMappingTest.java        | 53 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ef83ab029..a2e8a4b56 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -180,6 +180,10 @@ public class GobblinClusterConfigurationKeys {
   public static final String CANCEL_RUNNING_JOB_ON_DELETE = 
GOBBLIN_CLUSTER_PREFIX + "job.cancelRunningJobOnDelete";
   public static final String DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE = "false";
 
+  // Job Execution ID for Helix jobs is inferred from Flow Execution IDs, but 
there are scenarios in earlyStop jobs where
+  // this behavior needs to be avoided due to concurrent planning and actual 
jobs sharing the same execution ID
+  public static final String USE_GENERATED_JOBEXECUTION_IDS = 
GOBBLIN_CLUSTER_PREFIX + "job.useGeneratedJobExecutionIds";
+
   // By default we cancel job by calling helix stop API. In some cases, jobs 
just hang in STOPPING state and preventing
   // new job being launched. We provide this config to give an option to 
cancel jobs by calling Delete API. Directly delete
   // a Helix workflow should be safe in Gobblin world, as Gobblin job is 
stateless for Helix since we implement our own state store
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 8128aa0aa..b8fb436e4 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -96,15 +96,17 @@ public class HelixJobsMapping {
   }
 
   public static String createPlanningJobId (Properties jobPlanningProps) {
+    long planningJobId = PropertiesUtils.getPropAsBoolean(jobPlanningProps, 
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
+        System.currentTimeMillis() : 
PropertiesUtils.getPropAsLong(jobPlanningProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
     return 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-            + JobState.getJobNameFromProps(jobPlanningProps),
-        PropertiesUtils.getPropAsLong(jobPlanningProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
+            + JobState.getJobNameFromProps(jobPlanningProps), planningJobId);
   }
 
   public static String createActualJobId (Properties jobProps) {
-     return 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
-             + JobState.getJobNameFromProps(jobProps),
-          PropertiesUtils.getPropAsLong(jobProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
+    long actualJobId = PropertiesUtils.getPropAsBoolean(jobProps, 
GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS, "false") ?
+        System.currentTimeMillis() : PropertiesUtils.getPropAsLong(jobProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis());
+    return 
JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+        + JobState.getJobNameFromProps(jobProps), actualJobId);
   }
 
   @Nullable
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
new file mode 100644
index 000000000..022a9fb8a
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobMappingTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+public class GobblinHelixJobMappingTest {
+
+  @Test
+  void testMapJobNameWithFlowExecutionId() {
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
+    props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+    String planningJobId = HelixJobsMapping.createPlanningJobId(props);
+    String actualJobId = HelixJobsMapping.createActualJobId(props);
+    Assert.assertEquals(planningJobId, "job_PlanningJobjob1_1234");
+    Assert.assertEquals(actualJobId, "job_ActualJobjob1_1234");
+  }
+
+  @Test
+  void testMapJobNameWithOverride() {
+    Properties props = new Properties();
+    
props.setProperty(GobblinClusterConfigurationKeys.USE_GENERATED_JOBEXECUTION_IDS,
 "true");
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "1234");
+    props.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+    String planningJobId = HelixJobsMapping.createPlanningJobId(props);
+    String actualJobId = HelixJobsMapping.createActualJobId(props);
+    // The jobID will be the system timestamp instead of the flow execution ID
+    Assert.assertNotEquals(planningJobId, "job_PlanningJobjob1_1234");
+    Assert.assertNotEquals(actualJobId, "job_ActualJobjob1_1234");
+  }
+}

Reply via email to