claudevdm commented on PR #35243:
URL: https://github.com/apache/beam/pull/35243#issuecomment-2994720249
Here is one
```
ValueError: Error decoding input stream with coder WindowedValueCoder[window_coder=GlobalWindowCoder, value_coder=RowCoder]
self = <apache_beam.runners.worker.bundle_processor.DataInputOperation
object at 0x7917f5f45d60>
encoded_windowed_values =
b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0fQ\x0b\x00\xfd\xff\xff\xff\x0f\xfd\xff\xff\xff\xff\xff\xff\xff\xff\x01?\xb9\...\x05Test9\x05Test9\x05Test9\x80\x00\x01\x97{"\xe0C\x02\x02\x037\x80\x00\x00@t\x17p\x00\x80\x00\x00\x00\x01\xf6\xc3\x11'
def process_encoded(self, encoded_windowed_values: bytes) -> None:
input_stream = coder_impl.create_InputStream(encoded_windowed_values)
while input_stream.size() > 0:
with self.splitting_lock:
if self.index == self.stop - 1:
return
self.index += 1
try:
> decoded_value = self.windowed_coder_impl.decode_from_stream(
input_stream, True)
apache_beam/runners/worker/bundle_processor.py:231:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
apache_beam/coders/coder_impl.py:1549: in decode_from_stream
value = self._value_coder.decode_from_stream(in_stream, nested)
apache_beam/coders/coder_impl.py:1641: in decode_from_stream
return self._value_coder.decode(in_stream.read(value_length))
apache_beam/coders/coder_impl.py:239: in decode
return self.decode_from_stream(create_InputStream(encoded), False)
apache_beam/coders/coder_impl.py:1952: in decode_from_stream
item = component_coder.decode_from_stream(in_stream, True)
apache_beam/coders/coder_impl.py:2005: in decode_from_stream
return self.logical_type.to_language_type(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.typehints.schemas.MicrosInstant object at 0x79182e358fa0>
value = BeamSchema_ee15bbf4_32de_4611_a9dc_b1b5c5981f57(seconds=None,
micros=None)
def to_language_type(self, value):
# type: (MicrosInstantRepresentation) -> Timestamp
> return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
E TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
apache_beam/typehints/schemas.py:934: TypeError
The above exception was the direct cause of the following exception:
a = (<apache_beam.io.external.xlang_jdbcio_it_test.CrossLanguageJdbcIOTest
testMethod=test_xlang_jdbc_write_read_0_postgres>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
../../build/gradleenv/1398941893/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
apache_beam/io/external/xlang_jdbcio_it_test.py:268: in
test_xlang_jdbc_write_read
assert_that(result, equal_to(expected_rows))
apache_beam/pipeline.py:661: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
apache_beam/pipeline.py:635: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/test_direct_runner.py:42: in run_pipeline
self.result = super().run_pipeline(pipeline, options)
apache_beam/runners/direct/direct_runner.py:185: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in
run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in
run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in
_execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in
_run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in
process_bundle
result_future =
self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:389: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:657: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:695: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <apache_beam.runners.worker.bundle_processor.DataInputOperation
object at 0x7917f5f45d60>
encoded_windowed_values =
b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0fQ\x0b\x00\xfd\xff\xff\xff\x0f\xfd\xff\xff\xff\xff\xff\xff\xff\xff\x01?\xb9\...\x05Test9\x05Test9\x05Test9\x80\x00\x01\x97{"\xe0C\x02\x02\x037\x80\x00\x00@t\x17p\x00\x80\x00\x00\x00\x01\xf6\xc3\x11'
def process_encoded(self, encoded_windowed_values: bytes) -> None:
input_stream = coder_impl.create_InputStream(encoded_windowed_values)
while input_stream.size() > 0:
with self.splitting_lock:
if self.index == self.stop - 1:
return
self.index += 1
try:
decoded_value = self.windowed_coder_impl.decode_from_stream(
input_stream, True)
except Exception as exn:
> raise ValueError(
"Error decoding input stream with coder " +
str(self.windowed_coder)) from exn
E ValueError: Error decoding input stream with coder WindowedValueCoder[window_coder=GlobalWindowCoder, value_coder=RowCoder]
apache_beam/runners/worker/bundle_processor.py:234: ValueError
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]