chamikaramj commented on code in PR #29561:
URL: https://github.com/apache/beam/pull/29561#discussion_r1417928153


##########
sdks/python/apache_beam/transforms/wrapper_provider.py:
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import typing
+from collections import namedtuple
+
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.external import SchemaTransformsConfig
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
+
+
+def snake_case_to_upper_camel_case(string):
+  """Convert snake_case to UpperCamelCase"""
+  components = string.split('_')
+  output = ''.join(n.capitalize() for n in components)
+  return output
+
+
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = ['_' + n.lower() if n.isupper() else n for n in string]
+  return ''.join(arr).lstrip('_')
+
+
+# Information regarding a Wrapper parameter.
+ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
+
+
+def get_config_with_descriptions(schematransform: SchemaTransformsConfig):
+  # Prepare a configuration schema that includes types and descriptions
+  schema = named_tuple_to_schema(schematransform.configuration_schema)
+  descriptions = schematransform.configuration_schema._field_descriptions
+  fields_with_descriptions = {}
+  for field in schema.fields:
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+        typing_from_runner_api(field.type),
+        descriptions[field.name],
+        field.name)
+
+  return fields_with_descriptions
+
+
+class Wrapper(PTransform):
+  """Template for a SchemaTransform Python wrappeer"""
+
+  # These attributes need to be set when a Wrapper type is created
+  default_expansion_service = None
+  identifier = None
+
+  def __init__(self, expansion_service=None, **kwargs):
+    self._kwargs = kwargs
+    self._expansion_service = \
+        expansion_service or self.default_expansion_service
+    self.schematransform: SchemaTransformsConfig = \

Review Comment:
   Can the `discover_config` call be invoked once during wrapper generation,
instead of every time a transform is initialized?



##########
sdks/python/apache_beam/transforms/wrapper_provider.py:
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import typing
+from collections import namedtuple
+
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.external import SchemaTransformsConfig
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
+
+
+def snake_case_to_upper_camel_case(string):
+  """Convert snake_case to UpperCamelCase"""
+  components = string.split('_')
+  output = ''.join(n.capitalize() for n in components)
+  return output
+
+
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = ['_' + n.lower() if n.isupper() else n for n in string]
+  return ''.join(arr).lstrip('_')
+
+
+# Information regarding a Wrapper parameter.
+ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
+
+
+def get_config_with_descriptions(schematransform: SchemaTransformsConfig):
+  # Prepare a configuration schema that includes types and descriptions
+  schema = named_tuple_to_schema(schematransform.configuration_schema)
+  descriptions = schematransform.configuration_schema._field_descriptions
+  fields_with_descriptions = {}
+  for field in schema.fields:
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+        typing_from_runner_api(field.type),
+        descriptions[field.name],
+        field.name)
+
+  return fields_with_descriptions
+
+
+class Wrapper(PTransform):
+  """Template for a SchemaTransform Python wrappeer"""
+
+  # These attributes need to be set when a Wrapper type is created
+  default_expansion_service = None
+  identifier = None
+
+  def __init__(self, expansion_service=None, **kwargs):
+    self._kwargs = kwargs
+    self._expansion_service = \
+        expansion_service or self.default_expansion_service
+    self.schematransform: SchemaTransformsConfig = \
+      SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.identifier)
+
+  def expand(self, input):
+    camel_case_kwargs = {
+        snake_case_to_lower_camel_case(k): v
+        for k, v in self._kwargs.items()
+    }
+
+    external_schematransform = SchemaAwareExternalTransform(
+        identifier=self.identifier,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        **camel_case_kwargs)
+
+    input_tags = self.schematransform.inputs
+    # TODO(ahmedabu98): how do we handle the case of multiple input pcolls?
+    if input_tags and len(input_tags) == 1:
+      return {input_tags[0]: input} | external_schematransform
+    else:
+      return input.pipeline | external_schematransform
+
+
+class WrapperProvider:
+  def __init__(self, expansion_services=None):
+    self.wrappers = {}
+    self.urn_to_wrapper_name = {}
+
+    if expansion_services is None:
+      expansion_services = []
+    if isinstance(expansion_services, set):
+      expansion_services = list(expansion_services)
+    if not isinstance(expansion_services, list):
+      expansion_services = [expansion_services]
+    self.expansion_services = expansion_services
+
+  def _create_wrappers(self):
+    # multiple services can overlap and include the same URNs. If this happens,
+    # we prioritize by the order of services in the list
+    identifiers = set()
+    for service in self.expansion_services:
+      target = service
+      if isinstance(service, BeamJarExpansionService):
+        target = service.gradle_target
+      try:
+        schematransform_configs = SchemaAwareExternalTransform.discover(service)
+      except Exception as e:
+        logging.exception(
+            "Encountered an error while discovering expansion service %s:\n%s",
+            target,
+            e)
+        continue
+      skipped_urns = []
+      for config in schematransform_configs:
+        if config.identifier not in identifiers:
+          identifiers.add(config.identifier)
+          identifier_components = config.identifier.split(':')
+          # We expect URNs like
+          # `beam:schematransform:org.apache.beam:my_transform:v1`

Review Comment:
   Will it be possible to override this behavior via configs? I don't think we
