[ https://issues.apache.org/jira/browse/BEAM-10277?focusedWorklogId=672132&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-672132 ]

ASF GitHub Bot logged work on BEAM-10277:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/21 18:55
            Start Date: 29/Oct/21 18:55
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r739466199



##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -285,6 +285,21 @@ def _hydrate_namedtuple_instance(encoded_schema, values):
       proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema))(*values)
 
 
+def get_encoding_position(schema):
+  return [f.encoding_position for f in schema.fields]
+
+
+def set_encoding_position(type_, values):
+  if hasattr(type_, _BEAM_SCHEMA_ID):
+    schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, _BEAM_SCHEMA_ID))
+  else:
+    schema = named_tuple_to_schema(type_)
+  val = dict(values)
+  for idx, field in enumerate(schema.fields):
+    schema.fields[idx].encoding_position = val[field.name]
+  SCHEMA_REGISTRY.add(type_, schema)

Review comment:
       Could these helpers be moved to row_coder_test? They don't seem to be used otherwise.

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=0),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)

Review comment:
       Thanks, this looks good. Note that we'll need to make sure `encoding_positions_set` is true, though, to be consistent with the other comment.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -182,7 +188,6 @@ def encode_to_stream(self, value, out, nested):
           words[i // 8] |= is_null << (i % 8)
 
     self.NULL_MARKER_CODER.encode_to_stream(words.tobytes(), out, True)
-
     for c, field, attr in zip(self.components, self.schema.fields, attrs):

Review comment:
       Don't we also need to adjust the order of `self.schema.fields` and `attrs` to be consistent with the order of `self.components`? Maybe I'm missing something, though, since this is passing tests.
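A minimal sketch of the lockstep reordering this question is about, assuming `encoding_positions[i]` is the wire slot occupied by schema field `i` and that the positions form a permutation of `0..n-1`. The field names and positions come from the `standard_coders.yaml` test case above; `reorder_for_encoding` is a hypothetical helper, not part of `row_coder.py`, and the coder names are placeholder strings standing in for coder objects. Because all three lists are sorted by the same key, a later `zip` over them stays aligned.

```python
def reorder_for_encoding(encoding_positions, *parallel_lists):
  """Reorder each list so that element i lands at encoding_positions[i].

  Hypothetical helper: every list is permuted the same way, so elements
  that were aligned in schema order remain aligned in wire order.
  """
  # Schema-field indices, sorted by the wire slot each field occupies.
  order = sorted(
      range(len(encoding_positions)), key=lambda i: encoding_positions[i])
  return [[lst[i] for i in order] for lst in parallel_lists]


# Schema order from the yaml test case: str (2), f_bool (0), i32 (1).
fields = ["str", "f_bool", "i32"]
components = ["str_coder", "bool_coder", "nullable_int32_coder"]
attrs = ["foo", True, 42]
positions = [2, 0, 1]

wire_fields, wire_components, wire_attrs = reorder_for_encoding(
    positions, fields, components, attrs)
print(wire_fields)      # ['f_bool', 'i32', 'str']
print(wire_components)  # ['bool_coder', 'nullable_int32_coder', 'str_coder']
print(wire_attrs)       # [True, 42, 'foo']
```

Sorting indices once and applying that single permutation to every list avoids the case where `self.components` is reordered but `self.schema.fields` and `attrs` are not.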

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if any(field.encoding_position for field in self.schema.fields):
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)

Review comment:
       Sorry, I just realized that the spec changed a bit from what I was thinking: https://github.com/apache/beam/blob/2e448dee58f1ee60551cc47b9aa7df6bc832734a/model/pipeline/src/main/proto/schema.proto#L52-L58
   
   Note the part about `encoding_positions_set`. So this logic should look something like this (I also added a conservative check to verify that encoding_positions are actually set):
   
   ```suggestion
       if self.schema.encoding_positions_set:
         if not all(field.encoding_position for field in self.schema.fields):
           raise ValueError(
               f"Schema with id {schema.id} has encoding_positions_set=True, "
               "but not all fields have encoding_position set")
         self.encoding_positions = list(
             field.encoding_position for field in self.schema.fields)
   ```
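One caveat with the suggested check: `encoding_position` is a proto3 `int32`, whose default value is `0`, so `all(field.encoding_position for ...)` evaluates to `False` whenever a field legitimately sits at position 0 (as `f_bool` does in the `standard_coders.yaml` example). Below is a sketch of a stricter alternative, assuming valid positions must form a permutation of `0..n-1`. `validate_encoding_positions` is a hypothetical name, and it takes a plain list of ints rather than a schema proto.

```python
def validate_encoding_positions(schema_id, positions):
  """Raise ValueError unless positions is a permutation of 0..n-1.

  Unlike a truthiness check such as all(positions), this accepts a field
  at position 0 while still rejecting duplicates, gaps and out-of-range
  values.
  """
  if sorted(positions) != list(range(len(positions))):
    raise ValueError(
        f"Schema with id {schema_id} has encoding_positions_set=True, "
        f"but encoding positions {positions} are not a permutation of "
        f"0..{len(positions) - 1}")
  return list(positions)


print(all([2, 0, 1]))                                      # False
print(validate_encoding_positions("testcase", [2, 0, 1]))  # [2, 0, 1]
```

Note that a permutation check still cannot tell "unset" from "all zeros" for a single-field schema; that distinction is what `encoding_positions_set` provides.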




-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 672132)
    Time Spent: 18h 10m  (was: 18h)

> beam:coder:row:v1 implementations should respect encoding_position
> ------------------------------------------------------------------
>
>                 Key: BEAM-10277
>                 URL: https://issues.apache.org/jira/browse/BEAM-10277
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Irwin Alejandro Rodirguez Ramirez
>            Priority: P3
>              Labels: Clarified
>          Time Spent: 18h 10m
>  Remaining Estimate: 0h
>
> h3. Problem/Status
> The schema proto has an [encoding_position field|https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/model/pipeline/src/main/proto/schema.proto#L55]
> that is currently unused by every row coder implementation. The field is intended to specify an alternative
> order in which [beam:coder:row:v1 implementations|https://github.com/apache/beam/blob/1e60f383fb39b9ff8d44edcbe5357da4c1e52378/model/pipeline/src/main/proto/beam_runner_api.proto#L937-L990]
> encode the fields. Instead, all implementations ignore it and always encode the fields in the order in which
> they appear in the schema.
> h3. Motivation
> The idea is that the encoding position will give runners a way to enforce schema compatibility (BEAM-9502)
> by re-ordering the way fields are encoded when the schema changes between two job submissions. Schema
> changes could be due to field re-ordering or to field additions/deletions.
> h3. Code pointers
> The Python beam:coder:row:v1 implementation lives in
> [row_coder.py|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder.py].
> The Java implementation is a little more complicated, distributed between
> [SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java],
> [RowCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java],
> and [RowCoderGenerator|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java].
> RowCoderGenerator contains the code relevant to this JIRA: it uses ByteBuddy to generate bytecode for the
> coder. We need it to generate code that encodes fields in the order specified by encoding_position.
> h3. Testing
> Python and Java implementations should be tested with unit tests
> ([RowCoderTest|https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java],
> [row_coder_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder_test.py]).
> We should also test them for compatibility by adding test cases that exercise encoding_position to
> [standard_coders.yaml|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml].
> These tests will be executed by
> [CommonCoderTest|https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java]
> and [standard_coders_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/standard_coders_test.py].
> There's some example code for generating a new test case
> [here|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml#L387-L400].
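The behaviour described under Problem/Status can be illustrated with a toy round trip, assuming `positions[i]` is the wire slot of schema field `i`. This is deliberately not Beam's row format (there is no field count and no null bitmap, and every field is a short string); `encode_row` and `decode_row` are hypothetical names. It shows that changing encoding positions changes the bytes, and that a decoder given the same positions recovers the values in schema order.

```python
def encode_row(values, positions):
  """Toy encoder: values are in schema order, written out in wire order.

  Each field is written as a 1-byte length followed by its UTF-8 bytes.
  """
  wire = [None] * len(values)
  for i, value in enumerate(values):
    wire[positions[i]] = value  # field i goes to wire slot positions[i]
  out = bytearray()
  for value in wire:
    data = value.encode("utf-8")
    out.append(len(data))  # assumes each field is shorter than 256 bytes
    out += data
  return bytes(out)


def decode_row(data, positions):
  """Toy decoder: reads fields in wire order, returns them in schema order."""
  wire = []
  offset = 0
  while offset < len(data):
    length = data[offset]
    wire.append(data[offset + 1:offset + 1 + length].decode("utf-8"))
    offset += 1 + length
  return [wire[positions[i]] for i in range(len(positions))]


values = ["a", "bc"]
print(encode_row(values, [0, 1]))                      # b'\x01a\x02bc'
print(encode_row(values, [1, 0]))                      # b'\x02bc\x01a'
print(decode_row(encode_row(values, [1, 0]), [1, 0]))  # ['a', 'bc']
```

A compatibility test in the spirit of standard_coders.yaml would pin the expected bytes for a given set of positions, so that every SDK must produce the same wire order.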



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
