This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 52efbe4 [BEAM-10277] re-write encoding position tests to declare schema protos explicitly (#16267)
52efbe4 is described below
commit 52efbe42dd739b9026edc8cff1cc5c68fd59aa5b
Author: Brian Hulette <[email protected]>
AuthorDate: Thu Jan 6 10:23:30 2022 -0800
[BEAM-10277] re-write encoding position tests to declare schema protos explicitly (#16267)
---
sdks/python/apache_beam/coders/row_coder_test.py | 121 ++++++++++++-----------
1 file changed, 64 insertions(+), 57 deletions(-)
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 74a33b4..7b4b769 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -31,8 +31,7 @@ from apache_beam.portability.api import schema_pb2
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.typehints.schemas import SCHEMA_REGISTRY
-from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import typing_to_runner_api
from apache_beam.utils.timestamp import Timestamp
@@ -285,59 +284,83 @@ class RowCoderTest(unittest.TestCase):
self.assertEqual(value, coder.decode(coder.encode(value)))
def test_encoding_position_reorder_fields(self):
- fields = [("field1", str), ("field2", int), ("field3", int)]
-
- expected = typing.NamedTuple('expected', fields)
- reorder = schema_pb2.Schema(
- id="new_order",
+ schema1 = schema_pb2.Schema(
+ id="reorder_test_schema1",
fields=[
schema_pb2.Field(
- name="field3",
+ name="f_int32",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+ ),
+ schema_pb2.Field(
+ name="f_str",
type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
- encoding_position=2),
+ ),
+ ])
+ schema2 = schema_pb2.Schema(
+ id="reorder_test_schema2",
+ encoding_positions_set=True,
+ fields=[
schema_pb2.Field(
- name="field2",
- type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
- encoding_position=1),
+ name="f_str",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+ encoding_position=1,
+ ),
schema_pb2.Field(
- name="field1",
+ name="f_int32",
type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
- encoding_position=0)
+ encoding_position=0,
+ ),
])
- old_coder = RowCoder.from_type_hint(expected, None)
- new_coder = RowCoder(reorder)
-
- encode_expected = old_coder.encode(expected("foo", 7, 12))
- encode_reorder = new_coder.encode(expected(12, 7, "foo"))
- self.assertEqual(encode_expected, encode_reorder)
-
- def test_encoding_position_add_fields(self):
- fields = [("field1", str), ("field2", str)]
-
- Old = typing.NamedTuple("Old", fields[:-1])
- New = typing.NamedTuple("New", fields)
+ RowSchema1 = named_tuple_from_schema(schema1)
+ RowSchema2 = named_tuple_from_schema(schema2)
+ roundtripped = RowCoder(schema2).decode(
+ RowCoder(schema1).encode(RowSchema1(42, "Hello World!")))
- old_coder = RowCoder.from_type_hint(Old, None)
- new_coder = RowCoder.from_type_hint(New, None)
-
- self.assertEqual(
- New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+ self.assertEqual(RowSchema2(f_int32=42, f_str="Hello World!"),
roundtripped)
def test_encoding_position_add_fields_and_reorder(self):
- fields = [("field1", typing.Optional[str]), ("field2", str),
- ("field3", typing.Optional[str])]
-
- Old = typing.NamedTuple("Old", fields[:-1])
- New = typing.NamedTuple("New", fields)
+ old_schema = schema_pb2.Schema(
+ id="add_test_old",
+ fields=[
+ schema_pb2.Field(
+ name="f_int32",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+ ),
+ schema_pb2.Field(
+ name="f_str",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+ ),
+ ])
+ new_schema = schema_pb2.Schema(
+ encoding_positions_set=True,
+ id="add_test_new",
+ fields=[
+ schema_pb2.Field(
+ name="f_new_str",
+ type=schema_pb2.FieldType(
+ atomic_type=schema_pb2.STRING, nullable=True),
+ encoding_position=2,
+ ),
+ schema_pb2.Field(
+ name="f_int32",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+ encoding_position=0,
+ ),
+ schema_pb2.Field(
+ name="f_str",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+ encoding_position=1,
+ ),
+ ])
- old_coder = RowCoder.from_type_hint(Old, None)
- new_coder = RowCoder.from_type_hint(New, None)
- set_encoding_position(New, [("field3", 2), ("field2", 1), ("field1", 0)])
+ Old = named_tuple_from_schema(old_schema)
+ New = named_tuple_from_schema(new_schema)
+ roundtripped = RowCoder(new_schema).decode(
+ RowCoder(old_schema).encode(Old(42, "Hello World!")))
self.assertEqual(
- New("foo", "baz", None),
- new_coder.decode(old_coder.encode(Old("foo", "baz"))))
+ New(f_new_str=None, f_int32=42, f_str="Hello World!"), roundtripped)
def test_row_coder_fail_early_bad_schema(self):
schema_proto = schema_pb2.Schema(
@@ -351,22 +374,6 @@ class RowCoderTest(unittest.TestCase):
ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
-def get_encoding_position(schema):
- return [f.encoding_position for f in schema.fields]
-
-
-def set_encoding_position(type_, values):
- beam_schema_id = "_beam_schema_id"
- if hasattr(type_, beam_schema_id):
- schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, beam_schema_id))
- else:
- schema = named_tuple_to_schema(type_)
- val = dict(values)
- for idx, field in enumerate(schema.fields):
- schema.fields[idx].encoding_position = val[field.name]
- SCHEMA_REGISTRY.add(type_, schema)
-
-
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
unittest.main()