jonathaningram commented on issue #35318: URL: https://github.com/apache/beam/issues/35318#issuecomment-2998390616
This happened again when adding `timestamp_attribute` to my Pub/Sub write stage.

Before:

```yaml
- type: WriteToPubSub
  name: WriteToPubSub
  input: MyInput
  config:
    topic: "my-topic"
    format: JSON
    id_attribute: id
```

After:

```yaml
- type: WriteToPubSub
  name: WriteToPubSub
  input: MyInput
  config:
    topic: "my-topic"
    format: JSON
    id_attribute: id
    timestamp_attribute: publish_time
```

I don't claim that this is anything to do with `timestamp_attribute` itself.

Stack is:

```
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 689, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 512, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1190, in create_execution_tree
    return collections.OrderedDict([(
                                   ^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in <listcomp>
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1167, in get_operation
    transform_consumers = {
                          ^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <dictcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1498, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1614, in create_sink_runner
    return DataOutputOperation(
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in __init__
    self.windowed_coder_impl = windowed_coder.get_impl()
                               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1429, in _create_impl
    self.wrapped_value_coder.get_impl(),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 1536, in _create_impl
    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 232, in get_impl
    self._impl = self._create_impl()
                 ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 79, in _create_impl
    return RowCoderImpl(self.schema, self.components)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 1836, in apache_beam.coders.coder_impl.RowCoderImpl.__init__
ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has encoding_positions_set=True, but not all fields have encoding_position set

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
```

No idea what's going on here, but noting I didn't change any of the "custom" part of the pipeline.

Not sure if it would help, but instead of the error message:

```
ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has encoding_positions_set=True, but not all fields have encoding_position set
```

Could it do something like:

```
ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has encoding_positions_set=True, but found fields without encoding_position set: field1, field2, field3
```

That is, show the fields that are at fault here. I doubt that would help me fix this myself, but maybe it would help you debug?
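
To make the suggestion concrete, here is a rough sketch of a check that names the offending fields. It is only an illustration under stated assumptions: `Field`, `Schema`, and `check_encoding_positions` are hypothetical stand-ins, not Beam's schema protos or the actual `RowCoderImpl.__init__` logic, and modelling "not set" as `None` is my assumption (Beam may detect the condition differently).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Field:
    # Hypothetical stand-in for a schema field; not Beam's proto class.
    name: str
    # None models "encoding_position not set" (an assumption for this sketch).
    encoding_position: Optional[int] = None


@dataclass
class Schema:
    # Hypothetical stand-in for a schema; not Beam's proto class.
    id: str
    fields: List[Field]
    encoding_positions_set: bool = False


def check_encoding_positions(schema: Schema) -> None:
    """Raise ValueError naming every field that lacks an encoding_position."""
    if not schema.encoding_positions_set:
        # Nothing to validate when the schema does not claim positions are set.
        return
    missing = [f.name for f in schema.fields if f.encoding_position is None]
    if missing:
        raise ValueError(
            f"Schema with id {schema.id} has encoding_positions_set=True, "
            f"but found fields without encoding_position set: "
            f"{', '.join(missing)}")
```

With something along these lines, the message would point directly at the fields to inspect instead of only the schema id.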