This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f63e9ed28e4 Additional context for decoding errors. (#27024)
f63e9ed28e4 is described below

commit f63e9ed28e4889c2645bfaf5a3450c10a205cf62
Author: Robert Bradshaw <[email protected]>
AuthorDate: Mon Oct 9 11:19:21 2023 -0700

    Additional context for decoding errors. (#27024)
---
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 935ba83709c..c7fcb958745 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -227,8 +227,13 @@ class DataInputOperation(RunnerIOOperation):
         if self.index == self.stop - 1:
           return
         self.index += 1
-      decoded_value = self.windowed_coder_impl.decode_from_stream(
-          input_stream, True)
+      try:
+        decoded_value = self.windowed_coder_impl.decode_from_stream(
+            input_stream, True)
+      except Exception as exn:
+        raise ValueError(
+            "Error decoding input stream with coder " +
+            self.windowed_coder) from exn
       self.output(decoded_value)
 
   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

Reply via email to