[
https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702212#comment-16702212
]
Mark Liu commented on BEAM-5953:
--------------------------------
Here is a new breakage in the SDK harness when coders are deserialized from proto:
{code}
Error processing instruction -26. Original traceback is Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 242, in get_bundle_processor
    processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 132, in _execute
    response = task()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 167, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 216, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 230, in process_bundle
    request.process_bundle_descriptor_reference) as bundle_processor:
  File "/usr/local/lib/python3.5/contextlib.py", line 59, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker.py", line 251, in get_bundle_processor
    self.data_channel_factory)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 351, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 395, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 394, in <listcomp>
    for transform_id in sorted(
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 328, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 378, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 377, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 376, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 328, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 378, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 377, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 376, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 328, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 378, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 377, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 376, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 328, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 378, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 377, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 376, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 328, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 381, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 524, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 636, in create
    output_coder = factory.get_only_input_coder(transform_proto)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 564, in get_only_input_coder
    return only_element(list(self.get_input_coders(transform_proto).values()))
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 560, in get_input_coders
    for tag, pcoll_id in transform_proto.inputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 560, in <dictcomp>
    for tag, pcoll_id in transform_proto.inputs.items()
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 538, in get_windowed_coder
    coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
  File "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py", line 535, in get_coder
    json.loads(coder_proto.spec.spec.payload))
  File "/usr/local/lib/python3.5/json/__init__.py", line 312, in loads
    s.__class__.__name__))
TypeError: the JSON object must be str, not 'bytes'
{code}
At first I tried a hacky workaround that decodes coder_proto.spec.spec.payload
before passing it to json.loads (on Python 3.5, json.loads accepts only str, but
the proto payload is bytes). However, I got a similar error right after it in
get_coder_from_spec.
> Support DataflowRunner on Python 3
> ----------------------------------
>
> Key: BEAM-5953
> URL: https://issues.apache.org/jira/browse/BEAM-5953
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)