Repository: incubator-beam Updated Branches: refs/heads/master bf15d2f3c -> 22ff05c49
[BEAM-93] Add subnetwork support and increment Dataflow API dependency Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab10ac35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab10ac35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab10ac35 Branch: refs/heads/master Commit: ab10ac3560ee38398ff222f552e372e91f1ca4af Parents: bf15d2f Author: sammcveety <[email protected]> Authored: Wed Mar 2 21:27:08 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Sat Mar 5 21:16:12 2016 -0800 ---------------------------------------------------------------------- pom.xml | 2 +- .../DataflowPipelineWorkerPoolOptions.java | 12 +++++++ .../sdk/runners/DataflowPipelineTranslator.java | 3 ++ .../runners/DataflowPipelineTranslatorTest.java | 34 ++++++++++++++++++++ 4 files changed, 50 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index de47ff5..f9dbab7 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <avro.version>1.7.7</avro.version> <bigquery.version>v2-rev248-1.21.0</bigquery.version> <bigtable.version>0.2.3</bigtable.version> - <dataflow.version>v1b3-rev19-1.21.0</dataflow.version> + <dataflow.version>v1b3-rev22-1.21.0</dataflow.version> <dataflow.proto.version>0.5.160222</dataflow.proto.version> <datastore.version>v1beta2-rev1-4.0.0</datastore.version> <google-clients.version>1.21.0</google-clients.version> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java index 25d1589..be5cfdc 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java @@ -145,6 +145,18 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { void setNetwork(String value); /** + * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching + * workers. + * + * <p>Default is up to the Dataflow service. Expected format is zones/ZONE/subnetworks/SUBNETWORK. + */ + @Description("GCE subnetwork for launching workers. For more information, see the reference " + + "documentation https://cloud.google.com/compute/docs/networking. " + + "Default is up to the Dataflow service.") + String getSubnetwork(); + void setSubnetwork(String value); + + /** * GCE <a href="https://developers.google.com/compute/docs/zones" * >availability zone</a> for launching workers. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index ae3a403..d0cc4e5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -442,6 +442,9 @@ public class DataflowPipelineTranslator { if (!Strings.isNullOrEmpty(options.getNetwork())) { workerPool.setNetwork(options.getNetwork()); } + if (!Strings.isNullOrEmpty(options.getSubnetwork())) { + workerPool.setSubnetwork(options.getSubnetwork()); + } if (options.getDiskSizeGb() > 0) { workerPool.setDiskSizeGb(options.getDiskSizeGb()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index 72090a0..497552f 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -231,6 +231,40 @@ public class DataflowPipelineTranslatorTest { } @Test + public void testSubnetworkConfig() throws IOException { + final String testSubnetwork = "zones/ZONE/subnetworks/SUBNETWORK"; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSubnetwork(testSubnetwork); + + DataflowPipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals(testSubnetwork, + job.getEnvironment().getWorkerPools().get(0).getSubnetwork()); + } + + @Test + public void testSubnetworkConfigMissing() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + + DataflowPipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork()); + } + + @Test public void testScalingAlgorithmMissing() throws IOException { DataflowPipelineOptions options = buildPipelineOptions();
