This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cb5913a [BEAM-13081] Fixes a compatibility issue in decoding the null-value
bitmap between the JVM coder and the Python coder (#15829)
cb5913a is described below
commit cb5913aa51243ed3fedc6030ad33bf2d52f52bc8
Author: Jiayang Wu <[email protected]>
AuthorDate: Thu Nov 4 13:51:43 2021 -0700
[BEAM-13081] Fixes a compatibility issue in decoding the null-value bitmap
between the JVM coder and the Python coder (#15829)
* tests passed
* make one line shorter
* reformat
* reformat again
* reformat for the last time
Co-authored-by: Jiayang Wu <[email protected]>
---
sdks/python/apache_beam/coders/row_coder.py | 4 +++-
sdks/python/apache_beam/coders/row_coder_test.py | 26 ++++++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/coders/row_coder.py
b/sdks/python/apache_beam/coders/row_coder.py
index ec3778d..20fa867 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -198,7 +198,9 @@ class RowCoderImpl(StreamCoderImpl):
words.frombytes(self.NULL_MARKER_CODER.decode_from_stream(in_stream, True))
if words:
- nulls = ((words[i // 8] >> (i % 8)) & 0x01 for i in range(nvals))
+ nulls = (
+ 0 if i // 8 >= len(words) else ((words[i // 8] >> (i % 8)) & 0x01)
+ for i in range(nvals))
else:
nulls = itertools.repeat(False, nvals)
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py
b/sdks/python/apache_beam/coders/row_coder_test.py
index 331f824..2fdf7f8 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -47,6 +47,15 @@ Person = typing.NamedTuple(
("favorite_time", Timestamp),
])
+NullablePerson = typing.NamedTuple(
+ "NullablePerson",
+ [("name", typing.Optional[str]), ("age", np.int32),
+ ("address", typing.Optional[str]), ("aliases", typing.List[str]),
+ ("knows_javascript", bool), ("payload", typing.Optional[bytes]),
+ ("custom_metadata", typing.Mapping[str, int]),
+ ("favorite_time", typing.Optional[Timestamp]),
+ ("one_more_field", typing.Optional[str])])
+
coders_registry.register_coder(Person, RowCoder)
@@ -82,6 +91,23 @@ class RowCoderTest(unittest.TestCase):
Timestamp.from_rfc3339('2020-08-12T15:51:00.032Z'))
]
+ def test_row_accepts_trailing_zeros_truncated(self):
+ expected_coder = RowCoder(
+ typing_to_runner_api(NullablePerson).row_type.schema)
+ person = NullablePerson(
+ None,
+ np.int32(25),
+ "Westeros", ["Mother of Dragons"],
+ False,
+ None, {"dragons": 3},
+ None,
+ "NotNull")
+ out = expected_coder.encode(person)
+ # 9 fields, 1 null-bitmap byte; fields 0, 5 and 7 are null
+ new_payload = bytes([9, 1, 1 | 1 << 5 | 1 << 7]) + out[4:]
+ new_value = expected_coder.decode(new_payload)
+ self.assertEqual(person, new_value)
+
def test_create_row_coder_from_named_tuple(self):
expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
real_coder = coders_registry.get_coder(Person)