Repository: beam Updated Branches: refs/heads/master 2acdc747a -> 1974b920e
Stage the portable pipeline in Dataflow Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aea0c601 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aea0c601 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aea0c601 Branch: refs/heads/master Commit: aea0c6017dc2cef2e62216d0882c7cc89cb57732 Parents: 9b866fe Author: Kenneth Knowles <[email protected]> Authored: Tue Oct 10 20:11:47 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Wed Oct 18 13:02:24 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 28 +++++++ .../runners/dataflow/DataflowRunnerTest.java | 79 ++++++++++++++++---- 2 files changed, 91 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/aea0c601/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5e91850..6dbc4af 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.net.URISyntaxException; @@ -64,6 +65,7 @@ import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; +import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; @@ -188,6 +190,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1024 * 1024; + @VisibleForTesting + static final String PIPELINE_FILE_NAME = "pipeline"; + + @VisibleForTesting + static final String SERIALIZED_PROTOBUF_EXTENSION = ".pb"; + + private static final String STAGED_PIPELINE_METADATA_PROPERTY = "pipeline_url"; + private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat; /** @@ -516,6 +526,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { List<DataflowPackage> packages = options.getStager().stageDefaultFiles(); + RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(pipeline); + File serializedProtoPipeline; + try { + serializedProtoPipeline = + File.createTempFile(PIPELINE_FILE_NAME, SERIALIZED_PROTOBUF_EXTENSION); + protoPipeline.writeDelimitedTo(new FileOutputStream(serializedProtoPipeline)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + LOG.info("Staging pipeline description to {}", options.getStagingLocation()); + DataflowPackage stagedPipeline = + options + .getStager() + .stageFiles(ImmutableList.of(serializedProtoPipeline.getAbsolutePath())) + .get(0); // Set a unique client_request_id in the CreateJob request. // This is used to ensure idempotence of job creation across retried @@ -560,6 +586,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { String workerHarnessContainerImage = getContainerImageForJob(options); for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) { workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage); + workerPool.setMetadata( + ImmutableMap.of(STAGED_PIPELINE_METADATA_PROPERTY, stagedPipeline.getLocation())); } newJob.getEnvironment().setVersion(getEnvironmentVersion(options)); http://git-wip-us.apache.org/repos/asf/beam/blob/aea0c601/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 0e3c266..5bc798a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -45,6 +45,7 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import java.io.File; @@ -141,6 +142,7 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class DataflowRunnerTest implements Serializable { + private static final String VALID_BUCKET = "valid-bucket"; private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging"; private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp"; private static final String VALID_PROFILE_BUCKET = "gs://valid-bucket/profiles"; @@ -166,15 +168,33 @@ public class DataflowRunnerTest implements Serializable { @Before public void setUp() throws IOException { this.mockGcsUtil = mock(GcsUtil.class); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); + .then( + new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt())) + .then( + new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.DELETE_ON_CLOSE); + } + }); + when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { @Override public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { @@ -189,6 +209,30 @@ public class DataflowRunnerTest implements Serializable { when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true); when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false); + // Let every valid path be matched + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenAnswer( + new Answer<List<GcsUtil.StorageObjectOrIOException>>() { + @Override + public List<GcsUtil.StorageObjectOrIOException> answer( + InvocationOnMock invocationOnMock) throws Throwable { + + List<GcsPath> gcsPaths = (List<GcsPath>) invocationOnMock.getArguments()[0]; + List<GcsUtil.StorageObjectOrIOException> results = new ArrayList<>(); + + for (GcsPath gcsPath : gcsPaths) { + if (gcsPath.getBucket().equals(VALID_BUCKET)) { + StorageObject resultObject = new StorageObject(); + resultObject.setBucket(gcsPath.getBucket()); + resultObject.setName(gcsPath.getObject()); + results.add(GcsUtil.StorageObjectOrIOException.create(resultObject)); + } + } + + return results; + } + }); + // The dataflow pipeline attempts to output to this location. when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true); @@ -524,14 +568,17 @@ public class DataflowRunnerTest implements Serializable { options.setGcpCredential(new TestCredential()); when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt())) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); + .then( + new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.DELETE_ON_CLOSE); + } + }); Pipeline p = buildDataflowPipeline(options);
