TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r743844253



##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=0),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)

Review comment:
       What I meant here is that you will need to add 
`encoding_positions_set=True` to the `Schema`, since when it's not True, the 
encoding positions should be ignored.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -181,9 +192,11 @@ def encode_to_stream(self, value, out, nested):
         for i, is_null in enumerate(nulls):
           words[i // 8] |= is_null << (i % 8)
 
+    attrs = self._enc_pos_idx(attrs)
+    fields = self._enc_pos_idx(self.schema.fields)

Review comment:
       We need to avoid using logic like this to determine the field order in 
the encode_to_stream method. This implementation is essentially determining the 
field order dynamically for every encoded element. It should be possible to use 
something like `np.argsort` to determine the order one time (likely in the 
`__init__` method), and then reference that order in the encode_to_stream 
method.
   
   Please note we'll also need to reference that order in the 
decode_from_stream method. RowCoder should respect the encoding positions when 
encoding _and_ decoding.

##########
File path: sdks/python/apache_beam/coders/row_coder_test.py
##########
@@ -267,6 +324,54 @@ def test_row_coder_fail_early_bad_schema(self):
     self.assertRaisesRegex(
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
+  def test_yaml(self):
+    schema1 = schema_pb2.Schema(
+        id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
+        fields=[
+            schema_pb2.Field(
+                name="str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=2),
+            schema_pb2.Field(
+                name="f_bool",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+                encoding_position=3),
+            schema_pb2.Field(
+                name="i32",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.INT32, nullable=True),
+                encoding_position=1)
+        ],
+        encoding_positions_set=True)
+
+    coder = RowCoder(schema1)
+    c = coder.schema.SerializeToString()
+    print("payload = %s" % c)
+    test = typing.NamedTuple(
+        "test", [
+            ("i32", np.int32),
+            ("str", str),
+            ("f_bool", bool),
+        ])
+    example = coder.encode(test(21, "str2", False))
+    print("example = %s" % example)

Review comment:
       It looks like this was added just to generate the payload for 
`standard_coders.yaml`, please remove

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if any(field.encoding_position for field in self.schema.fields):
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)

Review comment:
       Please note the last part from the comment above:
   > If this Field is part of a Schema where encoding_positions_set is True 
then encoding_position must be defined, *otherwise this field is ignored.*
   
   What you have implemented now still respects the `encoding_position` fields 
even if  `encoding_positions_set` is False.

##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=3),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)
+#   ])
+#
+# coder = RowCoder(schema1)
+# c = coder.schema.SerializeToString()
+# print("payload = %s" % c)
+# test = typing.NamedTuple("test", [ ("f_bool", bool), ("i32", np.int32), 
("str", str) ])
+# example = coder.encode(test(False,21,"str2"))
+# print("example = %s" % example)
+coder:
+  urn: "beam:coder:row:v1"
+  payload: 
"\n\x0b\n\x03str\x1a\x02\x10\x07(\x02\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\r\n\x03i32\x1a\x04\x08\x01\x10\x03(\x01\x12$30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9"
+examples:
+  "\x03\x00\x04str2\x00\x15": {f_bool: False, i32: 21, str: "str2"}

Review comment:
       Since this example is passing, I think it indicates your implementation 
is flawed. Looking at the example, the fields appear to be encoded in the 
natural order, _not_ the order specified by encoding positions as they should:
   
   ```
   str2\x00\x15
   ```
   
   - str2 = value for str
   - \x00 (false) = value for f_bool
   - \x15 (21) = value for i32




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to