This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 99a1aae89 [GOBBLIN-1975] Keep job.name configuration immutable if
specified on GaaS (#3847)
99a1aae89 is described below
commit 99a1aae8940d1854d4de83d726013e4972d71c55
Author: William Lo <[email protected]>
AuthorDate: Wed Dec 13 21:23:26 2023 -0500
[GOBBLIN-1975] Keep job.name configuration immutable if specified on GaaS
(#3847)
* Revert "[GOBBLIN-1952] Make jobname shortening in GaaS more aggressive
(#3822)"
This reverts commit 5619a0a421143819360262e0b8ec29cfef9a4ba7.
* use configuration to keep specified jobname if enabled
* Cleanup
---
.../service/modules/spec/JobExecutionPlan.java | 22 ++++++++++---------
.../spec/JobExecutionPlanDagFactoryTest.java | 25 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 2e2dbb5ee..2d05c93af 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -61,7 +61,8 @@ import static org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
public static final String JOB_PROPS_KEY = "job.props";
- private static final int MAX_JOB_NAME_LENGTH = 128;
+ private static final int MAX_JOB_NAME_LENGTH = 255;
+ public static final String JOB_MAINTAIN_JOBNAME = "job.service.maintain.jobnname";
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
@@ -108,18 +109,19 @@ public class JobExecutionPlan {
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
String edgeId = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
- // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
- // job names are assumed to be unique within a dag.
- int hash = flowInputPath.hashCode();
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
- // jobNames are commonly used as a directory name, which is limited to 255 characters (account for potential prefixes added/file name lengths)
- if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
- // shorten job length but make it uniquely identifiable in multihop flows or concurrent jobs, max length 139 characters (128 flow group + hash)
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, jobName.hashCode());
+ if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || jobName.isEmpty()) {
+ // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since
+ // job names are assumed to be unique within a dag.
+ int hash = flowInputPath.hashCode();
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
+ // jobNames are commonly used as a directory name, which is limited to 255 characters
+ if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
+ // shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash);
+ }
}
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
-
//Get job template uri
URI jobTemplateUri = new URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index a97b04d24..ad11d1399 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -232,8 +232,7 @@ public class JobExecutionPlanDagFactoryTest {
Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
- Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 139);
-
+ Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
}
@Test
@@ -258,4 +257,26 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EDGE_ID_KEY), "source:destination:edgeName1");
}
+ @Test
+ public void testMaintainJobNameInDag() throws Exception {
+ // flowName and flowGroup are both 128 characters long, the maximum for flowName and flowGroup
+ Config flowConfig = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?").build();
+
+ Config jobConfig = ConfigBuilder.create()
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?")
+ .addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "jobNameTest")
+ .addPrimitive(JobExecutionPlan.JOB_MAINTAIN_JOBNAME, "true").build();
+
+ FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+ Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+
+ Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY),"jobNameTest");
+ }
+
}
\ No newline at end of file