gemini-code-assist[bot] commented on code in PR #37639:
URL: https://github.com/apache/beam/pull/37639#discussion_r3311963385


##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -236,77 +235,78 @@ def __init__(
       container_image_url = environment_payload.container_image
 
       container_image = dataflow.SdkHarnessContainerImage()
-      container_image.containerImage = container_image_url
-      container_image.useSingleCorePerContainer = (
+      container_image.container_image = container_image_url
+      container_image.use_single_core_per_container = (
           common_urns.protocols.MULTI_CORE_BUNDLE_PROCESSING.urn
           not in environment.capabilities)
-      container_image.environmentId = id
+      container_image.environment_id = id
       for capability in environment.capabilities:
         container_image.capabilities.append(capability)
-      pool.sdkHarnessContainerImages.append(container_image)
+      pool.sdk_harness_container_images.append(container_image)
 
-    if not pool.sdkHarnessContainerImages:
-      pool.workerHarnessContainerImage = (
+    if not pool.sdk_harness_container_images:
+      pool.worker_harness_container_image = (
           get_container_image_from_options(options))
-    elif len(pool.sdkHarnessContainerImages) == 1:
+    elif len(pool.sdk_harness_container_images) == 1:
       # Dataflow expects a value here when there is only one environment.
-      pool.workerHarnessContainerImage = (
-          pool.sdkHarnessContainerImages[0].containerImage)
+      pool.worker_harness_container_image = (
+          pool.sdk_harness_container_images[0].container_image)
 
     if self.debug_options.number_of_worker_harness_threads:
-      pool.numThreadsPerWorker = (
+      pool.num_threads_per_worker = (
           self.debug_options.number_of_worker_harness_threads)
     if self.worker_options.use_public_ips is not None:
       if self.worker_options.use_public_ips:
-        pool.ipConfiguration = (
-            dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
+        pool.ip_configuration = (
+            dataflow.WorkerIPAddressConfiguration.WORKER_IP_PUBLIC)
       else:
-        pool.ipConfiguration = (
-            dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE
-        )
+        pool.ip_configuration = (
+            dataflow.WorkerIPAddressConfiguration.WORKER_IP_PRIVATE)
 
     if self.standard_options.streaming:
       # Use separate data disk for streaming.
       disk = dataflow.Disk()
       if self.local:
-        disk.diskType = 'local'
+        disk.disk_type = 'local'
       if self.worker_options.disk_type:
-        disk.diskType = self.worker_options.disk_type
-      pool.dataDisks.append(disk)
-    self.proto.workerPools.append(pool)
+        disk.disk_type = self.worker_options.disk_type
+      pool.data_disks.append(disk)
+    self.proto.worker_pools.append(pool)
 
     sdk_pipeline_options = options.get_all_options(retain_unknown_options=True)
     if sdk_pipeline_options:
-      self.proto.sdkPipelineOptions = (
-          dataflow.Environment.SdkPipelineOptionsValue())
-
-      options_dict = {
-          k: v
-          for k, v in sdk_pipeline_options.items() if v is not None
-      }
+      options_dict = {}
+      for k, v in sdk_pipeline_options.items():
+        if v is None:
+          continue
+        options_dict[k] = str(v) if isinstance(
+            v, value_provider.ValueProvider) else v
       options_dict["pipelineUrl"] = proto_pipeline_staged_url
       if pipeline_proto_hash:
         options_dict["pipelineProtoHash"] = pipeline_proto_hash
       # Don't pass impersonate_service_account through to the harness.
       # Though impersonation should start a job, the workers should
       # not try to modify their credentials.
       options_dict.pop('impersonate_service_account', None)
-      self.proto.sdkPipelineOptions.additionalProperties.append(
-          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-              key='options', value=to_json_value(options_dict)))
 
       dd = DisplayData.create_from_options(options)
       items = [item.get_dict() for item in dd.items]
-      self.proto.sdkPipelineOptions.additionalProperties.append(
-          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-              key='display_data', value=to_json_value(items)))
+
+      _LOGGER.info('sdk_pipeline_options: %s', options_dict)
+
+      self.proto.sdk_pipeline_options = Struct(
+          fields={
+              'options': Value(struct_value=options_dict),
+              'display_data': Value(list_value=items)
+          })

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Directly passing a Python dictionary to `Value(struct_value=...)` or a 
Python list to `Value(list_value=...)` will raise a `TypeError` at runtime 
because protobuf expects explicit `Struct` and `ListValue` message instances. 
Using the `.update()` method on a `Struct` object automatically handles the 
conversion of nested Python dictionaries and lists safely.
   
   ```suggestion
         sdk_pipeline_options_struct = Struct()
         sdk_pipeline_options_struct.update({
             'options': options_dict,
             'display_data': items
         })
         self.proto.sdk_pipeline_options = sdk_pipeline_options_struct
   ```



##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -185,7 +192,10 @@ def rank_error(msg):
         messages, page_token = runner.dataflow_client.list_messages(
             job_id, page_token=page_token, start_time=last_message_time)
         for m in messages:
-          message = '%s: %s: %s' % (m.time, m.messageImportance, m.messageText)
+          message = '%s: %s: %s' % (
+              m.time,
+              dataflow_api.JobMessageImportance(m.message_importance).name,
+              m.message_text)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The code attempts to resolve the name of `m.message_importance` on line 197 
before checking if it is `None` on line 212. If `m.message_importance` is 
`None`, `dataflow_api.JobMessageImportance(None)` will raise a `ValueError` and 
crash the loop. Resolving the importance name safely handles the `None` case.
   
   ```suggestion
             importance_name = (
                 dataflow_api.JobMessageImportance(m.message_importance).name
                 if m.message_importance is not None
                 else 'JOB_MESSAGE_IMPORTANCE_UNKNOWN'
             )
             message = '%s: %s: %s' % (
                 m.time,
                 importance_name,
                 m.message_text)
   ```



##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -135,24 +132,19 @@ def _get_metric_key(self, metric):
       #   step name (only happens for unstructured-named metrics).
       # 2. Unable to unpack [step] or [namespace]; which should only happen
       #   for unstructured names.
-      step = _get_match(
-          metric.name.context.additionalProperties,
-          lambda x: x.key == STEP_LABEL).value
+      step = metric.name.context['step']
       step = self._translate_step_name(step)
     except ValueError:

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Accessing `metric.name.context['step']` directly will raise a `KeyError` if 
the `'step'` key is missing from the context dictionary. The original code 
caught `ValueError` from `_get_match`, but now a `KeyError` will propagate and 
crash the metrics collection. Catching `KeyError` along with `ValueError` 
ensures robust handling.
   
   ```suggestion
         step = metric.name.context['step']
         step = self._translate_step_name(step)
       except (KeyError, ValueError):
   ```



##########
sdks/python/setup.py:
##########
@@ -533,6 +533,7 @@ def get_portability_package_data():
               'google-cloud-datastore>=2.0.0,<3',
               'google-cloud-pubsub>=2.1.0,<3',
               'google-cloud-storage>=2.18.2,<4',
+              'google-cloud-dataflow-client>=0.13.0,<0.14.0',

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The dependency version specified here (`>=0.13.0,<0.14.0`) conflicts with 
the version added to the GPU image requirements files 
(`gpu_image_requirements.txt` for Python 3.10, 3.11, and 3.12), which specify 
`google-cloud-dataflow-client==0.11.0`. Please align the versions to avoid 
dependency resolution conflicts during container builds.



##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -1027,98 +1035,97 @@ def list_messages(
         JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
      messageText: A message string.
     """
-    request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
-        jobId=job_id,
+    request = dataflow.ListJobMessagesRequest(
+        job_id=job_id,
         location=self.google_cloud_options.region,
-        projectId=self.google_cloud_options.project)
+        project_id=self.google_cloud_options.project)
     if page_token is not None:
-      request.pageToken = page_token
+      request.page_token = page_token
     if start_time is not None:
-      request.startTime = start_time
+      request.start_time = start_time
     if end_time is not None:
-      request.endTime = end_time
+      request.end_time = end_time
     if minimum_importance is not None:
       if minimum_importance == 'JOB_MESSAGE_DEBUG':
-        request.minimumImportance = (
-            dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
-            MinimumImportanceValueValuesEnum.JOB_MESSAGE_DEBUG)
+        request.minimum_importance = (
+            dataflow.JobMessageImportance.JOB_MESSAGE_DEBUG)
       elif minimum_importance == 'JOB_MESSAGE_DETAILED':
         request.minimumImportance = (
-            dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
-            MinimumImportanceValueValuesEnum.JOB_MESSAGE_DETAILED)
+            dataflow.JobMessageImportance.JOB_MESSAGE_DETAILED)
       elif minimum_importance == 'JOB_MESSAGE_BASIC':
         request.minimumImportance = (
-            dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
-            MinimumImportanceValueValuesEnum.JOB_MESSAGE_BASIC)
+            dataflow.JobMessageImportance.JOB_MESSAGE_BASIC)
       elif minimum_importance == 'JOB_MESSAGE_WARNING':
         request.minimumImportance = (
-            dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
-            MinimumImportanceValueValuesEnum.JOB_MESSAGE_WARNING)
+            dataflow.JobMessageImportance.JOB_MESSAGE_WARNING)
       elif minimum_importance == 'JOB_MESSAGE_ERROR':
         request.minimumImportance = (
-            dataflow.DataflowProjectsLocationsJobsMessagesListRequest.
-            MinimumImportanceValueValuesEnum.JOB_MESSAGE_ERROR)
+            dataflow.JobMessageImportance.JOB_MESSAGE_ERROR)
       else:
         raise RuntimeError(
             'Unexpected value for minimum_importance argument: %r' %
             minimum_importance)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   There is a typo in setting the `minimum_importance` field on the request 
object. For `JOB_MESSAGE_DEBUG`, it correctly uses 
`request.minimum_importance`, but for all other importance levels (`DETAILED`, 
`BASIC`, `WARNING`, `ERROR`), it incorrectly uses `request.minimumImportance` 
(camelCase). Since the new client uses snake_case, this will either raise an 
`AttributeError` or fail to send the minimum importance parameter to the 
Dataflow service.
   
   ```suggestion
       if minimum_importance is not None:
         if minimum_importance == 'JOB_MESSAGE_DEBUG':
           request.minimum_importance = (
               dataflow.JobMessageImportance.JOB_MESSAGE_DEBUG)
         elif minimum_importance == 'JOB_MESSAGE_DETAILED':
           request.minimum_importance = (
               dataflow.JobMessageImportance.JOB_MESSAGE_DETAILED)
         elif minimum_importance == 'JOB_MESSAGE_BASIC':
           request.minimum_importance = (
               dataflow.JobMessageImportance.JOB_MESSAGE_BASIC)
         elif minimum_importance == 'JOB_MESSAGE_WARNING':
           request.minimum_importance = (
               dataflow.JobMessageImportance.JOB_MESSAGE_WARNING)
         elif minimum_importance == 'JOB_MESSAGE_ERROR':
           request.minimum_importance = (
               dataflow.JobMessageImportance.JOB_MESSAGE_ERROR)
         else:
           raise RuntimeError(
               'Unexpected value for minimum_importance argument: %r' %
               minimum_importance)
   ```



##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -185,10 +177,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
         #  in the service.
         # The second way is only useful for the UI, and should be ignored.
         continue
-      is_tentative = [
-          prop for prop in metric.name.context.additionalProperties
-          if prop.key == 'tentative' and prop.value == 'true'
-      ]
+      is_tentative = metric.name.context['tentative']

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Accessing `metric.name.context['tentative']` directly will raise a 
`KeyError` for any non-tentative metrics where the `'tentative'` key is not 
present in the context. This will crash the metrics collection loop. Using 
`.get('tentative')` safely handles the absence of the key.
   
   ```suggestion
         is_tentative = metric.name.context.get('tentative') == 'true'
   ```



##########
sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py:
##########
@@ -209,32 +198,13 @@ def _get_metric_value(self, metric):
       return None
 
     if metric.scalar is not None:
-      return metric.scalar.integer_value
+      # This will always be a single value if there is any data in the field.
+      return metric.scalar
     elif metric.distribution is not None:
-      dist_count = _get_match(
-          metric.distribution.object_value.properties,
-          lambda x: x.key == 'count').value.integer_value
-      dist_min = _get_match(
-          metric.distribution.object_value.properties,
-          lambda x: x.key == 'min').value.integer_value
-      dist_max = _get_match(
-          metric.distribution.object_value.properties,
-          lambda x: x.key == 'max').value.integer_value
-      dist_sum = _get_match(
-          metric.distribution.object_value.properties,
-          lambda x: x.key == 'sum').value.integer_value
-      if dist_sum is None:
-        # distribution metric is not meant to use on large values, but in case
-        # it is, the value can overflow and become double_value, the correctness
-        # of the value may not be guaranteed.
-        _LOGGER.info(
-            "Distribution metric sum value seems to have "
-            "overflowed integer_value range, the correctness of sum or mean "
-            "value may not be guaranteed: %s" % metric.distribution)
-        dist_sum = int(
-            _get_match(
-                metric.distribution.object_value.properties,
-                lambda x: x.key == 'sum').value.double_value)
+      dist_count = metric.distribution['count']
+      dist_min = metric.distribution['min']
+      dist_max = metric.distribution['max']
+      dist_sum = metric.distribution['sum']
       return DistributionResult(
           DistributionData(dist_sum, dist_count, dist_min, dist_max))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Because `google-cloud-dataflow-client` uses `proto-plus`, fields of type 
`google.protobuf.Value` and `google.protobuf.Struct` are automatically unpacked 
to native Python types. However, numbers in these fields are always 
deserialized as Python `float` values (e.g., `42.0` instead of `42`). Returning 
floats for counters and distribution metrics is a backward-incompatible change 
that can break downstream pipelines and assertions. Converting them back to 
`int` when appropriate preserves compatibility.
   
   ```suggestion
       if metric.scalar is not None:
         # This will always be a single value if there is any data in the field.
         val = metric.scalar
         if isinstance(val, float) and val.is_integer():
           return int(val)
         return val
       elif metric.distribution is not None:
         dist_count = int(metric.distribution['count'])
         dist_min = int(metric.distribution['min'])
         dist_max = int(metric.distribution['max'])
         dist_sum = int(metric.distribution['sum'])
         return DistributionResult(
             DistributionData(dist_sum, dist_count, dist_min, dist_max))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to