Joseph Toth created BEAM-11158:
----------------------------------

             Summary: Side Inputs to beam.Partition
                 Key: BEAM-11158
                 URL: https://issues.apache.org/jira/browse/BEAM-11158
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Joseph Toth


Side inputs work with a regular ParDo and function, but I can't seem to get 
them to work with beam.Partition. The code and exception below demonstrate 
the problem.

```
import apache_beam as beam
# One statement per line, so the traceback's part.py lines 14 and 19 match.
def main():
  """Reproduce BEAM-11158: pass a keyword side input to beam.Partition."""
  class SideFn(beam.PartitionFn):
    def partition_for(self, element, *args, **kwargs):
      print(element, args, kwargs)  # only prints; returns no partition index

  def just_print(element, *args, **kwargs):
      print(element, args, kwargs)

  with beam.Pipeline() as p:
      side = p | 'CreateSide' >> beam.Create(['a'])
      p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99, side=beam.pvalue.AsSingleton(side))
      # p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99, side=beam.pvalue.AsSingleton(side))


if __name__ == '__main__':
    main()
```



/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python 
/Users/joetoth/projects/joetoth.com/psy/part.py
Traceback (most recent call last):
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
 line 134, in <genexpr>
    (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 804, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
 line 133, in insert_values_in_args
    new_kwargs = dict(
RuntimeError: generator raised StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
    main()
  File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
    p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  
side=beam.pvalue.AsSingleton(side))
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
 line 568, in __exit__
    self.result = self.run()
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
 line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 175, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 344, in run_stages
    stage_results = self._run_stage(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 527, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 571, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
 line 353, in push
    response = self.worker.do_instruction(request)
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 483, in do_instruction
    return getattr(self, request_type)(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 984, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in 
apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in 
apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in 
apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/__init__.py",
 line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in 
apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 804, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File 
"/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
 line 133, in insert_values_in_args
    new_kwargs = dict(
RuntimeError: generator raised StopIteration [while running 
'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']

Process finished with exit code 1




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to