Repository: beam
Updated Branches:
  refs/heads/master f398e5ad4 -> a3a7807fe

Add an option for dataflow job labels.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3feef917
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3feef917
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3feef917

Branch: refs/heads/master
Commit: 3feef91761c6f5a44f535e4daf9c39a88320e229
Parents: f398e5a
Author: Ahmet Altay <[email protected]>
Authored: Thu Oct 12 19:17:28 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Fri Oct 13 12:51:30 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py | 7 +++++++
 .../apache_beam/runners/dataflow/internal/apiclient.py | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/3feef917/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 2598551..a09c7c3 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -374,6 +374,13 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--template_location',
                         default=None,
                         help='Save job to specified local or GCS location.')
+    parser.add_argument(
+        '--label', '--labels',
+        dest='labels',
+        action='append',
+        default=None,
+        help='Labels that will be applied to this Dataflow job. Labels are key '
+        'value pairs separated by = (e.g. --label key=value).')
 
   def validate(self, validator):
     errors = []

http://git-wip-us.apache.org/repos/asf/beam/blob/3feef917/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e48b58c..eec598a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -363,6 +363,17 @@ class Job(object):
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
     else:
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
+
+    # Labels.
+    if self.google_cloud_options.labels:
+      self.proto.labels = dataflow.Job.LabelsValue()
+      for label in self.google_cloud_options.labels:
+        parts = label.split('=', 1)
+        key = parts[0]
+        value = parts[1] if len(parts) > 1 else ''
+        self.proto.labels.additionalProperties.append(
+            dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
