This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52efbe4  [BEAM-10277] re-write encoding position tests to declare schema protos explicitly (#16267)
52efbe4 is described below

commit 52efbe42dd739b9026edc8cff1cc5c68fd59aa5b
Author: Brian Hulette <[email protected]>
AuthorDate: Thu Jan 6 10:23:30 2022 -0800

    [BEAM-10277] re-write encoding position tests to declare schema protos explicitly (#16267)
---
 sdks/python/apache_beam/coders/row_coder_test.py | 121 ++++++++++++-----------
 1 file changed, 64 insertions(+), 57 deletions(-)

diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 74a33b4..7b4b769 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -31,8 +31,7 @@ from apache_beam.portability.api import schema_pb2
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.typehints.schemas import SCHEMA_REGISTRY
-from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import named_tuple_from_schema
 from apache_beam.typehints.schemas import typing_to_runner_api
 from apache_beam.utils.timestamp import Timestamp
 
@@ -285,59 +284,83 @@ class RowCoderTest(unittest.TestCase):
     self.assertEqual(value, coder.decode(coder.encode(value)))
 
   def test_encoding_position_reorder_fields(self):
-    fields = [("field1", str), ("field2", int), ("field3", int)]
-
-    expected = typing.NamedTuple('expected', fields)
-    reorder = schema_pb2.Schema(
-        id="new_order",
+    schema1 = schema_pb2.Schema(
+        id="reorder_test_schema1",
         fields=[
             schema_pb2.Field(
-                name="field3",
+                name="f_int32",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+            ),
+            schema_pb2.Field(
+                name="f_str",
                 type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
-                encoding_position=2),
+            ),
+        ])
+    schema2 = schema_pb2.Schema(
+        id="reorder_test_schema2",
+        encoding_positions_set=True,
+        fields=[
             schema_pb2.Field(
-                name="field2",
-                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
-                encoding_position=1),
+                name="f_str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=1,
+            ),
             schema_pb2.Field(
-                name="field1",
+                name="f_int32",
                 type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
-                encoding_position=0)
+                encoding_position=0,
+            ),
         ])
 
-    old_coder = RowCoder.from_type_hint(expected, None)
-    new_coder = RowCoder(reorder)
-
-    encode_expected = old_coder.encode(expected("foo", 7, 12))
-    encode_reorder = new_coder.encode(expected(12, 7, "foo"))
-    self.assertEqual(encode_expected, encode_reorder)
-
-  def test_encoding_position_add_fields(self):
-    fields = [("field1", str), ("field2", str)]
-
-    Old = typing.NamedTuple("Old", fields[:-1])
-    New = typing.NamedTuple("New", fields)
+    RowSchema1 = named_tuple_from_schema(schema1)
+    RowSchema2 = named_tuple_from_schema(schema2)
+    roundtripped = RowCoder(schema2).decode(
+        RowCoder(schema1).encode(RowSchema1(42, "Hello World!")))
 
-    old_coder = RowCoder.from_type_hint(Old, None)
-    new_coder = RowCoder.from_type_hint(New, None)
-
-    self.assertEqual(
-        New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+    self.assertEqual(RowSchema2(f_int32=42, f_str="Hello World!"), roundtripped)
 
   def test_encoding_position_add_fields_and_reorder(self):
-    fields = [("field1", typing.Optional[str]), ("field2", str),
-              ("field3", typing.Optional[str])]
-
-    Old = typing.NamedTuple("Old", fields[:-1])
-    New = typing.NamedTuple("New", fields)
+    old_schema = schema_pb2.Schema(
+        id="add_test_old",
+        fields=[
+            schema_pb2.Field(
+                name="f_int32",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+            ),
+            schema_pb2.Field(
+                name="f_str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+            ),
+        ])
+    new_schema = schema_pb2.Schema(
+        encoding_positions_set=True,
+        id="add_test_new",
+        fields=[
+            schema_pb2.Field(
+                name="f_new_str",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.STRING, nullable=True),
+                encoding_position=2,
+            ),
+            schema_pb2.Field(
+                name="f_int32",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+                encoding_position=0,
+            ),
+            schema_pb2.Field(
+                name="f_str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=1,
+            ),
+        ])
 
-    old_coder = RowCoder.from_type_hint(Old, None)
-    new_coder = RowCoder.from_type_hint(New, None)
-    set_encoding_position(New, [("field3", 2), ("field2", 1), ("field1", 0)])
+    Old = named_tuple_from_schema(old_schema)
+    New = named_tuple_from_schema(new_schema)
+    roundtripped = RowCoder(new_schema).decode(
+        RowCoder(old_schema).encode(Old(42, "Hello World!")))
 
     self.assertEqual(
-        New("foo", "baz", None),
-        new_coder.decode(old_coder.encode(Old("foo", "baz"))))
+        New(f_new_str=None, f_int32=42, f_str="Hello World!"), roundtripped)
 
   def test_row_coder_fail_early_bad_schema(self):
     schema_proto = schema_pb2.Schema(
@@ -351,22 +374,6 @@ class RowCoderTest(unittest.TestCase):
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
 
-def get_encoding_position(schema):
-  return [f.encoding_position for f in schema.fields]
-
-
-def set_encoding_position(type_, values):
-  beam_schema_id = "_beam_schema_id"
-  if hasattr(type_, beam_schema_id):
-    schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, beam_schema_id))
-  else:
-    schema = named_tuple_to_schema(type_)
-  val = dict(values)
-  for idx, field in enumerate(schema.fields):
-    schema.fields[idx].encoding_position = val[field.name]
-  SCHEMA_REGISTRY.add(type_, schema)
-
-
 if __name__ == "__main__":
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to