jonathaningram commented on issue #35318:
URL: https://github.com/apache/beam/issues/35318#issuecomment-2998390616

   This happened again when adding `timestamp_attribute` to my Pub/Sub write 
stage.
   
   Before:
   
   ```yaml
   - type: WriteToPubSub
     name: WriteToPubSub
     input: MyInput
     config:
       topic: "my-topic"
       format: JSON
       id_attribute: id
   ```
   
   After:
   
   ```yaml
   - type: WriteToPubSub
     name: WriteToPubSub
     input: MyInput
     config:
       topic: "my-topic"
       format: JSON
       id_attribute: id
       timestamp_attribute: publish_time
   ```
   
   I don't claim that this is anything to do with `timestamp_attribute` itself. 
   
   Stack is:
   
   ```
   Error message from worker: generic::unknown: Traceback (most recent call 
last):
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 313, in _execute
       response = task()
                  ^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 388, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 658, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 689, in process_bundle
       bundle_processor = self.bundle_processor_cache.get(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 512, in get
       processor = bundle_processor.BundleProcessor(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1133, in __init__
       self.ops = self.create_execution_tree(self.process_bundle_descriptor)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1190, in create_execution_tree
       return collections.OrderedDict([(
                                      ^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1193, in <listcomp>
       get_operation(transform_id))) for transform_id in sorted(
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1167, in get_operation
       transform_consumers = {
                             ^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <dictcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1168, in <listcomp>
       tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
             ^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1038, in wrapper
       result = cache[args] = func(*args)
                              ^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1178, in get_operation
       return transform_factory.create_operation(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1498, in create_operation
       return creator(self, transform_id, transform_proto, payload, consumers)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1614, in create_sink_runner
       return DataOutputOperation(
              ^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 146, in __init__
       self.windowed_coder_impl = windowed_coder.get_impl()
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
1429, in _create_impl
       self.wrapped_value_coder.get_impl(),
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
1536, in _create_impl
       return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 
232, in get_impl
       self._impl = self._create_impl()
                    ^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/coders/row_coder.py", line 
79, in _create_impl
       return RowCoderImpl(self.schema, self.components)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "apache_beam/coders/coder_impl.py", line 1836, in 
apache_beam.coders.coder_impl.RowCoderImpl.__init__
   ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has 
encoding_positions_set=True,
               but not all fields have encoding_position set
   
   passed through:
   ==>
       dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1341
   ```
   
   No idea what's going on here, but noting I didn't change any of the "custom" 
part of the pipeline.
   
   Not sure if it would help, but instead of the error message:
   
   ```
   ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has 
encoding_positions_set=True,
               but not all fields have encoding_position set
   ```
   
   Could it do something like:
   
   ```
   ValueError: Schema with id 7f9fccc6-aedf-4c47-8a31-7c607885c94b has 
encoding_positions_set=True,
               but found fields without encoding_position set: field1, field2, 
field3
   ```
   
   That is, show the fields that are at fault here. I doubt that would help me 
fix this myself, but maybe it would help you debug?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to