should strictly rely on this URN format.



##########
sdks/python/apache_beam/transforms/wrapper_provider.py:
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import typing
+from collections import namedtuple
+
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.external import SchemaTransformsConfig
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
+
+
+def snake_case_to_upper_camel_case(string):
+  """Convert snake_case to UpperCamelCase"""
+  components = string.split('_')
+  output = ''.join(n.capitalize() for n in components)
+  return output
+
+
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = ['_' + n.lower() if n.isupper() else n for n in string]
+  return ''.join(arr).lstrip('_')
+
+
+# Information regarding a Wrapper parameter.
+ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
+
+
+def get_config_with_descriptions(schematransform: SchemaTransformsConfig):
+  # Prepare a configuration schema that includes types and descriptions
+  schema = named_tuple_to_schema(schematransform.configuration_schema)
+  descriptions = schematransform.configuration_schema._field_descriptions
+  fields_with_descriptions = {}
+  for field in schema.fields:
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+        typing_from_runner_api(field.type),
+        descriptions[field.name],
+        field.name)
+
+  return fields_with_descriptions
+
+
+class Wrapper(PTransform):
+  """Template for a SchemaTransform Python wrappeer"""
+
+  # These attributes need to be set when a Wrapper type is created
+  default_expansion_service = None
+  identifier = None
+
+  def __init__(self, expansion_service=None, **kwargs):
+    self._kwargs = kwargs
+    self._expansion_service = \
+        expansion_service or self.default_expansion_service
+    self.schematransform: SchemaTransformsConfig = \
+      SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.identifier)
+
+  def expand(self, input):
+    camel_case_kwargs = {
+        snake_case_to_lower_camel_case(k): v
+        for k, v in self._kwargs.items()
+    }
+
+    external_schematransform = SchemaAwareExternalTransform(
+        identifier=self.identifier,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        **camel_case_kwargs)
+
+    input_tags = self.schematransform.inputs
+    # TODO(ahmedabu98): how do we handle the case of multiple input pcolls?
+    if input_tags and len(input_tags) == 1:
+      return {input_tags[0]: input} | external_schematransform
+    else:
+      return input.pipeline | external_schematransform
+
+
+class WrapperProvider:

Review Comment:
   Please add documentation.



##########
sdks/java/core/expansion-service/build.gradle:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software

Review Comment:
   This creates a circular dependency, since the sdks/java/expansion-service library
depends on sdks/java/core. Was this created just to use GenerateSequence?
   
   Transforms in core should already be available in all the expansion services,
so you could just use one of them (for example, sdks/java/io/expansion-service).
   
   



##########
sdks/python/apache_beam/transforms/wrapper_provider_test.py:
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import os
+import unittest
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.wrapper_provider import WrapperProvider
+
+
+@pytest.mark.uses_core_java_expansion_service
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_PORT'),
+    "EXPANSION_PORT environment var is not provided.")
+class WrapperProviderTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+  def test_generate_sequence_config_schema(self):
+    wrapper_provider = WrapperProvider(
+        BeamJarExpansionService(":sdks:java:core:expansion-service:shadowJar"))
+
+    self.assertTrue('GenerateSequence' in wrapper_provider.get_available())
+    generate_sequence = wrapper_provider.get('GenerateSequence')
+
+    config_schema = generate_sequence.configuration_schema
+    for param in ['start', 'end', 'rate']:
+      self.assertTrue(param in config_schema)
+
+  def test_run_generate_sequence(self):
+    wrapper_provider = WrapperProvider(
+        BeamJarExpansionService(":sdks:java:core:expansion-service:shadowJar"))
+
+    self.assertTrue('GenerateSequence' in wrapper_provider.get_available())
+
+    generate_sequence = wrapper_provider.get('GenerateSequence')

Review Comment:
   Is it possible to generate generate_sequence.py as an actual Python module
that can be inspected by IDEs, etc.?



##########
sdks/python/apache_beam/transforms/wrapper_provider.py:
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import typing
+from collections import namedtuple
+
+from apache_beam.transforms import PTransform
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.external import SchemaTransformsConfig
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import typing_from_runner_api
+
+
+def snake_case_to_upper_camel_case(string):
+  """Convert snake_case to UpperCamelCase"""
+  components = string.split('_')
+  output = ''.join(n.capitalize() for n in components)
+  return output
+
+
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = ['_' + n.lower() if n.isupper() else n for n in string]
+  return ''.join(arr).lstrip('_')
+
+
+# Information regarding a Wrapper parameter.
+ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
+
+
+def get_config_with_descriptions(schematransform: SchemaTransformsConfig):
+  # Prepare a configuration schema that includes types and descriptions
+  schema = named_tuple_to_schema(schematransform.configuration_schema)
+  descriptions = schematransform.configuration_schema._field_descriptions
+  fields_with_descriptions = {}
+  for field in schema.fields:
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+        typing_from_runner_api(field.type),
+        descriptions[field.name],
+        field.name)
+
+  return fields_with_descriptions
+
+
+class Wrapper(PTransform):
+  """Template for a SchemaTransform Python wrappeer"""

Review Comment:
   s/wrappeer/wrapper



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to