robertwb commented on code in PR #22802:
URL: https://github.com/apache/beam/pull/22802#discussion_r950617150
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),
Review Comment:
This code isn't specific to JavaClassLookupPayload anymore, is it?
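Nothing in the positional-name and type-inference logic depends on the Java class lookup protocol. Below is a stdlib-only sketch of the same mapping pulled out as free functions. It assumes a hypothetical module-level `IGNORED_ARG_FORMAT` constant, and its `'ignore%d'` value is an illustrative guess, not necessarily what `external.py` defines. It also uses plain `type(value)` in place of `convert_to_typing_type(instance_to_type(value))`, and `typing.NamedTuple` stands in for the generated row type:

```python
import typing
from collections import OrderedDict

# Hypothetical module-level constant; the real format string is defined on
# JavaClassLookupPayloadBuilder in external.py and may differ.
IGNORED_ARG_FORMAT = 'ignore%d'


def fields_and_values(*args, **kwargs):
  """Maps positional and keyword arguments to (name, type) pairs and values.

  Positional arguments get synthetic names from IGNORED_ARG_FORMAT; keyword
  arguments keep their own names. type(value) is a simplified stand-in for
  convert_to_typing_type(instance_to_type(value)).
  """
  named_fields = []
  fields_to_values = OrderedDict()
  for field_id, value in enumerate(args):
    if value is None:
      raise ValueError(
          'Received value None. None values are currently not supported')
    name = IGNORED_ARG_FORMAT % field_id
    named_fields.append((name, type(value)))
    fields_to_values[name] = value
  for key, value in kwargs.items():
    if not key:
      raise ValueError('Parameter name cannot be empty')
    if value is None:
      raise ValueError(
          'Received value None for key %s. None values are currently not '
          'supported' % key)
    named_fields.append((key, type(value)))
    fields_to_values[key] = value
  return named_fields, fields_to_values


def build_row(*args, **kwargs):
  """Builds a NamedTuple instance as a rough analogue of a schema'd Row."""
  named_fields, values = fields_and_values(*args, **kwargs)
  row_type = typing.NamedTuple('ConfigRow', named_fields)
  return row_type(**values)


row = build_row('localhost', 9092, topic='events')
print(row)  # ConfigRow(ignore0='localhost', ignore1=9092, topic='events')
```

A module-level helper like this could be shared by `JavaClassLookupPayloadBuilder` and a schema-transform payload builder alike; whether the PR should restructure it that way is the open question above.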
##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -106,4 +109,13 @@ message BuilderMethod {
bytes payload = 3;
}
+message SchemaTransformPayload {
+ // Identifier of the SchemaTransform
+ string identifier = 1;
+ // The config of the SchemaTransform.
+ // Should be decodable via beam:coder:row:v1.
+ // The schema of the Row should be compatible with the schema of the
+ // SchemaTransform denoted by the identifier.
+ bytes config_row = 2;
Review Comment:
Should we also provide the schema that this row is encoded against, in case the
schema evolves (in a compatible way, of course) between when discovery
happens and when this request is made?
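One reading of this suggestion: if the payload carried the schema the row was encoded with, the receiving side could interpret the values against that writer schema and then match fields by name against its own current schema. Below is a minimal sketch under stated assumptions. `Field`, `decode_config`, the `'STRING'`/`'INT32'` type tags, and the rule that reader-only fields must be nullable are hypothetical illustrations, not Beam APIs. Rows are plain Python lists and dicts rather than `beam:coder:row:v1` bytes:

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Field:
  # Hypothetical stand-in for a schema field: name, type tag, nullability.
  name: str
  type_name: str
  nullable: bool = False


def decode_config(writer_schema: List[Field], reader_schema: List[Field],
                  writer_values: List[Any]) -> Dict[str, Any]:
  """Maps values encoded against writer_schema onto reader_schema by name.

  Assumed compatibility rule: shared fields keep their type, fields only the
  reader knows must be nullable (filled with None), and fields only the
  writer knows are rejected.
  """
  if len(writer_schema) != len(writer_values):
    raise ValueError('Value count does not match the writer schema')
  by_name = {f.name: (f, v) for f, v in zip(writer_schema, writer_values)}
  unknown = set(by_name) - {f.name for f in reader_schema}
  if unknown:
    raise ValueError('Fields unknown to the reader: %s' % sorted(unknown))
  result = {}
  for field in reader_schema:
    if field.name in by_name:
      writer_field, value = by_name[field.name]
      if writer_field.type_name != field.type_name:
        raise ValueError('Type of field %r changed' % field.name)
      result[field.name] = value
    elif field.nullable:
      result[field.name] = None
    else:
      raise ValueError('Missing non-nullable field %r' % field.name)
  return result


writer = [Field('topic', 'STRING'), Field('port', 'INT32')]
reader = [Field('port', 'INT32'), Field('topic', 'STRING'),
          Field('group_id', 'STRING', nullable=True)]
print(decode_config(writer, reader, ['events', 9092]))
# {'port': 9092, 'topic': 'events', 'group_id': None}
```

Without the writer schema, reading `['events', 9092]` positionally against the reader's field order would put `'events'` into `port`, which is the kind of mismatch that sending the schema alongside `config_row` would guard against.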
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),
+           convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[(
+          JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT %
+          next_field_id)] = value
+      next_field_id += 1
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[key] = value
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**fields_to_values)
+
+    logging.error('********* xyz123 kwargs: %r', kwargs)
Review Comment:
Extra logging.
##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -106,4 +109,13 @@ message BuilderMethod {
bytes payload = 3;
}
+message SchemaTransformPayload {
+ // Identifier of the SchemaTransform
+ string identifier = 1;
Review Comment:
Should this typically be a URN as well?
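For comparison, the `config_row` comment in this same message already names a coder by a colon-separated Beam URN, `beam:coder:row:v1`. Below is a minimal sketch of a shape check a receiver might apply if `identifier` were required to be a URN. The regular expression, the helper name `looks_like_beam_urn`, and the identifier `beam:schematransform:example:v1` are assumptions for illustration, not a convention the PR defines:

```python
import re

# Assumed shape: a "beam" prefix, one or more colon-separated segments, and a
# trailing version segment such as "v1". Illustrative only.
_URN_PATTERN = re.compile(r'^beam(:[A-Za-z0-9_.\-]+)+:v[0-9]+$')


def looks_like_beam_urn(identifier: str) -> bool:
  """Returns True if identifier matches the assumed beam:...:vN shape."""
  return _URN_PATTERN.match(identifier) is not None


print(looks_like_beam_urn('beam:coder:row:v1'))  # True
print(looks_like_beam_urn('beam:schematransform:example:v1'))  # True
print(looks_like_beam_urn('MyKafkaReadTransform'))  # False
```

A versioned URN would also give the identifier an explicit slot for the kind of compatible evolution discussed in the `config_row` comment.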
--
This is an automated message from the Apache Git Service.