itayrav opened a new issue, #26577:
URL: https://github.com/apache/beam/issues/26577

   ### What happened?
   
   Hey all,
   
   I recently started trying to run a streaming pipeline using the BundleBasedDirectRunner.
   I copied the CountingSource example from the snippets and tried running the following code:
   
   ```python
   options = beam.options.pipeline_options.PipelineOptions(streaming=True)
   with beam.Pipeline(runner='BundleBasedDirectRunner', options=options) as pipeline:
     _ = (
         pipeline
         | beam.io.Read(CountingSource(10))
         | beam.Map(print)
     )
   ```
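
   For completeness, the CountingSource itself is essentially the custom bounded source from the Beam documentation snippets; I'm reproducing it here from memory, so minor details may differ from what I actually ran:

   ```python
   from apache_beam.io import iobase
   from apache_beam.io.range_trackers import OffsetRangeTracker


   class CountingSource(iobase.BoundedSource):
     """Bounded source that emits the integers [0, count)."""

     def __init__(self, count):
       self._count = count

     def estimate_size(self):
       return self._count

     def get_range_tracker(self, start_position, stop_position):
       if start_position is None:
         start_position = 0
       if stop_position is None:
         stop_position = self._count
       return OffsetRangeTracker(start_position, stop_position)

     def read(self, range_tracker):
       # Claim each offset before emitting it so the source stays splittable.
       for i in range(range_tracker.start_position(), range_tracker.stop_position()):
         if not range_tracker.try_claim(i):
           return
         yield i

     def split(self, desired_bundle_size, start_position=None, stop_position=None):
       if start_position is None:
         start_position = 0
       if stop_position is None:
         stop_position = self._count
       bundle_start = start_position
       while bundle_start < stop_position:
         bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
         yield iobase.SourceBundle(
             weight=(bundle_stop - bundle_start),
             source=self,
             start_position=bundle_start,
             stop_position=bundle_stop)
         bundle_start = bundle_stop
   ```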
   
   Running this pipeline yields the following exception:
   ```python
   ---------------------------------------------------------------------------
   RuntimeError                              Traceback (most recent call last)
   
   <ipython-input-80-b2d65211bc9d> in <module>()
        40 
        41 options = beam.options.pipeline_options.PipelineOptions(streaming=True)
   ---> 42 with beam.Pipeline(runner='BundleBasedDirectRunner', options=options) as pipeline:
        43   _ = (
        44       pipeline
   
   14 frames
   
   google3/third_party/py/apache_beam/pipeline.py in visit_transform(self, transform_node)
       485         # type: (AppliedPTransform) -> None
       486         if override.matches(transform_node):
   --> 487           raise RuntimeError(
       488               'Transform node %r was not replaced as expected.' %
       489               transform_node)
   
   RuntimeError: Transform node AppliedPTransform([80]: Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
   ```
   
   I've also tried the same pipeline with streaming=False, but I believe I encountered a different bug:
   ```python
   ---------------------------------------------------------------------------
   TypeError                                 Traceback (most recent call last)
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnRunner._invoke_bundle_method()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle()
   
   
   google3/third_party/py/apache_beam/runners/direct/sdf_direct_runner.py in start_bundle(self)
       121   def start_bundle(self):
   --> 122     self._invoker = DoFnInvoker.create_invoker(
       123         self._signature,
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.create_invoker()
   
   TypeError: create_invoker() takes at least 2 positional arguments (1 given)
   
   During handling of the above exception, another exception occurred:
   
   TypeError                                 Traceback (most recent call last)
   8 frames
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnRunner.start()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnRunner._invoke_bundle_method()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnRunner._reraise_augmented()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnRunner._invoke_bundle_method()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle()
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.invoke_start_bundle()
   
   
   google3/third_party/py/apache_beam/runners/direct/sdf_direct_runner.py in start_bundle(self)
       120 
       121   def start_bundle(self):
   --> 122     self._invoker = DoFnInvoker.create_invoker(
       123         self._signature,
       124         output_processor=_NoneShallPassOutputHandler(),
   
   
/export/hda3/borglet/remote_hdd_fs_dirs/0.itayr_colab_kernel_gpu_tesla_v100_55288878.1.kernel.health-research-endoscopy-deid-colab-jobs.2967757270244.14b334fb3717c109/mount/server/rl_colab.par/google3/third_party/py/apache_beam/runners/common.abi3.so
 in apache_beam.runners.common.DoFnInvoker.create_invoker()
   
   TypeError: create_invoker() takes at least 2 positional arguments (1 given) [while running '[81]: Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/pair']
   ```
   
   Indeed, looking at the source code for the latter case, it appears that create_invoker expects an argument named output_handler, not output_processor.
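
   As a purely illustrative sanity check (toy code, not the actual Beam source), a keyword argument that does not match any parameter name leaves the required positional parameter unbound, which is exactly the kind of TypeError shown above:

   ```python
   # Toy stand-in for DoFnInvoker.create_invoker, just to illustrate the mismatch.
   def create_invoker(signature, output_handler, **kwargs):
     pass

   # The wrong keyword is swallowed by **kwargs, so 'output_handler' is never
   # bound. Pure Python reports "missing 1 required positional argument"; the
   # Cython build of apache_beam.runners.common phrases it as
   # "takes at least 2 positional arguments (1 given)".
   create_invoker(object(), output_processor=object())
   ```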
   
   A few other things to note:
   
   1. I should say that running the same pipeline with the DirectRunner works as expected, for both streaming=True and streaming=False (though I believe the streaming flag makes no difference when using the SwitchingDirectRunner).
   2. The same errors are encountered when trying to use a Splittable DoFn directly (see the sketch after this list), so I suspect the runner's SDF handling is the root of the problem.
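
   For reference, the splittable DoFn I tried looked roughly like the sketch below (reconstructed from memory, so names and details may differ slightly from what I actually ran):

   ```python
   import apache_beam as beam
   from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
   from apache_beam.transforms.core import RestrictionProvider


   class EmitRangeProvider(RestrictionProvider):
     """Restriction provider for a toy SDF that emits the integers [0, n)."""

     def initial_restriction(self, element):
       return OffsetRange(0, element)

     def create_tracker(self, restriction):
       return OffsetRestrictionTracker(restriction)

     def restriction_size(self, element, restriction):
       return restriction.size()


   class EmitRangeFn(beam.DoFn):
     """Toy splittable DoFn: for an input n, emits 0, 1, ..., n-1."""

     def process(
         self,
         element,
         tracker=beam.DoFn.RestrictionParam(EmitRangeProvider())):
       restriction = tracker.current_restriction()
       for i in range(restriction.start, restriction.stop):
         # Claim each position before producing output, as required for SDFs.
         if not tracker.try_claim(i):
           return
         yield i


   options = beam.options.pipeline_options.PipelineOptions(streaming=True)
   with beam.Pipeline(runner='BundleBasedDirectRunner', options=options) as pipeline:
     _ = (
         pipeline
         | beam.Create([10])
         | beam.ParDo(EmitRangeFn())
         | beam.Map(print)
     )
   ```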
   
   Any help would be much appreciated, as I've been struggling with this for a few days now! :)
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

