This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 63b1e5a7394 Add support for passing labels as --labels '{ "name":
"wrench", "mass": "1_3kg", "count": "3" }' (#25739)
63b1e5a7394 is described below
commit 63b1e5a7394662974024efca0cc7d3e449d8a66d
Author: Anand Inguva <[email protected]>
AuthorDate: Wed Mar 8 16:37:29 2023 -0500
Add support for passing labels as --labels '{ "name": "wrench", "mass":
"1_3kg", "count": "3" }' (#25739)
* Add support for passing labels as --labels=key=value,key1=value1
* Update doc string
* Add support for parsing dict for --labels
---
sdks/python/apache_beam/options/pipeline_options.py | 3 ++-
.../runners/dataflow/internal/apiclient.py | 21 +++++++++++++++------
.../runners/dataflow/internal/apiclient_test.py | 19 +++++++++++++++++++
3 files changed, 36 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index ee0b1095fa2..229f04b8a35 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -738,7 +738,8 @@ class GoogleCloudOptions(PipelineOptions):
default=None,
help='Labels to be applied to this Dataflow job. '
'Labels are key value pairs separated by = '
- '(e.g. --label key=value).')
+ '(e.g. --label key=value) or '
+ '(--labels=\'{ "key": "value", "mass": "1_3kg", "count": "3" }\').')
parser.add_argument(
'--update',
default=False,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index b389d08a2a0..882617674c8 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -26,6 +26,7 @@ Dataflow client utility functions."""
# --outdir=apache_beam/runners/dataflow/internal/clients/cloudbuild \
# --root_package=. client
+import ast
import codecs
from functools import partial
import getpass
@@ -506,12 +507,20 @@ class Job(object):
# Labels.
if self.google_cloud_options.labels:
self.proto.labels = dataflow.Job.LabelsValue()
- for label in self.google_cloud_options.labels:
- parts = label.split('=', 1)
- key = parts[0]
- value = parts[1] if len(parts) > 1 else ''
- self.proto.labels.additionalProperties.append(
- dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+ labels = self.google_cloud_options.labels
+ for label in labels:
+ if '{' in label:
+ label = ast.literal_eval(label)
+ for key, value in label.items():
+ self.proto.labels.additionalProperties.append(
+ dataflow.Job.LabelsValue.AdditionalProperty(
+ key=key, value=value))
+ else:
+ parts = label.split('=', 1)
+ key = parts[0]
+ value = parts[1] if len(parts) > 1 else ''
+ self.proto.labels.additionalProperties.append(
+ dataflow.Job.LabelsValue.AdditionalProperty(key=key,
value=value))
# Client Request ID
self.proto.clientRequestId = '{}-{}'.format(
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 4a5a85f4403..2994b6c5138 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -826,6 +826,25 @@ class UtilTest(unittest.TestCase):
self.assertEqual('key5', job.proto.labels.additionalProperties[4].key)
self.assertEqual('', job.proto.labels.additionalProperties[4].value)
+ pipeline_options = PipelineOptions([
+ '--project',
+ 'test_project',
+ '--job_name',
+ 'test_job_name',
+ '--temp_location',
+ 'gs://test-location/temp',
+ '--labels',
+ '{ "name": "wrench", "mass": "1_3kg", "count": "3" }'
+ ])
+ job = apiclient.Job(pipeline_options, beam_runner_api_pb2.Pipeline())
+ self.assertEqual(3, len(job.proto.labels.additionalProperties))
+ self.assertEqual('name', job.proto.labels.additionalProperties[0].key)
+ self.assertEqual('wrench', job.proto.labels.additionalProperties[0].value)
+ self.assertEqual('mass', job.proto.labels.additionalProperties[1].key)
+ self.assertEqual('1_3kg', job.proto.labels.additionalProperties[1].value)
+ self.assertEqual('count', job.proto.labels.additionalProperties[2].key)
+ self.assertEqual('3', job.proto.labels.additionalProperties[2].value)
+
def test_experiment_use_multiple_sdk_containers(self):
pipeline_options = PipelineOptions([
'--project',