Adding support for subnetwork in Python PipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/902f27a3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/902f27a3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/902f27a3 Branch: refs/heads/jstorm-runner Commit: 902f27a34cf65923823d10c0edf11ba27e883c30 Parents: aad8555 Author: Pablo <[email protected]> Authored: Mon May 15 15:47:18 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Mon May 15 21:32:46 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/options/pipeline_options.py | 11 +++++++++- .../runners/dataflow/internal/apiclient.py | 2 ++ .../runners/dataflow/internal/apiclient_test.py | 21 ++++++++++++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/options/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 983d128..777926a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -169,7 +169,7 @@ class PipelineOptions(HasDisplayData): """Returns a PipelineOptions from a dictionary of arguments. Args: - options: Dictinary of argument value pairs. + options: Dictionary of argument value pairs. Returns: A PipelineOptions object representing the given arguments. @@ -455,6 +455,15 @@ class WorkerOptions(PipelineOptions): 'GCE network for launching workers. Default is up to the Dataflow ' 'service.')) parser.add_argument( + '--subnetwork', + default=None, + help=( + 'GCE subnetwork for launching workers. Default is up to the ' + 'Dataflow service. 
Expected format is ' + 'regions/REGION/subnetworks/SUBNETWORK or the fully qualified ' + 'subnetwork name. For more information, see ' + 'https://cloud.google.com/compute/docs/vpc/')) + parser.add_argument( '--worker_harness_container_image', default=None, help=('Docker registry location of container image to use for the ' http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index ea49593..bfdd5e4 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -199,6 +199,8 @@ class Environment(object): pool.zone = self.worker_options.zone if self.worker_options.network: pool.network = self.worker_options.network + if self.worker_options.subnetwork: + pool.subnetwork = self.worker_options.subnetwork if self.worker_options.worker_harness_container_image: pool.workerHarnessContainerImage = ( self.worker_options.worker_harness_container_image) http://git-wip-us.apache.org/repos/asf/beam/blob/902f27a3/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 6ed1fb4..67cf77f 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -44,6 +44,27 @@ class UtilTest(unittest.TestCase): pipeline_options, DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION) + def test_set_network(self): + pipeline_options = PipelineOptions( + ['--network', 'anetworkname', + '--temp_location', 
'gs://any-location/temp']) + env = apiclient.Environment([], #packages + pipeline_options, + '2.0.0') #any environment version + self.assertEqual(env.proto.workerPools[0].network, + 'anetworkname') + + def test_set_subnetwork(self): + pipeline_options = PipelineOptions( + ['--subnetwork', '/regions/MY/subnetworks/SUBNETWORK', + '--temp_location', 'gs://any-location/temp']) + + env = apiclient.Environment([], #packages + pipeline_options, + '2.0.0') #any environment version + self.assertEqual(env.proto.workerPools[0].subnetwork, + '/regions/MY/subnetworks/SUBNETWORK') + def test_invalid_default_job_name(self): # Regexp for job names in dataflow. regexp = '^[a-z]([-a-z0-9]{0,61}[a-z0-9])?$'
