ahmedabu98 commented on code in PR #29834:
URL: https://github.com/apache/beam/pull/29834#discussion_r1456345701


##########
sdks/python/gen_xlang_wrappers.py:
##########
@@ -0,0 +1,366 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import logging
+import os
+import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+
+import yaml
+from jinja2 import Environment
+from jinja2 import FileSystemLoader
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external_schematransform_provider import 
STANDARD_URN_PATTERN
+from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransform
+from apache_beam.transforms.external_schematransform_provider import 
ExternalSchemaTransformProvider
+from apache_beam.transforms.external_schematransform_provider import 
camel_case_to_snake_case
+from gen_protos import LICENSE_HEADER
+from gen_protos import PYTHON_SDK_ROOT
+from gen_protos import find_by_ext
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+
+
+def generate_transform_configs(input_services, output_file):
+  """
+  Generates a YAML file containing a list of transform configurations.
+
+  Takes an input YAML file containing a list of expansion service gradle
+  targets. Each service must provide a `destinations` field that specifies the
+  default package (relative path) that generated wrappers should be written
+  under. A default destination is specified for each SDK, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+
+
+  Each service may also specify modifications for particular transform.
+  Currently, one can modify the generated wrapper's name and destination file:
+
+    - By default, the transform's identifier is used to generate the wrapper
+      class name. This can be overriden by manually providing a name.
+    - By default, generated wrappers are written to files within the package
+      provided by the expansion service. This can be overridden by manually
+      providing a relative file path.
+
+  See the following example for what such modifications can look like::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      transforms:
+        'beam:schematransform:org.apache.beam:my_transform:v1':
+          name: 'MyCustomTransformName'
+          destinations:
+            python: 'apache_beam/io/gcp/my_custom_module'
+
+  For the above example, we would take the transform with identifier
+  `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+  a wrapper class name of `MyTransform` and write the generated code to
+  the module `apache_beam/io/my_transform_et.py`. With the modifications, we
+  instead write the wrapper to `apache_beam/io/gcp/my_custom_module_et.py` and
+  name the class `MyCustomTransformName`.
+
+  Note: we add the prefix `"_et.py"` to the module name so that we can find
+  these generated files later (e.g. to tell Git to ignore them, and to
+  delete them when needed)
+
+  To ignore a particular transform, simply list its identifier in the `ignore`
+  field, like so::
+
+    - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+      destinations:
+        python: 'apache_beam/io'
+      ignore:
+        - 'beam:schematransform:org.apache.beam:skip_me:v1':
+
+
+  We use :class:`ExternalSchemaTransformProvider` to discover external
+  transforms. Then, we extract the necessary details of each transform and
+  compile them into a new YAML file, which is later used to generate wrappers.
+  """
+  transform_list: List[Dict[str, Any]] = []
+
+  with open(input_services) as f:
+    services = yaml.safe_load(f)
+  for service in services:
+    target = service['gradle_target']
+
+    # validate expansion service destinations
+    if "destinations" not in service:
+      raise ValueError(
+          f"Expansion service with target [{target}] does not "
+          "specify any default destinations.")
+    service_destinations: Dict[str, str] = service['destinations']
+    for sdk in service_destinations.keys():
+      if sdk not in SUPPORTED_SDK_DESTINATIONS:
+        raise ValueError(
+            f"Service with target {target} specifies a "
+            f"destination for an invalid SDK: {sdk}. The "
+            f"supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+
+    # get transforms to skip, if any
+    ignore = service.get('ignore', [])
+
+    # use dynamic provider to discover and populate wrapper details
+    provider = ExternalSchemaTransformProvider(BeamJarExpansionService(target))
+    discovered: Dict[str, ExternalSchemaTransform] = provider.get_all()
+    for identifier, wrapper in discovered.items():
+      if identifier in ignore:
+        continue
+      # We infer the destination from the URN and service destination.
+      # For example, the Java IO expansion service defaults to Python
+      # package apache_beam/io. Kafka Write is a transform in this service
+      # with URN beam:schematransform:org.apache.beam:kafka_write:v1
+      # In this case, we infer the destination apache_beam/io/kafka_write
+      functionality_identifier = re.match(STANDARD_URN_PATTERN,
+                                          identifier).groups()[0]
+      destinations = {
+          sdk: f"{destination}/{functionality_identifier}"
+          for sdk,
+          destination in service_destinations.items()
+      }
+      name = wrapper.__name__
+
+      # apply any modifications
+      modified_transform = {}
+      if 'transforms' in service and identifier in service['transforms']:
+        modified_transform = service['transforms'][identifier]
+      if 'name' in modified_transform:
+        name = modified_transform['name']  # override the name
+      if 'destinations' in modified_transform:
+        for sdk, destination in modified_transform['destinations'].items():
+          if sdk not in SUPPORTED_SDK_DESTINATIONS:
+            raise ValueError(
+                f"Identifier {identifier} specifies a destination for "
+                f"an invalid SDK: [{sdk}]. The supported SDKs "
+                f"are {SUPPORTED_SDK_DESTINATIONS}")
+          destinations[sdk] = destination  # override the destination
+
+      # prepare information about parameters
+      fields = {}
+      for param in wrapper.configuration_schema.values():
+        tp = param.type
+        nullable = False
+        # if type is typing.Optional[...]
+        if (typing.get_origin(tp) is Union and

Review Comment:
   The purpose of the logic here is to take the wrapper's Python types and 
clean them up for documentation. 
   
   For example, we want to display `Dict[str, str]` instead of 
`typing.Optional[typing.Dict[str, str]]` (the user can see whether the param is 
optional by looking at the constructor definition).
   
   In the next iteration of this, we should figure out how to express these 
types in prototext (via runner api) then convert that prototext back to native 
types. That will most likely include work in our schema utils.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to