Heejong Lee created BEAM-10696:
----------------------------------
Summary: unable to explicitly use element coder in CombiningValueStateSpec constructor
Key: BEAM-10696
URL: https://issues.apache.org/jira/browse/BEAM-10696
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Heejong Lee
It looks like the example
    COUNT_STATE = CombiningValueStateSpec('count',
                                          VarIntCoder(),
                                          combiners.SumCombineFn())
in [https://beam.apache.org/blog/timely-processing/] doesn't work because of a
coder error.
{code:java}
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 697, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 699, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 702, in apache_beam.runners.worker.operations.DoOperation.finish
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 775, in commit
    state.commit()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 492, in commit
    self._underlying_bag_state.commit()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 553, in commit
    self._state_key, self._value_coder.get_impl(), self._added_elements)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1012, in extend
    coder.encode_to_stream(element, out, True)
  File "apache_beam/coders/coder_impl.py", line 777, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 779, in apache_beam.coders.coder_impl.VarIntCoderImpl.encode_to_stream
TypeError: an integer is required
{code}
The actual element type here was a list of int, not just int.
The API documentation says that
{code:java}
coder (Coder): Coder specifying how to encode the values to be combined.
May be inferred.
{code}
which appears to be wrong.
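To make the mismatch concrete, here is a minimal pure-Python sketch of what seems to be happening: an int-only coder is handed the stored list of ints at commit time instead of a single int. Everything in it is a hypothetical stand-in written for illustration (`encode_varint`, `ListSumCombineFn`, `commit_state`); none of it is Beam's actual implementation, and it assumes only what the traceback suggests, namely that the coder is applied to the stored value rather than to each input element.

```python
def encode_varint(value):
    """Toy int-only coder (stand-in, not Beam's VarIntCoder)."""
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError("an integer is required")
    if value < 0:
        raise ValueError("this sketch only handles non-negative ints")
    out = bytearray()
    while True:
        byte = value & 0x7F          # low 7 bits of the remaining value
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


class ListSumCombineFn:
    """Hypothetical combine fn whose accumulator is a list of ints."""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        return accumulator + [element]

    def extract_output(self, accumulator):
        return sum(accumulator)


def commit_state(accumulator, encode):
    """Encode the stored accumulator, as a state commit would (assumption)."""
    return encode(accumulator)


combine_fn = ListSumCombineFn()
acc = combine_fn.create_accumulator()
for element in (1, 2, 3):
    encode_varint(element)  # each individual input encodes without error
    acc = combine_fn.add_input(acc, element)

# Committing hands the whole list to the int-only coder, which rejects it.
try:
    commit_state(acc, encode_varint)
    commit_error = None
except TypeError as exc:
    commit_error = str(exc)
```

In this sketch each input element encodes fine on its own and only the commit step fails, which matches the shape of the traceback above: the error surfaces in `commit`, not when values are added.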