This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edcdc8d4f [GOBBLIN-1653] Shorten job name length if it exceeds 255
characters (#3514)
edcdc8d4f is described below
commit edcdc8d4f764369d47bf5fd29e69b1a58c5a0f90
Author: William Lo <[email protected]>
AuthorDate: Wed Jun 1 11:31:32 2022 -0700
[GOBBLIN-1653] Shorten job name length if it exceeds 255 characters (#3514)
* Shorten job name length if it exceeds 255 characters (max size for a
directory component)
* Address review to account for flow name in hash
---
.../service/modules/spec/JobExecutionPlan.java | 10 ++++++++--
.../spec/JobExecutionPlanDagFactoryTest.java | 21 +++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 79374af33..e04e7cb90 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -60,6 +60,7 @@ import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture", "flowStartTime"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
+ private static final int MAX_JOB_NAME_LENGTH = 255;
private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
@@ -108,8 +109,13 @@ public class JobExecutionPlan {
// Modify the job name to include the flow group, flow name, edge id,
and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
- jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup,
flowName, jobName, edgeId, flowInputPath.hashCode());
-
+ int hash = flowInputPath.hashCode();
+ jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup,
flowName, jobName, edgeId, hash);
+ // jobNames are commonly used as a directory name, which is limited to
255 characters
+ if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
+ // shorten job length to be 128 characters (flowGroup) + (hashed)
flowName, hashCode length
+ jobName =
Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup,
flowName.hashCode(), hash);
+ }
JobSpec.Builder jobSpecBuilder =
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName,
flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
index cba53dc67..0958995a2 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactoryTest.java
@@ -204,4 +204,25 @@ public class JobExecutionPlanDagFactoryTest {
Assert.assertFalse(dag2.getStartNodes().get(0).getValue().getJobSpec().getConfig().getBoolean(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS));
}
+ @Test
+ public void testCreateDagLongName() throws Exception {
+ // flowName and flowGroup are both 128 characters long, the maximum for
flowName and flowGroup
+ Config flowConfig =
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY,
"uwXJwZPAPygvmSAfhtrzXL7ovIEKOBZdulBiNIGzaT7vILrK9QB5EDJj0fc4pkgNHuIKZ3d18TZzyH6a9HpaZACwpWpIpf8SYcSfKtXeoF8IJY064BqEUXR32k3ox31G")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY,
"4mdfSGSv6GoFW7ICWubN2ORK4s5PMTQ60yIWkcbJOVneTSPn12cXT5ueEgij907tjzLlbcjdVjWFITFf9Y5sB9i0EvKGmTbUF98hJGoQlAhmottaipDEFTdbyzt5Loxg")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * *
?").build();
+
+ Config jobConfig = ConfigBuilder.create()
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
"source:destination:edgeName1")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * *
?").build();
+
+ FlowSpec flowSpec =
FlowSpec.builder("testFlowSpec").withConfig(flowConfig).build();
+ JobExecutionPlan jobExecutionPlan = new
JobExecutionPlan.Factory().createPlan(flowSpec,
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), new
InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty());
+
+ Dag<JobExecutionPlan> dag1 = new
JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));
+
+
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(),
142);
+
+ }
+
}
\ No newline at end of file