Add an option for dataflow job labels.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48ae7d1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48ae7d1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48ae7d1d

Branch: refs/heads/release-2.2.0
Commit: 48ae7d1d8243c6a3a037ad454162376f98dec3ab
Parents: 6db1db7
Author: Ahmet Altay <[email protected]>
Authored: Thu Oct 12 19:17:28 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Mon Oct 30 14:42:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py      |  7 +++++++
 .../apache_beam/runners/dataflow/internal/apiclient.py   | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3abcbf2..37703fe 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -374,6 +374,13 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--template_location',
                         default=None,
                         help='Save job to specified local or GCS location.')
+    parser.add_argument(
+        '--label', '--labels',
+        dest='labels',
+        action='append',
+        default=None,
+        help='Labels that will be applied to this Dataflow job. Labels are key '
+        'value pairs separated by = (e.g. --label key=value).')
 
   def validate(self, validator):
     errors = []

http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e48b58c..eec598a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -363,6 +363,17 @@ class Job(object):
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
     else:
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
+
+    # Labels.
+    if self.google_cloud_options.labels:
+      self.proto.labels = dataflow.Job.LabelsValue()
+      for label in self.google_cloud_options.labels:
+        parts = label.split('=', 1)
+        key = parts[0]
+        value = parts[1] if len(parts) > 1 else ''
+        self.proto.labels.additionalProperties.append(
+            dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 

Reply via email to