TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r739466199
##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -285,6 +285,21 @@ def _hydrate_namedtuple_instance(encoded_schema, values):
proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema))(*values)
+def get_encoding_position(schema):
+ return [f.encoding_position for f in schema.fields]
+
+
+def set_encoding_position(type_, values):
+ if hasattr(type_, _BEAM_SCHEMA_ID):
+ schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, _BEAM_SCHEMA_ID))
+ else:
+ schema = named_tuple_to_schema(type_)
+ val = dict(values)
+ for idx, field in enumerate(schema.fields):
+ schema.fields[idx].encoding_position = val[field.name]
+ SCHEMA_REGISTRY.add(type_, schema)
Review comment:
Could these helpers be moved to row_coder_test? They don't seem to be
used otherwise.
##########
File path:
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
{f_map: {"foo": 9001, "bar": 9223372036854775807}}
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
{f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
+---
+# Binary data generated with the python SDK:
+#
+# schema1 = schema_pb2.Schema(
+# id="testcase",
+# fields=[
+# schema_pb2.Field(
+# name="str",
+# type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+# encoding_position=2
+# ),
+# schema_pb2.Field(
+# name="f_bool",
+# type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+# encoding_position=0),
+# schema_pb2.Field(
+# name="i32",
+# type=schema_pb2.FieldType(
+# atomic_type=schema_pb2.INT32, nullable=True),
+# encoding_position=1)
Review comment:
Thanks, this looks good. Note that we'll also need to make sure
`encoding_positions_set` is true, though, to be consistent with the other
comment.
##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -182,7 +188,6 @@ def encode_to_stream(self, value, out, nested):
words[i // 8] |= is_null << (i % 8)
self.NULL_MARKER_CODER.encode_to_stream(words.tobytes(), out, True)
-
for c, field, attr in zip(self.components, self.schema.fields, attrs):
Review comment:
Don't we also need to adjust the order of `self.schema.fields` and
`attrs` to be consistent with the order of `self.components`? Maybe I'm missing
something, though, since the tests are passing.
##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
def __init__(self, schema, components):
self.schema = schema
self.constructor = named_tuple_from_schema(schema)
- self.components = list(c.get_impl() for c in components)
+ self.encoding_positions = list(range(len(self.schema.fields)))
+ if any(field.encoding_position for field in self.schema.fields):
+ self.encoding_positions = list(
+ field.encoding_position for field in self.schema.fields)
Review comment:
Sorry, I just realized that the spec changed a bit from what I was
thinking:
https://github.com/apache/beam/blob/2e448dee58f1ee60551cc47b9aa7df6bc832734a/model/pipeline/src/main/proto/schema.proto#L52-L58
Note the part about `encoding_positions_set`. So this logic should look
something like this (I also added a conservative check to verify that the
encoding_positions are actually set):
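```suggestion
if self.schema.encoding_positions_set:
  positions = [field.encoding_position for field in self.schema.fields]
  # encoding_position is a proto3 scalar, so 0 is both its default value and
  # a legitimate position (exactly one field must use it). A truthiness check
  # would therefore always fail; require a permutation of 0..N-1 instead.
  if sorted(positions) != list(range(len(positions))):
    raise ValueError(
        f"Schema with id {schema.id} has encoding_positions_set=True, "
        "but not all fields have encoding_position set")
  self.encoding_positions = positions
```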
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]