This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/encodingError
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 64f368295352f7ca6eb49fcd9a312892f7f68a4b
Author: Danny McCormick <[email protected]>
AuthorDate: Fri Nov 14 14:23:04 2025 -0500

    Improve error message for decoding input stream
---
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4094fd1d805..5c2d75f246f 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -234,9 +234,11 @@ class DataInputOperation(RunnerIOOperation):
         decoded_value = self.windowed_coder_impl.decode_from_stream(
             input_stream, True)
       except Exception as exn:
+        coder = str(self.windowed_coder)
+        step = self.name_context.step_name
         raise ValueError(
-            "Error decoding input stream with coder " +
-            str(self.windowed_coder)) from exn
+            f"Error decoding input stream with coder ${coder} in step ${step}"
+        ) from exn
       self.output(decoded_value)
 
   def monitoring_infos(

Reply via email to