[
https://issues.apache.org/jira/browse/BEAM-12959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-12959:
-----------------------------------
Priority: P1 (was: P0)
> Dataflow error in CombinePerKey operation
> -----------------------------------------
>
> Key: BEAM-12959
> URL: https://issues.apache.org/jira/browse/BEAM-12959
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.30.0, 2.31.0, 2.32.0
> Reporter: Eddie Wang
> Priority: P1
>
> This error occurs in Dataflow when trying to deploy a workflow with the
> following stages: Pubsub -> SlidingWindows -> beam.ParDo(KeyValues()) ->
> beam.GroupByKey -> beam.CombinePerKey. The worker fails with this traceback:
> {code:bash}
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 284, in _execute
> response = task()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 357, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 602, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 633, in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 462, in get
> self.data_channel_factory)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 862, in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 919, in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)])
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 918, in <listcomp>
> get_operation(transform_id))) for transform_id in sorted(
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 898, in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 898, in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 898, in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 900, in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 898, in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 806, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 903, in get_operation
> transform_id, transform_consumers)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1192, in create_operation
> return creator(self, transform_id, transform_proto, payload, consumers)
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1776, in create_combine_per_key_convert_to_accumulators
> factory, transform_id, transform_proto, payload, consumers, 'convert')
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1798, in _create_combine_phase_operation
> factory.context), [], {}))
> File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py",
> line 186, in from_runner_api
> proto_utils.parse_Bytes(fn_proto.payload, parameter_type), context)
> File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/urns.py",
> line 160, in <lambda>
> unused_context: pickler.loads(proto.value))
> File
> "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
> line 287, in loads
> return dill.loads(s)
> File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in
> loads
> return load(file, ignore, **kwds)
> File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in
> load
> return Unpickler(file, ignore=ignore, **kwds).load()
> File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in
> load
> obj = StockUnpickler.load(self)
> TypeError: code() takes at most 15 arguments (16 given)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)