Default the GCS upload buffer size to 1 MB in streaming mode in DataflowRunner (applied only when the option is not explicitly set)
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da3081a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da3081a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da3081a6 Branch: refs/heads/master Commit: da3081a68c82c6f22ee382dfe0ffe1bd6be5d0e2 Parents: d93ef2e Author: Pei He <[email protected]> Authored: Mon Aug 15 12:22:11 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Aug 17 16:24:39 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 6 ++++ .../runners/dataflow/DataflowRunnerTest.java | 31 ++++++++++++++++++++ 2 files changed, 37 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6222289..6f8180e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -125,6 +125,7 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import 
com.google.common.base.Joiner; @@ -309,6 +310,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { + "' invalid. Please make sure the value is non-negative."); } + if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) { + dataflowOptions.setGcsUploadBufferSizeBytes( + AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT); + } + return new DataflowRunner(dataflowOptions); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index d7deffd..6f1653b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -89,6 +89,7 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -795,6 +796,36 @@ public class DataflowRunnerTest { } } + @Test + public void testGcsUploadBufferSizeDefault() throws IOException { + DataflowPipelineOptions batchOptions = buildPipelineOptions(); + DataflowRunner.fromOptions(batchOptions); + assertNull(batchOptions.getGcsUploadBufferSizeBytes()); + + DataflowPipelineOptions streamingOptions = 
buildPipelineOptions(); + streamingOptions.setStreaming(true); + DataflowRunner.fromOptions(streamingOptions); + assertEquals( + AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT, + streamingOptions.getGcsUploadBufferSizeBytes().intValue()); + } + + @Test + public void testGcsUploadBufferSize() throws IOException { + int gcsUploadBufferSizeBytes = 12345678; + DataflowPipelineOptions batchOptions = buildPipelineOptions(); + batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); + DataflowRunner.fromOptions(batchOptions); + assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue()); + + DataflowPipelineOptions streamingOptions = buildPipelineOptions(); + streamingOptions.setStreaming(true); + streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); + DataflowRunner.fromOptions(streamingOptions); + assertEquals( + gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue()); + } + /** * A fake PTransform for testing. */
