Repository: beam
Updated Branches:
  refs/heads/master 2acdc747a -> 1974b920e


Stage the portable pipeline in Dataflow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aea0c601
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aea0c601
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aea0c601

Branch: refs/heads/master
Commit: aea0c6017dc2cef2e62216d0882c7cc89cb57732
Parents: 9b866fe
Author: Kenneth Knowles <[email protected]>
Authored: Tue Oct 10 20:11:47 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Wed Oct 18 13:02:24 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 28 +++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 79 ++++++++++++++++----
 2 files changed, 91 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aea0c601/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5e91850..6dbc4af 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URISyntaxException;
@@ -64,6 +65,7 @@ import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -188,6 +190,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024;
 
+  @VisibleForTesting
+  static final String PIPELINE_FILE_NAME = "pipeline";
+
+  @VisibleForTesting
+  static final String SERIALIZED_PROTOBUF_EXTENSION = ".pb";
+
+  private static final String STAGED_PIPELINE_METADATA_PROPERTY = 
"pipeline_url";
+
   private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
 
   /**
@@ -516,6 +526,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     List<DataflowPackage> packages = options.getStager().stageDefaultFiles();
 
+    RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline);
+    File serializedProtoPipeline;
+    try {
+      serializedProtoPipeline =
+          File.createTempFile(PIPELINE_FILE_NAME, 
SERIALIZED_PROTOBUF_EXTENSION);
+      protoPipeline.writeDelimitedTo(new 
FileOutputStream(serializedProtoPipeline));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Staging pipeline description to {}", 
options.getStagingLocation());
+    DataflowPackage stagedPipeline =
+        options
+            .getStager()
+            
.stageFiles(ImmutableList.of(serializedProtoPipeline.getAbsolutePath()))
+            .get(0);
 
     // Set a unique client_request_id in the CreateJob request.
     // This is used to ensure idempotence of job creation across retried
@@ -560,6 +586,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     String workerHarnessContainerImage = getContainerImageForJob(options);
     for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
       workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
+      workerPool.setMetadata(
+          ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, 
stagedPipeline.getLocation()));
     }
 
     newJob.getEnvironment().setVersion(getEnvironmentVersion(options));

http://git-wip-us.apache.org/repos/asf/beam/blob/aea0c601/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 0e3c266..5bc798a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -45,6 +45,7 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.storage.model.StorageObject;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
@@ -141,6 +142,7 @@ import org.mockito.stubbing.Answer;
 @RunWith(JUnit4.class)
 public class DataflowRunnerTest implements Serializable {
 
+  private static final String VALID_BUCKET = "valid-bucket";
   private static final String VALID_STAGING_BUCKET = 
"gs://valid-bucket/staging";
   private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
   private static final String VALID_PROFILE_BUCKET = 
"gs://valid-bucket/profiles";
@@ -166,15 +168,33 @@ public class DataflowRunnerTest implements Serializable {
   @Before
   public void setUp() throws IOException {
     this.mockGcsUtil = mock(GcsUtil.class);
+
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
+        .then(
+            new Answer<SeekableByteChannel>() {
+              @Override
+              public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
+                return FileChannel.open(
+                    Files.createTempFile("channel-", ".tmp"),
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.DELETE_ON_CLOSE);
+              }
+            });
+
+    when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+        .then(
+            new Answer<SeekableByteChannel>() {
+              @Override
+              public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
+                return FileChannel.open(
+                    Files.createTempFile("channel-", ".tmp"),
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.DELETE_ON_CLOSE);
+              }
+            });
+
     when(mockGcsUtil.expand(any(GcsPath.class))).then(new 
Answer<List<GcsPath>>() {
       @Override
       public List<GcsPath> answer(InvocationOnMock invocation) throws 
Throwable {
@@ -189,6 +209,30 @@ public class DataflowRunnerTest implements Serializable {
     
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true);
     
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false);
 
+    // Let every valid path be matched
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenAnswer(
+            new Answer<List<GcsUtil.StorageObjectOrIOException>>() {
+              @Override
+              public List<GcsUtil.StorageObjectOrIOException> answer(
+                  InvocationOnMock invocationOnMock) throws Throwable {
+
+                List<GcsPath> gcsPaths = (List<GcsPath>) 
invocationOnMock.getArguments()[0];
+                List<GcsUtil.StorageObjectOrIOException> results = new 
ArrayList<>();
+
+                for (GcsPath gcsPath : gcsPaths) {
+                  if (gcsPath.getBucket().equals(VALID_BUCKET)) {
+                    StorageObject resultObject = new StorageObject();
+                    resultObject.setBucket(gcsPath.getBucket());
+                    resultObject.setName(gcsPath.getObject());
+                    
results.add(GcsUtil.StorageObjectOrIOException.create(resultObject));
+                  }
+                }
+
+                return results;
+              }
+            });
+
     // The dataflow pipeline attempts to output to this location.
     
when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
 
@@ -524,14 +568,17 @@ public class DataflowRunnerTest implements Serializable {
     options.setGcpCredential(new TestCredential());
 
     when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
+        .then(
+            new Answer<SeekableByteChannel>() {
+              @Override
+              public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
+                return FileChannel.open(
+                    Files.createTempFile("channel-", ".tmp"),
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.DELETE_ON_CLOSE);
+              }
+            });
 
     Pipeline p = buildDataflowPipeline(options);
 

Reply via email to