damondouglas commented on PR #33438:
URL: https://github.com/apache/beam/pull/33438#issuecomment-2560467372

   Python sends an empty string for worker_id key in the gRPC metadata. 
Debugging the `workerFromMetadataCtx` method, I see a worker_id sent over when 
running `python apache_beam/examples/wordcount.py`, except for a single 
instance.
   
   The error `worker I'd in ctx metadata is an empty string` is specific to `id 
== ""`. This is a conditional after `grpcx.ReadWorkerID` successfully passes 
all checks for available `metadata.FromIncomingContext`, and the `metadata.MD` 
has the `worker_id` key, and finally that the length of the 
`metadata.MD[worker_id]` is 1.
   
   <summary>
   
   `worker id in ctx metadata is an empty string`
   
   <details>
   ```
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
        status = StatusCode.UNKNOWN
        details = "worker id in ctx metadata is an empty string"
        debug_error_string = "UNKNOWN:Error received from peer  
{grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, 
created_time:"2024-12-23T16:08:00.205761402-08:00"}"
   >
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 311, in 
_execute
       response = task()
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 386, in 
<lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 657, in 
do_instruction
       return getattr(self, request_type)(
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 694, in 
process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
1274, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
237, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 567, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 569, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 260, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 1159, in 
apache_beam.runners.worker.operations.CombineOperation.process
     File "apache_beam/runners/worker/operations.py", line 1163, in 
apache_beam.runners.worker.operations.CombineOperation.process
     File "apache_beam/runners/worker/operations.py", line 569, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 260, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 1159, in 
apache_beam.runners.worker.operations.CombineOperation.process
     File "apache_beam/runners/worker/operations.py", line 1163, in 
apache_beam.runners.worker.operations.CombineOperation.process
     File "apache_beam/runners/worker/operations.py", line 569, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 260, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 950, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 951, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1503, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1591, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1501, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 689, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1686, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1799, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 950, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 951, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1503, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1591, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1501, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 917, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1059, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1686, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1799, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 950, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 951, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1503, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1612, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1501, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 917, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1000, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
499, in __getitem__
       self._cache[target_window] = self._side_input_data.view_fn(raw_view)
     File "sdks/python/apache_beam/pvalue.py", line 389, in <lambda>
       lambda iterable: from_runtime_iterable(iterable, view_options))
     File "sdks/python/apache_beam/pvalue.py", line 509, in 
_from_runtime_iterable
       head = list(itertools.islice(it, 2))
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1289, in 
_lazy_iterator
       input_stream, continuation_token = self._get_raw(
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1307, in 
_get_raw
       self._underlying.get_raw(state_key, continuation_token))
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1094, in 
get_raw
       response = self._blocking_request(
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1134, in 
_blocking_request
       raise self._exception
     File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 1016, in 
_bootstrap_inner
       self.run()
     File ".pyenv/versions/3.10.15/lib/python3.10/threading.py", line 953, in 
run
       self._target(*self._args, **self._kwargs)
     File "sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1069, in 
pull_responses
       for response in responses:
     File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", 
line 543, in __next__
       return self._next()
     File "sdks/python/.venv/lib/python3.10/site-packages/grpc/_channel.py", 
line 969, in _next
       raise self
   RuntimeError: grpc._channel._MultiThreadedRendezvous: 
<_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "worker id in ctx metadata is an empty string"
        debug_error_string = "UNKNOWN:Error received from peer  
{grpc_message:"worker id in ctx metadata is an empty string", grpc_status:2, 
created_time:"2024-12-23T16:08:00.205761402-08:00"}"
   > [while running 'Write/Write/WriteImpl/WriteBundles']
   ```
   
   </details>
   
   </summary>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to