[ 
https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702212#comment-16702212
 ] 

Mark Liu commented on BEAM-5953:
--------------------------------

Here is a new breakage in the SDK harness when coders are deserialized from proto:

{code}
Error processing instruction -26. Original traceback is Traceback (most recent 
call last):
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 242, in
                get_bundle_processor processor = 
self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from 
empty list During handling of the above exception, another exception occurred: 
Traceback (most recent call last):
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 132, in
                _execute response = task()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 167, in
                <lambda> self._execute(lambda: worker.do_instruction(work), 
work)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 216, in
                do_instruction request.instruction_id)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 230, in
                process_bundle request.process_bundle_descriptor_reference) as 
bundle_processor:
File "/usr/local/lib/python3.5/contextlib.py", line 59, in
                __enter__ return next(self.gen)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 251, in
                get_bundle_processor self.data_channel_factory)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 351, in
                __init__ self.ops = 
self.create_execution_tree(self.process_bundle_descriptor)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 395, in
                create_execution_tree descriptor.transforms, 
key=topological_height, reverse=True)])
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 394, in
                <listcomp> for transform_id in sorted(
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 328, in
                wrapper result = cache[args] = func(*args)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 378, in
                get_operation in 
descriptor.transforms[transform_id].outputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 377, in
                <dictcomp> for tag, pcoll_id
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 376, in
                <listcomp> tag: [get_operation(op) for op in 
pcoll_consumers[pcoll_id]]
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 328, in
                wrapper result = cache[args] = func(*args)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 378, in
                get_operation in 
descriptor.transforms[transform_id].outputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 377, in
                <dictcomp> for tag, pcoll_id
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 376, in
                <listcomp> tag: [get_operation(op) for op in 
pcoll_consumers[pcoll_id]]
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 328, in
                wrapper result = cache[args] = func(*args)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 378, in
                get_operation in 
descriptor.transforms[transform_id].outputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 377, in
                <dictcomp> for tag, pcoll_id
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 376, in
                <listcomp> tag: [get_operation(op) for op in 
pcoll_consumers[pcoll_id]]
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 328, in
                wrapper result = cache[args] = func(*args)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 378, in
                get_operation in 
descriptor.transforms[transform_id].outputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 377, in
                <dictcomp> for tag, pcoll_id
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 376, in
                <listcomp> tag: [get_operation(op) for op in 
pcoll_consumers[pcoll_id]]
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 328, in
                wrapper result = cache[args] = func(*args)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 381, in
                get_operation transform_id, transform_consumers)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 524, in
                create_operation return creator(self, transform_id, 
transform_proto, payload, consumers)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 636, in
                create output_coder = 
factory.get_only_input_coder(transform_proto)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 564, in
                get_only_input_coder return 
only_element(list(self.get_input_coders(transform_proto).values()))
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 560, in
                get_input_coders for tag, pcoll_id in 
transform_proto.inputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 560, in
                <dictcomp> for tag, pcoll_id in transform_proto.inputs.items()
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 538, in
                get_windowed_coder coder = 
self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
File 
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 535, in
                get_coder json.loads(coder_proto.spec.spec.payload))
File "/usr/local/lib/python3.5/json/__init__.py", line 312, in
                loads s.__class__.__name__)) TypeError: the JSON object must be 
str, not 'bytes'
{code}

At first I tried a hacky workaround, decoding 
coder_proto.spec.spec.payload, but I got a similar error right after it in 
get_coder_from_spec.

> Support DataflowRunner on Python 3
> ----------------------------------
>
>                 Key: BEAM-5953
>                 URL: https://issues.apache.org/jira/browse/BEAM-5953
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to