[ https://issues.apache.org/jira/browse/BEAM-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16907753#comment-16907753 ]

Udi Meiri commented on BEAM-7981:
---------------------------------

I have not committed to working on it yet.




> ParDo function wrapper doesn't support Iterable output types
> ------------------------------------------------------------
>
>                 Key: BEAM-7981
>                 URL: https://issues.apache.org/jira/browse/BEAM-7981
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Priority: Major
>
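> I believe the bug is in CallableWrapperDoFn.default_type_hints, which
> converts the declared output type Iterable[str] to str. The ParDo output is
> then encoded with StrUtf8Coder, which fails on the list elements the
> function actually emits.
> This test will be included (commented out) in
> https://github.com/apache/beam/pull/9283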
> {code}
>   def test_typed_callable_iterable_output(self):
>     @typehints.with_input_types(int)
>     @typehints.with_output_types(typehints.Iterable[str])
>     def do_fn(element):
>       return [[str(element)] * 2]
>     result = [1, 2] | beam.ParDo(do_fn)
>     self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
> {code}
> Result:
> {code}
> ======================================================================
> ERROR: test_typed_callable_iterable_output (apache_beam.typehints.typed_pipeline_test.MainInputTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py", line 104, in test_typed_callable_iterable_output
>     result = [1, 2] | beam.ParDo(do_fn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py", line 519, in __ror__
>     p.run().wait_until_finish()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 406, in run
>     self._options).run(False)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 455, in run_stages
>     stage_context.safe_coders)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
>     yield fs.pop().result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in <lambda>
>     part, expected_outputs), part_inputs):
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1601, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1080, in push
>     response = self.worker.do_instruction(request)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 435, in process
>     self.output(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 776, in receive
>     self.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 782, in process
>     self._reraise_augmented(exn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 849, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 780, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 441, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 919, in process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 142, in receive
>     self.update_counters_start(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 122, in update_counters_start
>     self.opcounter.update_from(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 196, in update_from
>     self.do_sample(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 214, in do_sample
>     self.coder_impl.get_estimated_size_and_observables(windowed_value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 1024, in get_estimated_size_and_observables
>     value.value, nested=nested))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 220, in get_estimated_size_and_observables
>     return self.estimate_size(value, nested), []
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 212, in estimate_size
>     return self._get_nested_size(self._size_estimator(value), nested)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 135, in estimate_size
>     return len(self.encode(value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 326, in encode
>     return value.encode('utf-8')
> AttributeError: 'list' object has no attribute 'encode' [while running 'ParDo(CallableWrapperDoFn)']
> -------------------- >> begin captured logging << --------------------
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/Grammar.txt
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/PatternGrammar.txt
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync'
> root: INFO: ==================== <function annotate_downstream_side_inputs at 
> 0x7f3a39918158> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function fix_side_input_pcoll_coders at 
> 0x7f3a39918268> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> 
> ====================
> root: DEBUG: 1 [3]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function impulse_to_input at 
> 0x7f3a399186a8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function inject_timer_pcollections at 
> 0x7f3a39918840> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function window_pcollection_coders at 
> 0x7f3a39918950> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: Running ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)
> root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], receivers=[ConsumerSet[_MaterializeValues0.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) output_tags=['out'], receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> root: DEBUG: start <ImpulseReadOperation receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]>
> root: DEBUG: start <DataInputOperation receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
> --------------------- >> end captured logging << ---------------------
> ----------------------------------------------------------------------
> Ran 1 test in 0.068s
> FAILED (errors=1)
> {code}
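The failure mode in the quoted report can be illustrated without Beam. This is a minimal sketch under the report's own assumption that the declared output hint Iterable[str] is reduced to str: the function then emits lists, but the element type (and hence the coder) says str, and a str-style encode fails exactly as in the traceback. strip_iterable, run_pardo and encode_str are hypothetical stand-ins, not Beam code, and typing.Iterable substitutes for Beam's own typehints.Iterable (requires Python 3.8+ for get_origin/get_args).

```python
import collections.abc
from typing import Iterable, get_args, get_origin


def strip_iterable(hint):
    """Hypothetical stand-in for the reported behavior: a declared output
    hint of Iterable[X] is reduced to X and treated as the element type."""
    if get_origin(hint) is collections.abc.Iterable:
        return get_args(hint)[0]
    return hint


def run_pardo(fn, inputs):
    """Simplified ParDo semantics: fn returns an iterable, and each item in
    that iterable becomes one output element."""
    outputs = []
    for element in inputs:
        outputs.extend(fn(element))
    return outputs


def encode_str(value):
    """Mimics the failing step at the bottom of the traceback."""
    return value.encode('utf-8')


def do_fn(element):
    # Same body as the test: one output element, which is itself a list.
    return [[str(element)] * 2]


inferred_type = strip_iterable(Iterable[str])
outputs = run_pardo(do_fn, [1, 2])

error_message = None
try:
    encode_str(outputs[0])
except AttributeError as exn:
    error_message = str(exn)

print(inferred_type)  # <class 'str'>
print(outputs)        # [['1', '1'], ['2', '2']]
print(error_message)  # 'list' object has no attribute 'encode'
```

The sketch only shows why a string coder ends up facing list elements; it makes no claim about how CallableWrapperDoFn.default_type_hints should be changed.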



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
