Repository: incubator-beam Updated Branches: refs/heads/master 7465edb73 -> 082b7a1bc
Enforce JobName Preconditions in the Dataflow Runner This ensures that whenever a DataflowPipelineRunner is created, the jobName of the created options conforms to the requirements of the Dataflow Service. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d148a877 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d148a877 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d148a877 Branch: refs/heads/master Commit: d148a877b175204bce711a903b7c758147e8a6be Parents: 7465edb Author: Thomas Groh <[email protected]> Authored: Fri Apr 15 10:05:36 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Apr 22 08:48:04 2016 -0700 ---------------------------------------------------------------------- .../sdk/options/DataflowPipelineOptions.java | 7 +++- .../sdk/runners/DataflowPipelineRunner.java | 19 +++++++-- .../BlockingDataflowPipelineRunnerTest.java | 2 +- .../sdk/runners/DataflowPipelineRunnerTest.java | 43 ++++++++++++++++---- 4 files changed, 58 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java index 02210fe..7f9d189 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java @@ -67,8 +67,11 @@ public interface DataflowPipelineOptions * name will not be able 
to be created. Defaults to using the ApplicationName-UserName-Date. */ @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. " - + "If there is an existing job that is currently active, another active job with the same " - + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.") + + "For each running job in the same GCP project, jobs with the same name cannot be created " + + "unless the new job is an explicit update of the previous one. Defaults to using " + + "ApplicationName-UserName-Date. The job name must match the regular expression " + + "'[a-z]([-a-z0-9]{0,38}[a-z0-9])?'. The runner will automatically truncate the name of the " + + "job and convert to lower case.") @Default.InstanceFactory(JobNameFactory.class) String getJobName(); void setJobName(String value); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java index c147f02..3e6132a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java @@ -286,13 +286,26 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage()); } - // Verify jobName according to service requirements. 
- String jobName = dataflowOptions.getJobName().toLowerCase(); - Preconditions.checkArgument( + // Verify jobName according to service requirements, truncating and converting to lowercase if + // necessary. + String jobName = + dataflowOptions + .getJobName() + .substring(0, Math.min(dataflowOptions.getJobName().length(), 40)) + .toLowerCase(); + checkArgument( jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; the name must consist of only the characters " + "[-a-z0-9], starting with a letter and ending with a letter " + "or number"); + if (!jobName.equals(dataflowOptions.getJobName())) { + LOG.info( + "PipelineOptions.jobName did not match the service requirements. " + + "Using {} instead of {}.", + jobName, + dataflowOptions.getJobName()); + } + dataflowOptions.setJobName(jobName); // Verify project String project = dataflowOptions.getProject(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java index ae504ed..13e120b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java @@ -296,7 +296,7 @@ public class BlockingDataflowPipelineRunnerTest { options.setTempLocation("gs://test/temp/location"); options.setGcpCredential(new TestCredential()); options.setPathValidatorClass(NoopPathValidator.class); - assertEquals("BlockingDataflowPipelineRunner#TestJobName", + assertEquals("BlockingDataflowPipelineRunner#testjobname", 
BlockingDataflowPipelineRunner.fromOptions(options).toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java index 8b5cbdb..412eccb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; @@ -124,6 +125,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * Tests for the {@link DataflowPipelineRunner}. 
@@ -142,6 +144,7 @@ public class DataflowPipelineRunnerTest { private static void assertValidJob(Job job) { assertNull(job.getId()); assertNull(job.getCurrentState()); + assertTrue(Pattern.matches("[a-z]([-a-z0-9]{0,38}[a-z0-9])?", job.getName())); } private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { @@ -171,11 +174,14 @@ public class DataflowPipelineRunnerTest { when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList); when(mockList.setPageToken(anyString())).thenReturn(mockList); when(mockList.execute()) - .thenReturn(new ListJobsResponse().setJobs( - Arrays.asList(new Job() - .setName("oldJobName") - .setId("oldJobId") - .setCurrentState("JOB_STATE_RUNNING")))); + .thenReturn( + new ListJobsResponse() + .setJobs( + Arrays.asList( + new Job() + .setName("oldjobname") + .setId("oldJobId") + .setCurrentState("JOB_STATE_RUNNING")))); Job resultJob = new Job(); resultJob.setId("newid"); @@ -226,6 +232,28 @@ public class DataflowPipelineRunnerTest { } @Test + public void testFromOptionsWithLongNameTruncates() throws Exception { + String longName = "thisnameisreallyquitelonganddoneinordertoforcetruncation"; + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + options.setJobName(longName); + + DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options); + assertThat(options.getJobName(), equalTo(longName.substring(0, 40))); + } + + @Test + public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception { + String mixedCase = "ThisJobNameHasMixedCase"; + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + options.setJobName(mixedCase); + + DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options); + assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase())); + } + + @Test public void testRun() throws IOException { 
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); @@ -277,7 +305,7 @@ public class DataflowPipelineRunnerTest { @Test public void testUpdateNonExistentPipeline() throws IOException { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Could not find running job named badJobName"); + thrown.expectMessage("Could not find running job named badjobname"); DataflowPipelineOptions options = buildPipelineOptions(); options.setUpdate(true); @@ -861,7 +889,8 @@ public class DataflowPipelineRunnerTest { options.setTempLocation("gs://test/temp/location"); options.setGcpCredential(new TestCredential()); options.setPathValidatorClass(NoopPathValidator.class); - assertEquals("DataflowPipelineRunner#TestJobName", + assertEquals( + "DataflowPipelineRunner#testjobname", DataflowPipelineRunner.fromOptions(options).toString()); }
