liferoad commented on code in PR #36140: URL: https://github.com/apache/beam/pull/36140#discussion_r2346892821
########## sdks/python/apache_beam/io/gcp/pubsub.py: ########## @@ -564,11 +568,65 @@ def __init__(self, transform): # TODO(https://github.com/apache/beam/issues/18939): Add support for # id_label and timestamp_attribute. - if transform.id_label: - raise NotImplementedError('id_label is not supported for PubSub writes') - if transform.timestamp_attribute: - raise NotImplementedError( - 'timestamp_attribute is not supported for PubSub writes') + # Only raise errors for DirectRunner or batch pipelines + pipeline_options = transform.pipeline_options + should_raise_error = False + + if pipeline_options: + from apache_beam.options.pipeline_options import StandardOptions + + # Check if using DirectRunner + try: + # Get runner from pipeline options + all_options = pipeline_options.get_all_options() + runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER) + + # Check if it's a DirectRunner variant + if (runner_name is None or + (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner' + in str(runner_name) or 'TestDirectRunner' in str(runner_name))): + should_raise_error = True + except Exception: + # If we can't determine runner, assume DirectRunner for safety + should_raise_error = True + + # Check if in batch mode (not streaming) + standard_options = pipeline_options.view_as(StandardOptions) + if not standard_options.streaming: + should_raise_error = True + else: + # If no pipeline options available, fall back to original behavior + should_raise_error = True + + # Log debug information for troubleshooting + import logging + runner_info = getattr( + pipeline_options, 'runner', + 'None') if pipeline_options else 'No options' + streaming_info = 'Unknown' + if pipeline_options: + try: + standard_options = pipeline_options.view_as(StandardOptions) + streaming_info = 'streaming=%s' % standard_options.streaming + except Exception: + streaming_info = 'streaming=unknown' + + logging.debug( + 'PubSub unsupported feature check: runner=%s, %s', + runner_info, 
+ streaming_info) + + if should_raise_error: + + if transform.id_label: Review Comment: Only the Dataflow runner in streaming mode supports these two parameters (`id_label` and `timestamp_attribute`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org