addressed feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e088b7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e088b7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e088b7f Branch: refs/heads/master Commit: 0e088b7fcb2b35d7fdc5125d4dc66e9fa6ae7ffd Parents: da3081a Author: Pei He <[email protected]> Authored: Wed Aug 17 13:56:37 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Aug 17 16:24:39 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 6 ++++-- .../runners/dataflow/DataflowRunnerTest.java | 21 +++++++++++++------- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 6f8180e..1a845ea 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -219,6 +219,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // The limit of CreateJob request size. private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024; + @VisibleForTesting + static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024; + private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat; /** @@ -311,8 +314,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) { - dataflowOptions.setGcsUploadBufferSizeBytes( - AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT); + dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT); } return new DataflowRunner(dataflowOptions); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 6f1653b..58b9878 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -797,31 +797,38 @@ public class DataflowRunnerTest { } @Test - public void testGcsUploadBufferSizeDefault() throws IOException { + public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException { DataflowPipelineOptions batchOptions = buildPipelineOptions(); - DataflowRunner.fromOptions(batchOptions); + batchOptions.setRunner(DataflowRunner.class); + Pipeline.create(batchOptions); assertNull(batchOptions.getGcsUploadBufferSizeBytes()); + } + @Test + public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException { DataflowPipelineOptions streamingOptions = buildPipelineOptions(); streamingOptions.setStreaming(true); - DataflowRunner.fromOptions(streamingOptions); + streamingOptions.setRunner(DataflowRunner.class); + Pipeline.create(streamingOptions); assertEquals( - AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT, + DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT, streamingOptions.getGcsUploadBufferSizeBytes().intValue()); } @Test - public void testGcsUploadBufferSize() throws IOException { + public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException { int gcsUploadBufferSizeBytes = 12345678; DataflowPipelineOptions batchOptions = buildPipelineOptions(); batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); - DataflowRunner.fromOptions(batchOptions); + batchOptions.setRunner(DataflowRunner.class); + Pipeline.create(batchOptions); assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue()); DataflowPipelineOptions streamingOptions = buildPipelineOptions(); streamingOptions.setStreaming(true); streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes); - DataflowRunner.fromOptions(streamingOptions); + streamingOptions.setRunner(DataflowRunner.class); + Pipeline.create(streamingOptions); assertEquals( gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue()); }
