chamikaramj commented on code in PR #26100:
URL: https://github.com/apache/beam/pull/26100#discussion_r1157573798
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):
Review Comment:
I think it should be possible to use SchemaTransforms without the full
config or the schema (i.e., using just the schema transform ID and a set of
kwargs). Can you adjust the change so that the additional validation is
optional?
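
To illustrate the request, here is a minimal sketch of a builder that works from just an ID and kwargs and validates only when field names are supplied. `SketchPayloadBuilder`, `expected_fields`, and `validated_kwargs` are hypothetical names for illustration, not part of `external.py`; the error message mirrors the one in this PR's diff.

```python
from typing import Any, Dict, Optional, Sequence


class SketchPayloadBuilder:
  """Hypothetical stand-in for SchemaTransformPayloadBuilder (not Beam code)."""
  def __init__(
      self,
      identifier: str,
      expected_fields: Optional[Sequence[str]] = None,
      **kwargs: Any):
    self._identifier = identifier
    # Assumption: field names come from an earlier discovery call, if any.
    self._expected_fields = expected_fields
    self._kwargs = kwargs

  def validated_kwargs(self) -> Dict[str, Any]:
    # Validate only when the caller supplied the discovered field names;
    # otherwise pass the kwargs through untouched.
    if self._expected_fields is not None:
      kwargs_fields = tuple(self._kwargs.keys())
      if tuple(self._expected_fields) != kwargs_fields:
        raise ValueError(
            "Parameters in kwargs: %s do not match the external "
            "SchemaTransform's configuration fields: %s" %
            (kwargs_fields, tuple(self._expected_fields)))
    return dict(self._kwargs)
```

With this shape, `SketchPayloadBuilder("some:id", a=1)` needs no schema at all, while passing `expected_fields` opts in to the stricter check.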
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):
+    self._schematransform_config = schematransform_config
+    self._strict_schema = strict_schema
     self._kwargs = kwargs
+  def _get_schema_proto_and_payload(self, **kwargs):
Review Comment:
Can we move the additional checks before this call and continue to use the
existing external._get_schema_proto_and_payload() method?
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):
+    self._schematransform_config = schematransform_config
+    self._strict_schema = strict_schema
     self._kwargs = kwargs
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    external_config_schema_fields = \
+        self._schematransform_config.configuration_schema._fields
+    kwargs_fields = tuple(self._kwargs.keys())
+
+    if self._strict_schema and external_config_schema_fields != kwargs_fields:
+      raise ValueError(
+          "Parameters in kwargs: %s do not match the external "
+          "SchemaTransform's configuration fields: %s" %
+          (kwargs_fields, external_config_schema_fields))
+
+    # The discover API allows us to obtain an ordered configuration schema
Review Comment:
Instead of the "strict_schema" option, I think we should add a
"rearrange_based_on_discovery" option. If the option is not set, we use
kwargs as is, without the overhead of the additional discovery RPC (this will
work for anything other than TypedSchemaTransformProvider). For
TypedSchemaTransformProvider, we would set "rearrange_based_on_discovery" to
true and rearrange kwargs based on a discovery call before the
"_get_schema_proto_and_payload" invocation. WDYT?
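
A rough sketch of that flow, under stated assumptions: `prepare_kwargs`, `rearrange_kwargs`, and the `discover` callable are hypothetical names, and `discover` merely stands in for the discovery RPC by returning the field names in schema order. The result would then be handed to the existing `_get_schema_proto_and_payload` unchanged.

```python
from collections import OrderedDict
from typing import Any, Callable, Mapping, Optional, Sequence


def rearrange_kwargs(
    kwargs: Mapping[str, Any],
    discovered_fields: Sequence[str]) -> "OrderedDict[str, Any]":
  """Reorders kwargs to follow the discovered field order."""
  unknown = [name for name in kwargs if name not in discovered_fields]
  if unknown:
    raise ValueError(
        "Parameters in kwargs: %s are not among the discovered "
        "configuration fields: %s" % (unknown, tuple(discovered_fields)))
  # Fields missing from kwargs are skipped rather than filled in.
  return OrderedDict(
      (name, kwargs[name]) for name in discovered_fields if name in kwargs)


def prepare_kwargs(
    kwargs: Mapping[str, Any],
    rearrange_based_on_discovery: bool = False,
    discover: Optional[Callable[[], Sequence[str]]] = None
) -> "OrderedDict[str, Any]":
  """Returns kwargs as is, or reordered after a single discovery call."""
  if not rearrange_based_on_discovery:
    # Default path: no discovery call, kwargs keep the caller's order.
    return OrderedDict(kwargs)
  if discover is None:
    raise ValueError(
        "rearrange_based_on_discovery requires a discover callable")
  return rearrange_kwargs(kwargs, discover())
```

The default path never invokes `discover`, so callers that do not need reordering pay no RPC cost.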