This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f63e9ed28e4 Additional context for decoding errors. (#27024)
f63e9ed28e4 is described below
commit f63e9ed28e4889c2645bfaf5a3450c10a205cf62
Author: Robert Bradshaw <[email protected]>
AuthorDate: Mon Oct 9 11:19:21 2023 -0700
Additional context for decoding errors. (#27024)
---
sdks/python/apache_beam/runners/worker/bundle_processor.py | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 935ba83709c..c7fcb958745 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -227,8 +227,13 @@ class DataInputOperation(RunnerIOOperation):
if self.index == self.stop - 1:
return
self.index += 1
- decoded_value = self.windowed_coder_impl.decode_from_stream(
- input_stream, True)
+ try:
+ decoded_value = self.windowed_coder_impl.decode_from_stream(
+ input_stream, True)
+ except Exception as exn:
+ raise ValueError(
+ "Error decoding input stream with coder " +
+ self.windowed_coder) from exn
self.output(decoded_value)
def monitoring_infos(self, transform_id, tag_to_pcollection_id):