Joseph Toth created BEAM-11158:
----------------------------------
Summary: Side Inputs to beam.Partition
Key: BEAM-11158
URL: https://issues.apache.org/jira/browse/BEAM-11158
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Joseph Toth
Side inputs work with a regular ParDo and function, but I can't seem to get it
to work with beam.Partition. The code and exception below demonstrates the
problem.
```
import apache_beam as beam
def main():
class SideFn(beam.PartitionFn):
def partition_for(self, element, *args, **kwargs):
print(element, args, kwargs)
def just_print(element, *args, **kwargs):
print(element, args, kwargs)
with beam.Pipeline() as p:
side = p | 'CreateSide' >> beam.Create(['a'])
p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,
side=beam.pvalue.AsSingleton(side))
# p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99,
side=beam.pvalue.AsSingleton(side))
if __name__ == '__main__':
main()
```
/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python
/Users/joetoth/projects/joetoth.com/psy/part.py
Traceback (most recent call last):
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
line 134, in <genexpr>
(k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
StopIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 742, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 804, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
line 133, in insert_values_in_args
new_kwargs = dict(
RuntimeError: generator raised StopIteration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
main()
File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,
side=beam.pvalue.AsSingleton(side))
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
line 568, in __exit__
self.result = self.run()
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
line 547, in run
return self.runner.run_pipeline(self, self._options)
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py",
line 119, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 175, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 186, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 344, in run_stages
stage_results = self._run_stage(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 527, in _run_stage
last_result, deferred_inputs, fired_timers = self._run_bundle(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 571, in _run_bundle
result, splits = bundle_manager.process_bundle(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 852, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
line 353, in push
response = self.worker.do_instruction(request)
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 483, in do_instruction
return getattr(self, request_type)(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 519, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 984, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 221, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 354, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 356, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 218, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1279, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1213, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 569, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 218, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1279, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1213, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 569, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 218, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1279, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1213, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 569, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 218, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/__init__.py",
line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 742, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 804, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
line 133, in insert_values_in_args
new_kwargs = dict(
RuntimeError: generator raised StopIteration [while running
'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
Process finished with exit code 1
--
This message was sent by Atlassian Jira
(v8.3.4#803005)