robertwb commented on code in PR #24667: URL: https://github.com/apache/beam/pull/24667#discussion_r1049129527
########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -0,0 +1,398 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""This module defines Providers usable from yaml, which is a specification +for where to find and how to invoke services that vend implementations of +various PTransforms.""" + +import collections +import hashlib +import json +import os +import subprocess +import sys +import uuid +from typing import Iterable + +import apache_beam as beam +import apache_beam.io +import apache_beam.dataframe.io +import apache_beam.transforms.util +from apache_beam.internal import pickler +from apache_beam.portability.api import schema_pb2 +from apache_beam.transforms import external +from apache_beam.typehints import schemas +from apache_beam.typehints import trivial_inference +from apache_beam.utils import python_callable +from apache_beam.utils import subprocess_server +from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform + +import yaml +from yaml.loader import SafeLoader + + +class Provider: + """Maps transform types to concrete PTransform instances.""" + def available(self): + raise NotImplementedError(type(self)) + + def provided_transforms(self): + raise 
NotImplementedError(type(self)) + + def create_transform(self, typ, args): + raise NotImplementedError(type(self)) + + +class ExternalProvider(Provider): + """A Provider implemented via the cross language transform service.""" + def __init__(self, urns, service): + self._urns = urns + self._service = service + self._schema_transforms = None + + def provided_transforms(self): + return self._urns.keys() + + def create_transform(self, type, args): + if self._schema_transforms is None: + try: + self._schema_transforms = [ + config.identifier + for config in external.SchemaAwareExternalTransform.discover( + self._service) + ] + except Exception: + self._schema_transforms = [] + urn = self._urns[type] + if urn in self._schema_transforms: + return external.SchemaAwareExternalTransform(urn, self._service, **args) + else: + return type >> self.create_external_transform(urn, args) + + def create_external_transform(self, urn, args): + return external.ExternalTransform( + urn, + external.ImplicitSchemaPayloadBuilder(args).payload(), + self._service) + + @staticmethod + def provider_from_spec(spec): + urns = spec['transforms'] + type = spec['type'] + if type == 'jar': + return ExternalJavaProvider(urns, spec['jar']) + elif type == 'mavenJar': + return ExternalJavaProvider( + urns, + subprocess_server.JavaJarServer.path_to_maven_jar( + **{ + key: value + for (key, value) in spec.items() if key in [ + 'artifact_id', + 'group_id', + 'version', + 'repository', + 'classifier', + 'appendix' + ] + })) + elif type == 'beamJar': + return ExternalJavaProvider( + urns, + subprocess_server.JavaJarServer.path_to_beam_jar( + **{ + key: value + for (key, value) in spec.items() if key in + ['gradle_target', 'version', 'appendix', 'artifact_id'] + })) + elif type == 'pypi': + return ExternalPythonProvider(urns, spec['packages']) + elif type == 'docker': + raise NotImplementedError() + else: Review Comment: Yep. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
