gemini-code-assist[bot] commented on code in PR #36140: URL: https://github.com/apache/beam/pull/36140#discussion_r2347001830
########## sdks/python/apache_beam/io/gcp/pubsub.py: ########## @@ -564,11 +568,65 @@ def __init__(self, transform): # TODO(https://github.com/apache/beam/issues/18939): Add support for # id_label and timestamp_attribute. - if transform.id_label: - raise NotImplementedError('id_label is not supported for PubSub writes') - if transform.timestamp_attribute: - raise NotImplementedError( - 'timestamp_attribute is not supported for PubSub writes') + # Only raise errors for DirectRunner or batch pipelines + pipeline_options = transform.pipeline_options + should_raise_error = False + + if pipeline_options: + from apache_beam.options.pipeline_options import StandardOptions + + # Check if using DirectRunner + try: + # Get runner from pipeline options + all_options = pipeline_options.get_all_options() + runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER) + + # Check if it's a DirectRunner variant + if (runner_name is None or + (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner' + in str(runner_name) or 'TestDirectRunner' in str(runner_name))): + should_raise_error = True + except Exception: + # If we can't determine runner, assume DirectRunner for safety + should_raise_error = True + + # Check if in batch mode (not streaming) + standard_options = pipeline_options.view_as(StandardOptions) + if not standard_options.streaming: + should_raise_error = True + else: + # If no pipeline options available, fall back to original behavior + should_raise_error = True + + # Log debug information for troubleshooting + import logging + runner_info = getattr( + pipeline_options, 'runner', + 'None') if pipeline_options else 'No options' + streaming_info = 'Unknown' + if pipeline_options: + try: + standard_options = pipeline_options.view_as(StandardOptions) + streaming_info = 'streaming=%s' % standard_options.streaming + except Exception: + streaming_info = 'streaming=unknown' + + logging.debug( + 'PubSub unsupported feature check: runner=%s, %s', + runner_info, 
+ streaming_info) + + if should_raise_error: + + if transform.id_label: + raise NotImplementedError( + f'id_label is not supported for PubSub writes with DirectRunner ' + f'or in batch mode (runner={runner_info}, {streaming_info})') + if transform.timestamp_attribute: + raise NotImplementedError( + f'timestamp_attribute is not supported for PubSub writes with ' + f'DirectRunner or in batch mode ' + f'(runner={runner_info}, {streaming_info})')

Review Comment:

This block of code, which checks for unsupported features, is quite complex and has a few issues that could be addressed to improve clarity and robustness:

1. The logic to determine `should_raise_error` is spread out, making it hard to follow. It could be simplified by combining the checks for DirectRunner and batch mode.
2. There are multiple broad `except Exception` blocks. This can hide bugs. It's better to catch specific exceptions or at least log the caught exception.
3. The `runner_name` is determined, but then a different, potentially incorrect method (`getattr(pipeline_options, 'runner', ...)`) is used for logging.
4. `import logging` is used inside the method, which goes against the Python style guide. It should be at the top of the file.[^1]
5. Information such as `standard_options` is retrieved multiple times.

Consider refactoring this logic to be more linear and clear, as suggested below. This refactoring:

* Consolidates the logic for determining if an error should be raised.
* Ensures consistent use of runner information for logic and logging.
* Logs the exception when runner detection fails.
* Simplifies the overall structure.

```python
# Only raise errors for DirectRunner or batch pipelines pipeline_options = transform.pipeline_options runner_name = 'No options' is_streaming = None should_raise_error = True # Default to original behavior (always raise) if pipeline_options: from apache_beam.options.pipeline_options import StandardOptions standard_options = pipeline_options.view_as(StandardOptions) is_streaming = standard_options.streaming try: all_options = pipeline_options.get_all_options() runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER) except Exception as e: import logging logging.warning( 'Could not determine runner from pipeline options, assuming a ' 'runner with limitations for PubSub writes: %s', e) runner_name = 'Unknown (error)' is_direct_runner = ( runner_name is None or runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner' in str(runner_name) or 'TestDirectRunner' in str(runner_name) ) # Features are supported only on non-Direct runners in streaming mode. should_raise_error = is_direct_runner or not is_streaming import logging streaming_info = f'streaming={is_streaming}' if is_streaming is not None else 'streaming=unknown' logging.debug( 'PubSub unsupported feature check: runner=%s, %s', runner_name, streaming_info) if should_raise_error: if transform.id_label: raise NotImplementedError( f'id_label is not supported for PubSub writes with DirectRunner ' f'or in batch mode (runner={runner_name}, {streaming_info})') if transform.timestamp_attribute: raise NotImplementedError( f'timestamp_attribute is not supported for PubSub writes with ' f'DirectRunner or in batch mode ' f'(runner={runner_name}, {streaming_info})')
```

#### Style Guide References

[^1]: PEP 8 recommends that imports are always put at the top of the file, just after any module comments and docstrings, and before module globals and constants.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org