This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 99a1aae89 [GOBBLIN-1975] Keep job.name configuration immutable if specified on GaaS (#3847)
99a1aae89 is described below

commit 99a1aae8940d1854d4de83d726013e4972d71c55
Author: William Lo <[email protected]>
AuthorDate: Wed Dec 13 21:23:26 2023 -0500

    [GOBBLIN-1975] Keep job.name configuration immutable if specified on GaaS (#3847)
    
    * Revert "[GOBBLIN-1952] Make jobname shortening in GaaS more aggressive (#3822)"
    
    This reverts commit 5619a0a421143819360262e0b8ec29cfef9a4ba7.
    
    * use configuration to keep specified jobname if enabled
    
    * Cleanup
---
 .../service/modules/spec/JobExecutionPlan.java     | 22 ++++++++++---------
 .../spec/JobExecutionPlanDagFactoryTest.java       | 25 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 2e2dbb5ee..2d05c93af 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -61,7 +61,8 @@ import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
   public static final String JOB_PROPS_KEY = "job.props";
-  private static final int MAX_JOB_NAME_LENGTH = 128;
+  private static final int MAX_JOB_NAME_LENGTH = 255;
+  public static final String JOB_MAINTAIN_JOBNAME = 
"job.service.maintain.jobnname";
 
   private final JobSpec jobSpec;
   private final SpecExecutor specExecutor;
@@ -108,18 +109,19 @@ public class JobExecutionPlan {
 
       String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
       String edgeId = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
-      // Modify the job name to include the flow group, flow name, edge id, 
and a random string to avoid collisions since
-      // job names are assumed to be unique within a dag.
-      int hash = flowInputPath.hashCode();
-      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, edgeId, hash);
-      // jobNames are commonly used as a directory name, which is limited to 
255 characters (account for potential prefixes added/file name lengths)
-      if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
-        // shorten job length but make it uniquely identifiable in multihop 
flows or concurrent jobs, max length 139 characters (128 flow group  + hash)
-        jobName = 
Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
jobName.hashCode());
+      if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || 
jobName.isEmpty()) {
+        // Modify the job name to include the flow group, flow name, edge id, 
and a random string to avoid collisions since
+        // job names are assumed to be unique within a dag.
+        int hash = flowInputPath.hashCode();
+        jobName = 
Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, 
jobName, edgeId, hash);
+        // jobNames are commonly used as a directory name, which is limited to 
255 characters
+        if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
+          // shorten job length to be 128 characters (flowGroup) + (hashed) 
flowName, hashCode length
+          jobName = 
Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName.hashCode(), hash);
+        }
       }
       JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, 
flowSpec)).withConfig(jobConfig)
           
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
-
       //Get job template uri
       URI jobTemplateUri = new 
URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
       JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index a97b04d24..ad11d1399 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -232,8 +232,7 @@ public class JobExecutionPlanDagFactoryTest {
 
     Dag<JobExecutionPlan> dag1 = new 
JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
 
-    
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(),
 139);
-
+    
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(),
 142);
   }
 
   @Test
@@ -258,4 +257,26 @@ public class JobExecutionPlanDagFactoryTest {
     
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EDGE_ID_KEY),
 "source:destination:edgeName1");
   }
 
+  @Test
+  public void testMaintainJobNameInDag() throws Exception {
+    // flowName and flowGroup are both 128 characters long, the maximum for 
flowName and flowGroup
+    Config flowConfig = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, 
"4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * 
?").build();
+
+    Config jobConfig = ConfigBuilder.create()
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName1")
+        .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?")
+        .addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "jobNameTest")
+        .addPrimitive(JobExecutionPlan.JOB_MAINTAIN_JOBNAME, "true").build();
+
+    FlowSpec flowSpec = 
FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+    JobExecutionPlan jobExecutionPlan = new 
JobExecutionPlan.Factory().createPlan(flowSpec, 
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+        ConfigValueFactory.fromAnyRef("testUri")), new 
InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+    Dag<JobExecutionPlan> dag1 = new 
JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+
+    
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY),"jobNameTest");
+  }
+
 }
\ No newline at end of file

Reply via email to