Repository: incubator-beam
Updated Branches:
  refs/heads/master 7465edb73 -> 082b7a1bc


Enforce JobName Preconditions in the Dataflow Runner

This ensures that whenever a DataflowPipelineRunner is created, the
jobName of the created options conforms to the requirements of the
Dataflow Service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d148a877
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d148a877
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d148a877

Branch: refs/heads/master
Commit: d148a877b175204bce711a903b7c758147e8a6be
Parents: 7465edb
Author: Thomas Groh <[email protected]>
Authored: Fri Apr 15 10:05:36 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Fri Apr 22 08:48:04 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/DataflowPipelineOptions.java    |  7 +++-
 .../sdk/runners/DataflowPipelineRunner.java     | 19 +++++++--
 .../BlockingDataflowPipelineRunnerTest.java     |  2 +-
 .../sdk/runners/DataflowPipelineRunnerTest.java | 43 ++++++++++++++++----
 4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
index 02210fe..7f9d189 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -67,8 +67,11 @@ public interface DataflowPipelineOptions
    * name will not be able to be created. Defaults to using the 
ApplicationName-UserName-Date.
    */
   @Description("The Dataflow job name is used as an idempotence key within the 
Dataflow service. "
-      + "If there is an existing job that is currently active, another active 
job with the same "
-      + "name will not be able to be created. Defaults to using the 
ApplicationName-UserName-Date.")
+      + "For each running job in the same GCP project, jobs with the same name 
cannot be created "
+      + "unless the new job is an explicit update of the previous one. 
Defaults to using "
+      + "ApplicationName-UserName-Date. The job name must match the regular 
expression "
+      + "'[a-z]([-a-z0-9]{0,38}[a-z0-9])?'. The runner will automatically 
truncate the name of the "
+      + "job and convert to lower case.")
   @Default.InstanceFactory(JobNameFactory.class)
   String getJobName();
   void setJobName(String value);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java
index c147f02..3e6132a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineRunner.java
@@ -286,13 +286,26 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
     }
 
-    // Verify jobName according to service requirements.
-    String jobName = dataflowOptions.getJobName().toLowerCase();
-    Preconditions.checkArgument(
+    // Verify jobName according to service requirements, truncating and converting 
to lowercase if
+    // necessary.
+    String jobName =
+        dataflowOptions
+            .getJobName()
+            .substring(0, Math.min(dataflowOptions.getJobName().length(), 40))
+            .toLowerCase();
+    checkArgument(
         jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"),
         "JobName invalid; the name must consist of only the characters "
             + "[-a-z0-9], starting with a letter and ending with a letter "
             + "or number");
+    if (!jobName.equals(dataflowOptions.getJobName())) {
+      LOG.info(
+          "PipelineOptions.jobName did not match the service requirements. "
+              + "Using {} instead of {}.",
+          jobName,
+          dataflowOptions.getJobName());
+    }
+    dataflowOptions.setJobName(jobName);
 
     // Verify project
     String project = dataflowOptions.getProject();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
index ae504ed..13e120b 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/BlockingDataflowPipelineRunnerTest.java
@@ -296,7 +296,7 @@ public class BlockingDataflowPipelineRunnerTest {
     options.setTempLocation("gs://test/temp/location");
     options.setGcpCredential(new TestCredential());
     options.setPathValidatorClass(NoopPathValidator.class);
-    assertEquals("BlockingDataflowPipelineRunner#TestJobName",
+    assertEquals("BlockingDataflowPipelineRunner#testjobname",
         BlockingDataflowPipelineRunner.fromOptions(options).toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d148a877/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
index 8b5cbdb..412eccb 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
@@ -124,6 +125,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * Tests for the {@link DataflowPipelineRunner}.
@@ -142,6 +144,7 @@ public class DataflowPipelineRunnerTest {
   private static void assertValidJob(Job job) {
     assertNull(job.getId());
     assertNull(job.getCurrentState());
+    assertTrue(Pattern.matches("[a-z]([-a-z0-9]{0,38}[a-z0-9])?", 
job.getName()));
   }
 
   private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
@@ -171,11 +174,14 @@ public class DataflowPipelineRunnerTest {
     when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
     when(mockList.setPageToken(anyString())).thenReturn(mockList);
     when(mockList.execute())
-        .thenReturn(new ListJobsResponse().setJobs(
-            Arrays.asList(new Job()
-                              .setName("oldJobName")
-                              .setId("oldJobId")
-                              .setCurrentState("JOB_STATE_RUNNING"))));
+        .thenReturn(
+            new ListJobsResponse()
+                .setJobs(
+                    Arrays.asList(
+                        new Job()
+                            .setName("oldjobname")
+                            .setId("oldJobId")
+                            .setCurrentState("JOB_STATE_RUNNING"))));
 
     Job resultJob = new Job();
     resultJob.setId("newid");
@@ -226,6 +232,28 @@ public class DataflowPipelineRunnerTest {
   }
 
   @Test
+  public void testFromOptionsWithLongNameTruncates() throws Exception {
+    String longName = 
"thisnameisreallyquitelonganddoneinordertoforcetruncation";
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setJobName(longName);
+
+    DataflowPipelineRunner runner = 
DataflowPipelineRunner.fromOptions(options);
+    assertThat(options.getJobName(), equalTo(longName.substring(0, 40)));
+  }
+
+  @Test
+  public void testFromOptionsWithUppercaseConvertsToLowercase() throws 
Exception {
+    String mixedCase = "ThisJobNameHasMixedCase";
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setJobName(mixedCase);
+
+    DataflowPipelineRunner runner = 
DataflowPipelineRunner.fromOptions(options);
+    assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase()));
+  }
+
+  @Test
   public void testRun() throws IOException {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
@@ -277,7 +305,7 @@ public class DataflowPipelineRunnerTest {
   @Test
   public void testUpdateNonExistentPipeline() throws IOException {
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Could not find running job named badJobName");
+    thrown.expectMessage("Could not find running job named badjobname");
 
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setUpdate(true);
@@ -861,7 +889,8 @@ public class DataflowPipelineRunnerTest {
     options.setTempLocation("gs://test/temp/location");
     options.setGcpCredential(new TestCredential());
     options.setPathValidatorClass(NoopPathValidator.class);
-    assertEquals("DataflowPipelineRunner#TestJobName",
+    assertEquals(
+        "DataflowPipelineRunner#testjobname",
         DataflowPipelineRunner.fromOptions(options).toString());
   }
 

Reply via email to