TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r739466199



##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -285,6 +285,21 @@ def _hydrate_namedtuple_instance(encoded_schema, values):
       proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema))(*values)
 
 
+def get_encoding_position(schema):
+  return [f.encoding_position for f in schema.fields]
+
+
+def set_encoding_position(type_, values):
+  if hasattr(type_, _BEAM_SCHEMA_ID):
+    schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, _BEAM_SCHEMA_ID))
+  else:
+    schema = named_tuple_to_schema(type_)
+  val = dict(values)
+  for idx, field in enumerate(schema.fields):
+    schema.fields[idx].encoding_position = val[field.name]
+  SCHEMA_REGISTRY.add(type_, schema)

Review comment:
       Could these helpers be moved to row_coder_test? They don't seem to be 
used otherwise.

##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=0),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)

Review comment:
       Thanks this looks good. Note we'll need to make sure 
`encoding_positions_set` is true though, to be consistent with the other 
comment.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -182,7 +188,6 @@ def encode_to_stream(self, value, out, nested):
           words[i // 8] |= is_null << (i % 8)
 
     self.NULL_MARKER_CODER.encode_to_stream(words.tobytes(), out, True)
-
     for c, field, attr in zip(self.components, self.schema.fields, attrs):

Review comment:
       Don't we also need to adjust the order for `self.schema.fields` and 
`attrs` to be consistent with the order of `self.components`? Maybe I'm missing 
something though, since this is passing tests.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if any(field.encoding_position for field in self.schema.fields):
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)

Review comment:
       Sorry I just realized that the spec changed a bit from what I was 
thinking: 
https://github.com/apache/beam/blob/2e448dee58f1ee60551cc47b9aa7df6bc832734a/model/pipeline/src/main/proto/schema.proto#L52-L58
   
   Note the part about `encoding_positions_set`. So this logic should look 
something like this (I also added a conservative check to verify 
encoding_positions are actually set):
   
   ```suggestion
       if self.schema.encoding_positions_set:
         if not all(field.encoding_position for field in self.schema.fields):
           raise ValueError(f"Schema with id {schema.id} has 
encoding_positions_set=True, but not all fields have encoding_position set")
         self.encoding_positions = list(
             field.encoding_position for field in self.schema.fields)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to