This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63b1e5a7394 Add support for passing labels as --labels '{ "name": "wrench", "mass": "1_3kg", "count": "3" }' (#25739)
63b1e5a7394 is described below

commit 63b1e5a7394662974024efca0cc7d3e449d8a66d
Author: Anand Inguva <[email protected]>
AuthorDate: Wed Mar 8 16:37:29 2023 -0500

    Add support for passing labels as --labels '{ "name": "wrench", "mass": "1_3kg", "count": "3" }' (#25739)
    
    * Add support for passing labels as --labels=key=value,key1=value1
    
    * Update doc string
    
    * Add support for parsing dict for --labels
---
 sdks/python/apache_beam/options/pipeline_options.py |  3 ++-
 .../runners/dataflow/internal/apiclient.py          | 21 +++++++++++++++------
 .../runners/dataflow/internal/apiclient_test.py     | 19 +++++++++++++++++++
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index ee0b1095fa2..229f04b8a35 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -738,7 +738,8 @@ class GoogleCloudOptions(PipelineOptions):
         default=None,
         help='Labels to be applied to this Dataflow job. '
         'Labels are key value pairs separated by = '
-        '(e.g. --label key=value).')
+        '(e.g. --label key=value) or '
+        '(--labels=\'{ "key": "value", "mass": "1_3kg", "count": "3" }\').')
     parser.add_argument(
         '--update',
         default=False,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index b389d08a2a0..882617674c8 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -26,6 +26,7 @@ Dataflow client utility functions."""
 #  --outdir=apache_beam/runners/dataflow/internal/clients/cloudbuild \
 #  --root_package=. client
 
+import ast
 import codecs
 from functools import partial
 import getpass
@@ -506,12 +507,20 @@ class Job(object):
     # Labels.
     if self.google_cloud_options.labels:
       self.proto.labels = dataflow.Job.LabelsValue()
-      for label in self.google_cloud_options.labels:
-        parts = label.split('=', 1)
-        key = parts[0]
-        value = parts[1] if len(parts) > 1 else ''
-        self.proto.labels.additionalProperties.append(
-            dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+      labels = self.google_cloud_options.labels
+      for label in labels:
+        if '{' in label:
+          label = ast.literal_eval(label)
+          for key, value in label.items():
+            self.proto.labels.additionalProperties.append(
+                dataflow.Job.LabelsValue.AdditionalProperty(
+                    key=key, value=value))
+        else:
+          parts = label.split('=', 1)
+          key = parts[0]
+          value = parts[1] if len(parts) > 1 else ''
+          self.proto.labels.additionalProperties.append(
+              dataflow.Job.LabelsValue.AdditionalProperty(key=key, 
value=value))
 
     # Client Request ID
     self.proto.clientRequestId = '{}-{}'.format(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 4a5a85f4403..2994b6c5138 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -826,6 +826,25 @@ class UtilTest(unittest.TestCase):
     self.assertEqual('key5', job.proto.labels.additionalProperties[4].key)
     self.assertEqual('', job.proto.labels.additionalProperties[4].value)
 
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--labels',
+        '{ "name": "wrench", "mass": "1_3kg", "count": "3" }'
+    ])
+    job = apiclient.Job(pipeline_options, beam_runner_api_pb2.Pipeline())
+    self.assertEqual(3, len(job.proto.labels.additionalProperties))
+    self.assertEqual('name', job.proto.labels.additionalProperties[0].key)
+    self.assertEqual('wrench', job.proto.labels.additionalProperties[0].value)
+    self.assertEqual('mass', job.proto.labels.additionalProperties[1].key)
+    self.assertEqual('1_3kg', job.proto.labels.additionalProperties[1].value)
+    self.assertEqual('count', job.proto.labels.additionalProperties[2].key)
+    self.assertEqual('3', job.proto.labels.additionalProperties[2].value)
+
   def test_experiment_use_multiple_sdk_containers(self):
     pipeline_options = PipelineOptions([
         '--project',

Reply via email to