Add an option for dataflow job labels.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48ae7d1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48ae7d1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48ae7d1d

Branch: refs/heads/release-2.2.0
Commit: 48ae7d1d8243c6a3a037ad454162376f98dec3ab
Parents: 6db1db7
Author: Ahmet Altay <[email protected]>
Authored: Thu Oct 12 19:17:28 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Mon Oct 30 14:42:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py    |  7 +++++++
 .../apache_beam/runners/dataflow/internal/apiclient.py | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3abcbf2..37703fe 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -374,6 +374,13 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--template_location',
                         default=None,
                         help='Save job to specified local or GCS location.')
+    parser.add_argument(
+        '--label', '--labels',
+        dest='labels',
+        action='append',
+        default=None,
+        help='Labels that will be applied to this Dataflow job. Labels are key '
+        'value pairs separated by = (e.g. --label key=value).')
 
   def validate(self, validator):
     errors = []

http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e48b58c..eec598a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -363,6 +363,17 @@ class Job(object):
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
     else:
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
+
+    # Labels.
+    if self.google_cloud_options.labels:
+      self.proto.labels = dataflow.Job.LabelsValue()
+      for label in self.google_cloud_options.labels:
+        parts = label.split('=', 1)
+        key = parts[0]
+        value = parts[1] if len(parts) > 1 else ''
+        self.proto.labels.additionalProperties.append(
+            dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
