gemini-code-assist[bot] commented on code in PR #39088:
URL: https://github.com/apache/beam/pull/39088#discussion_r3466981677
##########
sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py:
##########
@@ -214,12 +214,20 @@ def subscribe():
publisher.start()
subscriber.start()
- options = PipelineOptions([
- '--runner=PrismRunner',
- '--environment_type=LOOPBACK',
- '--streaming',
- ])
- p = TestPipeline(options=options)
+ # MqttIO read is unbounded, so this pipeline runs in streaming mode and
+ # never terminates on its own. Amend the harness-provided pipeline options
+ # rather than discarding them: enable streaming, run non-blocking so the
+ # observe-then-cancel logic below can execute, and target the Prism
portable
+ # runner. The latter is required because SwitchingDirectRunner disables its
+ # Prism delegation for pipelines containing external (cross-language)
+ # transforms (see runners/direct/direct_runner.py) and falls back to the
+ # BundleBasedDirectRunner, which cannot execute an unbounded read.
+ p = TestPipeline(blocking=False)
+ standard_options = p.get_pipeline_options().view_as(StandardOptions)
+ standard_options.streaming = True
+ standard_options.runner = 'PrismRunner'
+ p.get_pipeline_options().view_as(
+ PortableOptions).environment_type = 'LOOPBACK'
Review Comment:

In Apache Beam, the pipeline runner is instantiated during the `Pipeline`
(and thus `TestPipeline`) initialization (`__init__`) based on the options
available at that time. Modifying `standard_options.runner` *after* the
pipeline has been instantiated has no effect because the runner object has
already been created and stored in `p.runner`.
To ensure the pipeline actually runs on `PrismRunner`, you should pass the
`runner` argument directly to the `TestPipeline` constructor.
```python
p = TestPipeline(runner='PrismRunner', blocking=False)
standard_options = p.get_pipeline_options().view_as(StandardOptions)
standard_options.streaming = True
p.get_pipeline_options().view_as(
PortableOptions).environment_type = 'LOOPBACK'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]