gemini-code-assist[bot] commented on code in PR #36140:
URL: https://github.com/apache/beam/pull/36140#discussion_r2347001830


##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -564,11 +568,65 @@ def __init__(self, transform):
 
     # TODO(https://github.com/apache/beam/issues/18939): Add support for
     # id_label and timestamp_attribute.
-    if transform.id_label:
-      raise NotImplementedError('id_label is not supported for PubSub writes')
-    if transform.timestamp_attribute:
-      raise NotImplementedError(
-          'timestamp_attribute is not supported for PubSub writes')
+    # Only raise errors for DirectRunner or batch pipelines
+    pipeline_options = transform.pipeline_options
+    should_raise_error = False
+
+    if pipeline_options:
+      from apache_beam.options.pipeline_options import StandardOptions
+
+      # Check if using DirectRunner
+      try:
+        # Get runner from pipeline options
+        all_options = pipeline_options.get_all_options()
+        runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER)
+
+        # Check if it's a DirectRunner variant
+        if (runner_name is None or
+            (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner'
+             in str(runner_name) or 'TestDirectRunner' in str(runner_name))):
+          should_raise_error = True
+      except Exception:
+        # If we can't determine runner, assume DirectRunner for safety
+        should_raise_error = True
+
+      # Check if in batch mode (not streaming)
+      standard_options = pipeline_options.view_as(StandardOptions)
+      if not standard_options.streaming:
+        should_raise_error = True
+    else:
+      # If no pipeline options available, fall back to original behavior
+      should_raise_error = True
+
+    # Log debug information for troubleshooting
+    import logging
+    runner_info = getattr(
+        pipeline_options, 'runner',
+        'None') if pipeline_options else 'No options'
+    streaming_info = 'Unknown'
+    if pipeline_options:
+      try:
+        standard_options = pipeline_options.view_as(StandardOptions)
+        streaming_info = 'streaming=%s' % standard_options.streaming
+      except Exception:
+        streaming_info = 'streaming=unknown'
+
+    logging.debug(
+        'PubSub unsupported feature check: runner=%s, %s',
+        runner_info,
+        streaming_info)
+
+    if should_raise_error:
+
+      if transform.id_label:
+        raise NotImplementedError(
+            f'id_label is not supported for PubSub writes with DirectRunner '
+            f'or in batch mode (runner={runner_info}, {streaming_info})')
+      if transform.timestamp_attribute:
+        raise NotImplementedError(
+            f'timestamp_attribute is not supported for PubSub writes with '
+            f'DirectRunner or in batch mode '
+            f'(runner={runner_info}, {streaming_info})')

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This block of code to check for unsupported features is quite complex and 
has a few issues that could be improved for clarity and robustness:
   1.  The logic to determine `should_raise_error` is spread out, making it 
hard to follow. It could be simplified by combining the checks for DirectRunner 
and batch mode.
   2.  There are multiple broad `except Exception` blocks. This can hide bugs. 
It's better to catch specific exceptions or at least log the caught exception.
   3.  The `runner_name` is determined, but then a different, potentially 
incorrect method (`getattr(pipeline_options, 'runner', ...)`) is used for 
logging.
   4.  `import logging` is used inside the method, which is against Python 
style guides. It should be at the top of the file.[^1]
   5.  Information like `standard_options` is retrieved multiple times.
   
   Consider refactoring this logic to be more linear and clear as suggested 
below. This refactoring:
   *   Consolidates the logic for determining if an error should be raised.
   *   Ensures consistent use of runner information for logic and logging.
   *   Logs the exception when runner detection fails.
   *   Simplifies the overall structure.
   
   ```python
       # Only raise errors for DirectRunner or batch pipelines
       pipeline_options = transform.pipeline_options
       runner_name = 'No options'
       is_streaming = None
       should_raise_error = True  # Default to original behavior (always raise)
   
       if pipeline_options:
         from apache_beam.options.pipeline_options import StandardOptions
         standard_options = pipeline_options.view_as(StandardOptions)
         is_streaming = standard_options.streaming
   
         try:
           all_options = pipeline_options.get_all_options()
           runner_name = all_options.get('runner', 
StandardOptions.DEFAULT_RUNNER)
         except Exception as e:
           import logging
           logging.warning(
               'Could not determine runner from pipeline options, assuming a '
               'runner with limitations for PubSub writes: %s', e)
           runner_name = 'Unknown (error)'
   
         is_direct_runner = (
             runner_name is None or
             runner_name in StandardOptions.LOCAL_RUNNERS or
             'DirectRunner' in str(runner_name) or
             'TestDirectRunner' in str(runner_name)
         )
   
         # Features are supported only on non-Direct runners in streaming mode.
         should_raise_error = is_direct_runner or not is_streaming
   
       import logging
       streaming_info = f'streaming={is_streaming}' if is_streaming is not None 
else 'streaming=unknown'
       logging.debug(
           'PubSub unsupported feature check: runner=%s, %s',
           runner_name,
           streaming_info)
   
       if should_raise_error:
         if transform.id_label:
           raise NotImplementedError(
               f'id_label is not supported for PubSub writes with DirectRunner '
               f'or in batch mode (runner={runner_name}, {streaming_info})')
         if transform.timestamp_attribute:
           raise NotImplementedError(
               f'timestamp_attribute is not supported for PubSub writes with '
               f'DirectRunner or in batch mode '
               f'(runner={runner_name}, {streaming_info})')
   ```
   
   #### Style Guide References
   [^1]: PEP 8 recommends that imports are always put at the top of the file, 
just after any module comments and docstrings, and before module globals and 
constants.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to