ahmedabu98 commented on code in PR #34755: URL: https://github.com/apache/beam/pull/34755#discussion_r2064711882
########## sdks/python/apache_beam/transforms/external.py: ########## @@ -420,16 +455,56 @@ def __init__( named_tuple_to_schema(config.configuration_schema), **_kwargs) + if self._managed_replacement: + # We have to do the replacement at the expansion instead of at + # construction + # since we don't have access to the PipelineOptions object at the + # construction. + underlying_transform_id = ( + self._managed_replacement.underlying_transform_identifier) + if not (underlying_transform_id in + MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING): + raise ValueError( + 'Could not find an expansion service jar for the managed ' + + 'transform ' + underlying_transform_id) + managed_expansion_service_jar = ( + MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING + )[underlying_transform_id] + self._managed_expansion_service = BeamJarExpansionService( + managed_expansion_service_jar) + managed_config = SchemaAwareExternalTransform.discover_config( + self._managed_expansion_service, + MANAGED_SCHEMA_TRANSFORM_IDENTIFIER) + + yaml_config = yaml.dump(kwargs) + self._managed_payload_builder = ( + ExplicitSchemaTransformPayloadBuilder( + MANAGED_SCHEMA_TRANSFORM_IDENTIFIER, + named_tuple_to_schema(managed_config.configuration_schema), + transform_identifier=underlying_transform_id, + config=yaml_config)) else: self._payload_builder = SchemaTransformPayloadBuilder( identifier, **_kwargs) def expand(self, pcolls): # Expand the transform using the expansion service. + payload_builder = self._payload_builder + expansion_service = self._expansion_service + + compatibility_version_match = True + if self._managed_replacement and compatibility_version_match: + compat_version_match = not is_compat_version_prior_to( Review Comment: can we use a clearer name than `compat_version_match`? 
It is not clear whether this name means the SDK version is before or after the specified `update_compatibility_version`. ########## sdks/python/apache_beam/transforms/managed.py: ########## @@ -183,10 +178,8 @@ def _resolve_expansion_service( return expansion_service default_target = None - for gradle_target, transforms in _EXPANSION_SERVICE_JAR_TARGETS.items(): - if transform_name.lower() in transforms: - default_target = gradle_target - break + if identifier in MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING: + default_target = MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING.get(identifier) Review Comment: Might as well simplify and throw an error if `identifier` is not in `MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING`. ########## sdks/python/apache_beam/transforms/external.py: ########## @@ -378,6 +399,10 @@ def _has_constructor(self): 'SchemaTransformsConfig', ['identifier', 'configuration_schema', 'inputs', 'outputs', 'description']) +ManagedReplacement = namedtuple( + 'ManagedReplacement', + 'underlying_transform_identifier update_compatibility_version') Review Comment: ```suggestion 'underlying_transform_identifier, update_compatibility_version') ``` ########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -48,6 +48,7 @@ import apache_beam.dataframe.io import apache_beam.io import apache_beam.transforms.util +from apache_beam import ManagedReplacement Review Comment: Curious how this is getting imported from the top-level `apache_beam` package. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org