gemini-code-assist[bot] commented on code in PR #37639:
URL: https://github.com/apache/beam/pull/37639#discussion_r3311963385
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -236,77 +235,78 @@ def __init__(
container_image_url = environment_payload.container_image
container_image = dataflow.SdkHarnessContainerImage()
- container_image.containerImage = container_image_url
- container_image.useSingleCorePerContainer = (
+ container_image.container_image = container_image_url
+ container_image.use_single_core_per_container = (
common_urns.protocols.MULTI_CORE_BUNDLE_PROCESSING.urn
not in environment.capabilities)
- container_image.environmentId = id
+ container_image.environment_id = id
for capability in environment.capabilities:
container_image.capabilities.append(capability)
- pool.sdkHarnessContainerImages.append(container_image)
+ pool.sdk_harness_container_images.append(container_image)
- if not pool.sdkHarnessContainerImages:
- pool.workerHarnessContainerImage = (
+ if not pool.sdk_harness_container_images:
+ pool.worker_harness_container_image = (
get_container_image_from_options(options))
- elif len(pool.sdkHarnessContainerImages) == 1:
+ elif len(pool.sdk_harness_container_images) == 1:
# Dataflow expects a value here when there is only one environment.
- pool.workerHarnessContainerImage = (
- pool.sdkHarnessContainerImages[0].containerImage)
+ pool.worker_harness_container_image = (
+ pool.sdk_harness_container_images[0].container_image)
if self.debug_options.number_of_worker_harness_threads:
- pool.numThreadsPerWorker = (
+ pool.num_threads_per_worker = (
self.debug_options.number_of_worker_harness_threads)
if self.worker_options.use_public_ips is not None:
if self.worker_options.use_public_ips:
- pool.ipConfiguration = (
-
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
+ pool.ip_configuration = (
+ dataflow.WorkerIPAddressConfiguration.WORKER_IP_PUBLIC)
else:
- pool.ipConfiguration = (
-
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE
- )
+ pool.ip_configuration = (
+ dataflow.WorkerIPAddressConfiguration.WORKER_IP_PRIVATE)
if self.standard_options.streaming:
# Use separate data disk for streaming.
disk = dataflow.Disk()
if self.local:
- disk.diskType = 'local'
+ disk.disk_type = 'local'
if self.worker_options.disk_type:
- disk.diskType = self.worker_options.disk_type
- pool.dataDisks.append(disk)
- self.proto.workerPools.append(pool)
+ disk.disk_type = self.worker_options.disk_type
+ pool.data_disks.append(disk)
+ self.proto.worker_pools.append(pool)
sdk_pipeline_options = options.get_all_options(retain_unknown_options=True)
if sdk_pipeline_options:
- self.proto.sdkPipelineOptions = (
- dataflow.Environment.SdkPipelineOptionsValue())
-
- options_dict = {
- k: v
- for k, v in sdk_pipeline_options.items() if v is not None
- }
+ options_dict = {}
+ for k, v in sdk_pipeline_options.items():
+ if v is None:
+ continue
+ options_dict[k] = str(v) if isinstance(
+ v, value_provider.ValueProvider) else v
options_dict["pipelineUrl"] = proto_pipeline_staged_url
if pipeline_proto_hash:
options_dict["pipelineProtoHash"] = pipeline_proto_hash
# Don't pass impersonate_service_account through to the harness.
# Though impersonation should start a job, the workers should
# not try to modify their credentials.
options_dict.pop('impersonate_service_account', None)
- self.proto.sdkPipelineOptions.additionalProperties.append(
- dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
- key='options', value=to_json_value(options_dict)))
dd = DisplayData.create_from_options(options)
items = [item.get_dict() for item in dd.items]
- self.proto.sdkPipelineOptions.additionalProperties.append(
- dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
- key='display_data', value=to_json_value(items)))
+
+ _LOGGER.info('sdk_pipeline_options: %s', options_dict)
+
+ self.proto.sdk_pipeline_options = Struct(
+ fields={
+ 'options': Value(struct_value=options_dict),
+ 'display_data': Value(list_value=items)
+ })
Review Comment:

Directly passing a Python dictionary to `Value(struct_value=...)` or a
Python list to `Value(list_value=...)` will raise a `TypeError` at runtime
because protobuf expects explicit `Struct` and `ListValue` message instances.
Using the `.update()` method on a `Struct` object automatically handles the
conversion of nested Python dictionaries and lists safely.
```suggestion
sdk_pipeline_options_struct = Struct()
sdk_pipeline_options_struct.update({
'options': options_dict,
'display_data': items
})
self.proto.sdk_pipeline_options = sdk_pipeline_options_struct
```
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -185,7 +192,10 @@ def rank_error(msg):
messages, page_token = runner.dataflow_client.list_messages(
job_id, page_token=page_token, start_time=last_message_time)
for m in messages:
- message = '%s: %s: %s' % (m.time, m.messageImportance, m.messageText)
+ message = '%s: %s: %s' % (
+ m.time,
+ dataflow_api.JobMessageImportance(m.message_importance).name,
+ m.message_text)
Review Comment:

The code attempts to resolve the name of `m.message_importance` on line 197
before checking whether it is `None` on line 212. If `m.message_importance` is
`None`, `dataflow_api.JobMessageImportance(None)` will raise a `ValueError` and
crash the loop. Resolving the importance name only when the value is not `None`,
and falling back to `'JOB_MESSAGE_IMPORTANCE_UNKNOWN'` otherwise, avoids this.
```suggestion
importance_name = (
dataflow_api.JobMessageImportance(m.message_importance).name
if m.message_importance is not None
else 'JOB_MESSAGE_IMPORTANCE_UNKNOWN'
)
message = '%s: %s: %s' % (
m.time,
importance_name,
m.message_text)
```
##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -135,24 +132,19 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
- step = _get_match(
- metric.name.context.additionalProperties,
- lambda x: x.key == STEP_LABEL).value
+ step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
Review Comment:

Accessing `metric.name.context['step']` directly will raise a `KeyError` if
the `'step'` key is missing from the context dictionary. The original code
caught `ValueError` from `_get_match`, but now a `KeyError` will propagate and
crash the metrics collection. Catching `KeyError` along with `ValueError`
restores the original fallback: metrics without a step name are handled by the
`except` branch instead of aborting collection.
```suggestion
step = metric.name.context['step']
step = self._translate_step_name(step)
except (KeyError, ValueError):
```
##########
sdks/python/setup.py:
##########
@@ -533,6 +533,7 @@ def get_portability_package_data():
'google-cloud-datastore>=2.0.0,<3',
'google-cloud-pubsub>=2.1.0,<3',
'google-cloud-storage>=2.18.2,<4',
+ 'google-cloud-dataflow-client>=0.13.0,<0.14.0',
Review Comment:

The dependency version specified here (`>=0.13.0,<0.14.0`) conflicts with
the version added to the GPU image requirements files
(`gpu_image_requirements.txt` for Python 3.10, 3.11, and 3.12), which specify
`google-cloud-dataflow-client==0.11.0`. Please align the versions to avoid
dependency resolution conflicts during container builds.
##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -1027,98 +1035,97 @@ def list_messages(
JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
messageText: A message string.
"""
- request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
- jobId=job_id,
+ request = dataflow.ListJobMessagesRequest(
+ job_id=job_id,
location=self.google_cloud_options.region,
- projectId=self.google_cloud_options.project)
+ project_id=self.google_cloud_options.project)
if page_token is not None:
- request.pageToken = page_token
+ request.page_token = page_token
if start_time is not None:
- request.startTime = start_time
+ request.start_time = start_time
if end_time is not None:
- request.endTime = end_time
+ request.end_time = end_time
if minimum_importance is not None:
if minimum_importance == 'JOB_MESSAGE_DEBUG':
- request.minimumImportance = (
- dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
- MinimumImportanceValueValuesEnum.JOB_MESSAGE_DEBUG)
+ request.minimum_importance = (
+ dataflow.JobMessageImportance.JOB_MESSAGE_DEBUG)
elif minimum_importance == 'JOB_MESSAGE_DETAILED':
request.minimumImportance = (
- dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
- MinimumImportanceValueValuesEnum.JOB_MESSAGE_DETAILED)
+ dataflow.JobMessageImportance.JOB_MESSAGE_DETAILED)
elif minimum_importance == 'JOB_MESSAGE_BASIC':
request.minimumImportance = (
- dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
- MinimumImportanceValueValuesEnum.JOB_MESSAGE_BASIC)
+ dataflow.JobMessageImportance.JOB_MESSAGE_BASIC)
elif minimum_importance == 'JOB_MESSAGE_WARNING':
request.minimumImportance = (
- dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
- MinimumImportanceValueValuesEnum.JOB_MESSAGE_WARNING)
+ dataflow.JobMessageImportance.JOB_MESSAGE_WARNING)
elif minimum_importance == 'JOB_MESSAGE_ERROR':
request.minimumImportance = (
- dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
- MinimumImportanceValueValuesEnum.JOB_MESSAGE_ERROR)
+ dataflow.JobMessageImportance.JOB_MESSAGE_ERROR)
else:
raise RuntimeError(
'Unexpected value for minimum_importance argument: %r' %
minimum_importance)
Review Comment:

There is a typo in setting the `minimum_importance` field on the request
object. For `JOB_MESSAGE_DEBUG`, it correctly uses
`request.minimum_importance`, but for all other importance levels (`DETAILED`,
`BASIC`, `WARNING`, `ERROR`), it incorrectly uses `request.minimumImportance`
(camelCase). Since the new client uses snake_case, this will either raise an
`AttributeError` or fail to send the minimum importance parameter to the
Dataflow service.
```suggestion
if minimum_importance is not None:
if minimum_importance == 'JOB_MESSAGE_DEBUG':
request.minimum_importance = (
dataflow.JobMessageImportance.JOB_MESSAGE_DEBUG)
elif minimum_importance == 'JOB_MESSAGE_DETAILED':
request.minimum_importance = (
dataflow.JobMessageImportance.JOB_MESSAGE_DETAILED)
elif minimum_importance == 'JOB_MESSAGE_BASIC':
request.minimum_importance = (
dataflow.JobMessageImportance.JOB_MESSAGE_BASIC)
elif minimum_importance == 'JOB_MESSAGE_WARNING':
request.minimum_importance = (
dataflow.JobMessageImportance.JOB_MESSAGE_WARNING)
elif minimum_importance == 'JOB_MESSAGE_ERROR':
request.minimum_importance = (
dataflow.JobMessageImportance.JOB_MESSAGE_ERROR)
else:
raise RuntimeError(
'Unexpected value for minimum_importance argument: %r' %
minimum_importance)
```
##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -185,10 +177,7 @@ def _populate_metrics(self, response, result,
user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
- is_tentative = [
- prop for prop in metric.name.context.additionalProperties
- if prop.key == 'tentative' and prop.value == 'true'
- ]
+ is_tentative = metric.name.context['tentative']
Review Comment:

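Accessing `metric.name.context['tentative']` directly will raise a
`KeyError` for any non-tentative metrics where the `'tentative'` key is not
present in the context. This will crash the metrics collection loop. Using
`.get('tentative')` safely handles the absence of the key. Comparing the result
against `'true'` also preserves the original semantics. The old code only
treated a metric as tentative when the value was the string `'true'`, whereas
any non-empty string (such as `'false'`) would be truthy on its own.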
```suggestion
is_tentative = metric.name.context.get('tentative') == 'true'
```
##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -209,32 +198,13 @@ def _get_metric_value(self, metric):
return None
if metric.scalar is not None:
- return metric.scalar.integer_value
+ # This will always be a single value if there is any data in the field.
+ return metric.scalar
elif metric.distribution is not None:
- dist_count = _get_match(
- metric.distribution.object_value.properties,
- lambda x: x.key == 'count').value.integer_value
- dist_min = _get_match(
- metric.distribution.object_value.properties,
- lambda x: x.key == 'min').value.integer_value
- dist_max = _get_match(
- metric.distribution.object_value.properties,
- lambda x: x.key == 'max').value.integer_value
- dist_sum = _get_match(
- metric.distribution.object_value.properties,
- lambda x: x.key == 'sum').value.integer_value
- if dist_sum is None:
- # distribution metric is not meant to use on large values, but in case
- # it is, the value can overflow and become double_value, the
correctness
- # of the value may not be guaranteed.
- _LOGGER.info(
- "Distribution metric sum value seems to have "
- "overflowed integer_value range, the correctness of sum or mean "
- "value may not be guaranteed: %s" % metric.distribution)
- dist_sum = int(
- _get_match(
- metric.distribution.object_value.properties,
- lambda x: x.key == 'sum').value.double_value)
+ dist_count = metric.distribution['count']
+ dist_min = metric.distribution['min']
+ dist_max = metric.distribution['max']
+ dist_sum = metric.distribution['sum']
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
Review Comment:

Because `google-cloud-dataflow-client` uses `proto-plus`, fields of type
`google.protobuf.Value` and `google.protobuf.Struct` are automatically unpacked
to native Python types. However, numbers in these fields are always
deserialized as Python `float` values (e.g., `42.0` instead of `42`). Returning
floats for counters and distribution metrics is a backward-incompatible change
that can break downstream pipelines and assertions. Converting them back to
`int` when appropriate preserves compatibility.
```suggestion
if metric.scalar is not None:
# This will always be a single value if there is any data in the field.
val = metric.scalar
if isinstance(val, float) and val.is_integer():
return int(val)
return val
elif metric.distribution is not None:
dist_count = int(metric.distribution['count'])
dist_min = int(metric.distribution['min'])
dist_max = int(metric.distribution['max'])
dist_sum = int(metric.distribution['sum'])
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]