[ 
https://issues.apache.org/jira/browse/BEAM-10277?focusedWorklogId=677293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-677293
 ]

ASF GitHub Bot logged work on BEAM-10277:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/21 19:34
            Start Date: 05/Nov/21 19:34
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request 
#15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r743844253



##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=0),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)

Review comment:
       What I meant here is that you will need to add 
`encoding_positions_set=True` to the `Schema`, since when it's not True, the 
encoding positions should be ignored.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -181,9 +192,11 @@ def encode_to_stream(self, value, out, nested):
         for i, is_null in enumerate(nulls):
           words[i // 8] |= is_null << (i % 8)
 
+    attrs = self._enc_pos_idx(attrs)
+    fields = self._enc_pos_idx(self.schema.fields)

Review comment:
       We need to avoid using logic like this to determine the field order in 
the encode_to_stream method. This implementation is essentially determining the 
field order dynamically for every encoded element. It should be possible to use 
something like `np.argsort` to determine the order one time (likely in the 
`__init__` method), and then reference that order in the encode_to_stream 
method.
   
   Please note we'll also need to reference that order in the 
decode_from_stream method. RowCoder should respect the encoding positions when 
encoding _and_ decoding.

##########
File path: sdks/python/apache_beam/coders/row_coder_test.py
##########
@@ -267,6 +324,54 @@ def test_row_coder_fail_early_bad_schema(self):
     self.assertRaisesRegex(
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
+  def test_yaml(self):
+    schema1 = schema_pb2.Schema(
+        id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
+        fields=[
+            schema_pb2.Field(
+                name="str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=2),
+            schema_pb2.Field(
+                name="f_bool",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+                encoding_position=3),
+            schema_pb2.Field(
+                name="i32",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.INT32, nullable=True),
+                encoding_position=1)
+        ],
+        encoding_positions_set=True)
+
+    coder = RowCoder(schema1)
+    c = coder.schema.SerializeToString()
+    print("payload = %s" % c)
+    test = typing.NamedTuple(
+        "test", [
+            ("i32", np.int32),
+            ("str", str),
+            ("f_bool", bool),
+        ])
+    example = coder.encode(test(21, "str2", False))
+    print("example = %s" % example)

Review comment:
       It looks like this was added just to generate the payload for 
`standard_coders.yaml`, please remove

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if any(field.encoding_position for field in self.schema.fields):
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)

Review comment:
       Please note the last part from the comment above:
   > If this Field is part of a Schema where encoding_positions_set is True 
then encoding_position must be defined, *otherwise this field is ignored.*
   
   What you have implemented now still respects the `encoding_position` fields 
even if  `encoding_positions_set` is False.

##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=3),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)
+#   ])
+#
+# coder = RowCoder(schema1)
+# c = coder.schema.SerializeToString()
+# print("payload = %s" % c)
+# test = typing.NamedTuple("test", [ ("f_bool", bool), ("i32", np.int32), 
("str", str) ])
+# example = coder.encode(test(False,21,"str2"))
+# print("example = %s" % example)
+coder:
+  urn: "beam:coder:row:v1"
+  payload: 
"\n\x0b\n\x03str\x1a\x02\x10\x07(\x02\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\r\n\x03i32\x1a\x04\x08\x01\x10\x03(\x01\x12$30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9"
+examples:
+  "\x03\x00\x04str2\x00\x15": {f_bool: False, i32: 21, str: "str2"}

Review comment:
       Since this example is passing, I think it indicates your implementation 
is flawed. Looking at the example, the fields appear to be encoded in the 
natural order, _not_ the order specified by encoding positions as they should:
   
   ```
   str2\x00\x15
   ```
   
   - str2 = value for str
   - \x00 (false) = value for f_bool
   - \x15 (21) = value for i32




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 677293)
    Time Spent: 22.5h  (was: 22h 20m)

> beam:coder:row:v1 implementations should respect encoding_position
> ------------------------------------------------------------------
>
>                 Key: BEAM-10277
>                 URL: https://issues.apache.org/jira/browse/BEAM-10277
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Irwin Alejandro Rodirguez Ramirez
>            Priority: P3
>              Labels: Clarified
>          Time Spent: 22.5h
>  Remaining Estimate: 0h
>
> h3. Problem/Status
> The schema proto has an [encoding_position 
> field|https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/model/pipeline/src/main/proto/schema.proto#L55]
>  that is currently unused in every row coder implementation. The intention of 
> this field is that it indicates an alternative order for the fields to be 
> encoded in by [beam:coder:row:v1 
> implementations|https://github.com/apache/beam/blob/1e60f383fb39b9ff8d44edcbe5357da4c1e52378/model/pipeline/src/main/proto/beam_runner_api.proto#L937-L990].
>  Currently all the implementations ignore this field, and always encode the 
> fields in the order that they appear in the schema.
> h3. Motivation
> The idea with the encoding position is that it will give runners a way to 
> enforce schema compatibility (BEAM-9502), by re-ordering the way fields are 
> encoded when the schema changes between two job submissions. Schema changes 
> could be due to fields re-ordering, or field additions/deletions.
> h3. Code pointers
> The Python beam:coder:row:v1 implementation lives in 
> [row_coder.py|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder.py]
> The Java implementation is a little more complicated, distributed between 
> [SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java],
>  
> [RowCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java],
>  and 
> [RowCoderGenerator|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java].
>  RowCoderGenerator contains the code relevant to this jira - it uses 
> bytebuddy to generate Java code for the coder. We need it to generate code 
> that puts fields in the order specified by encoding_position.
> h3. Testing
> Python and Java implementations should be tested with unit tests 
> ([RowCoderTest|https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java],
>  
> [row_coder_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder_test.py]).
>  We should also test them for compatibility by adding test cases that 
> exercise the encoding_position in 
> [standard_coders.yaml|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml].
>  These tests will be executed by 
> [CommonCoderTest|https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java]
>  and 
> [standard_coders_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/standard_coders_test.py].
>  There's some example code for generating a new test case 
> [here|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml#L387-L400].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to