[ 
https://issues.apache.org/jira/browse/BEAM-10277?focusedWorklogId=665903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-665903
 ]

ASF GitHub Bot logged work on BEAM-10277:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Oct/21 00:44
            Start Date: 17/Oct/21 00:44
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r730135297



##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -169,6 +170,7 @@ def __init__(self, schema, components):
         field.type.nullable for field in self.schema.fields)
 
   def encode_to_stream(self, value, out, nested):
+    self.schema = SCHEMA_REGISTRY.get_schema_by_id(self.schema.id)

Review comment:
       Is this necessary?

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -190,7 +193,10 @@ def encode_to_stream(self, value, out, nested):
               "Attempted to encode null for non-nullable field \"{}\".".format(
                   field.name))
         continue
-      c.encode_to_stream(attr, out, True)
+      attrs_enc_pos.append((c, field.encoding_position, attr))

Review comment:
       I think we should only need to read the encoding positions once, when 
constructing the coder. We don't need to inspect the encoding positions and 
sort for every element.
   
   Instead, this should read the encoding positions in `__init__`, and 
transform them into a list of indexes that will encode in the proper order 
(`numpy.argsort` should do this). For the case where encoding positions aren't 
specified the list could just be `list(range(len(self.schema.fields)))`.
   
   It may also make sense to validate the encoding positions when constructing 
the coder (they should all be unique values in the range `[0, 
len(self.schema.fields))`), to be safe.
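
   A minimal sketch of that idea, written in plain Python rather than with 
`numpy.argsort` so that it has no dependencies. The helper name 
`encoding_order` and the convention of passing `None` when no encoding 
positions are specified are assumptions made for illustration; they are not 
part of the PR:

```python
from typing import List, Optional, Sequence


def encoding_order(
    positions: Optional[Sequence[int]], num_fields: int) -> List[int]:
  """Return the field indexes in the order they should be encoded.

  positions[i] is the encoding position of field i. Passing None stands in
  for "no encoding positions specified" (an assumption of this sketch).
  """
  if positions is None:
    # Fall back to schema order.
    return list(range(num_fields))

  # Validate: positions must be unique values in [0, num_fields).
  if sorted(positions) != list(range(num_fields)):
    raise ValueError(
        "Encoding positions must be unique values in [0, %d), got %r" %
        (num_fields, list(positions)))

  # Pure-Python equivalent of an argsort over the positions.
  return sorted(range(num_fields), key=lambda i: positions[i])


# Field 0 is encoded last, field 1 first, field 2 second.
print(encoding_order([2, 0, 1], 3))  # [1, 2, 0]
```

   The coder could compute this list once in `__init__` and iterate over it 
in `encode_to_stream`, instead of sorting for every element.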

##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -138,8 +138,9 @@ def named_fields_to_schema(names_and_types):
   # type: (Sequence[Tuple[str, type]]) -> schema_pb2.Schema
   return schema_pb2.Schema(
       fields=[
-          schema_pb2.Field(name=name, type=typing_to_runner_api(type))
-          for (name, type) in names_and_types
+          schema_pb2.Field(
+              name=name, type=typing_to_runner_api(type), encoding_position=idx)
+          for idx, (name, type) in enumerate(names_and_types)

Review comment:
       I think we should actually leave the encoding positions undefined when 
we create schemas in this file. The encoding position is something that the 
runner will manipulate to ensure update compatibility; the SDK doesn't need to 
worry about it.

##########
File path: sdks/go/test/regression/coders/fromyaml/fromyaml.go
##########
@@ -83,11 +88,13 @@ func (s *Spec) testStandardCoder() (err error) {
                log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
                return nil
        }
-       // TODO(BEAM-9615): Support Logical types, and produce a better error message.
-       if strings.Contains(s.Coder.Payload, "logical") {
-               log.Printf("skipping coder with logical type. Unsupported in the Go SDK for now. Payload: %v", s.Coder.Payload)
-               return nil
+       for _, c := range filteredCases {
+               if strings.Contains(s.Coder.Payload, c.filter) {
+                       log.Printf("skipping coder case. Unsupported in the Go SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
+                       return nil
+               }

Review comment:
       I have very limited knowledge of Go, but this LGTM and works. Does it 
look ok to you @lostluck?

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,26 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# fields = [("foo", str), ("bar", str)]
+# schema = typing.NamedTuple( "test", fields)
+# coder = RowCoder.from_type_hint(schema, None)
+# examples = (
+#             coder.encode(schema(foo="str1",bar="str2")),
+#             coder.encode(schema(bar="str2",foo="str1"))
+#            )
+# print("schema = %s" % coder.schema.SerializeToString())

Review comment:
       You may need to use a different approach to generate test cases here. 
We'll want to have some test cases where the encoding position is different 
from the field order. In order to do that I think you'll need to manually 
construct a Schema proto object (as is done in some places in 
`schemas_test.py`), and make sure it has encoding positions that are out of 
order. Then you can serialize it with `SerializeToString()`. Does that make 
sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 665903)
    Time Spent: 15h 10m  (was: 15h)

> beam:coder:row:v1 implementations should respect encoding_position
> ------------------------------------------------------------------
>
>                 Key: BEAM-10277
>                 URL: https://issues.apache.org/jira/browse/BEAM-10277
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Irwin Alejandro Rodirguez Ramirez
>            Priority: P3
>              Labels: Clarified
>          Time Spent: 15h 10m
>  Remaining Estimate: 0h
>
> h3. Problem/Status
> The schema proto has an [encoding_position 
> field|https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/model/pipeline/src/main/proto/schema.proto#L55]
>  that is currently unused in every row coder implementation. The intention of 
> this field is that it indicates an alternative order for the fields to be 
> encoded in by [beam:coder:row:v1 
> implementations|https://github.com/apache/beam/blob/1e60f383fb39b9ff8d44edcbe5357da4c1e52378/model/pipeline/src/main/proto/beam_runner_api.proto#L937-L990].
>  Currently all the implementations ignore this field, and always encode the 
> fields in the order that they appear in the schema.
> h3. Motivation
> The idea with the encoding position is that it will give runners a way to 
> enforce schema compatibility (BEAM-9502), by re-ordering the way fields are 
> encoded when the schema changes between two job submissions. Schema changes 
> could be due to fields re-ordering, or field additions/deletions.
> h3. Code pointers
> The Python beam:coder:row:v1 implementation lives in 
> [row_coder.py|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder.py].
> The Java implementation is a little more complicated, distributed between 
> [SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java],
>  
> [RowCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java],
>  and 
> [RowCoderGenerator|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java].
>  RowCoderGenerator contains the code relevant to this jira - it uses 
> bytebuddy to generate Java code for the coder. We need it to generate code 
> that puts fields in the order specified by encoding_position.
> h3. Testing
> Python and Java implementations should be tested with unit tests 
> ([RowCoderTest|https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java],
>  
> [row_coder_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder_test.py]).
>  We should also test them for compatibility by adding test cases that 
> exercise the encoding_position in 
> [standard_coders.yaml|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml].
>  These tests will be executed by 
> [CommonCoderTest|https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java]
>  and 
> [standard_coders_test|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/standard_coders_test.py].
>  There's some example code for generating a new test case 
> [here|https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml#L387-L400].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
