This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch release-2.54.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.54.0 by this push:
new 22ef257784f rename ExternalSchemaTransform to ExternalTransform
(#30114)
22ef257784f is described below
commit 22ef257784f1259f1c38448f4eb502f9c9233f04
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Jan 25 12:07:00 2024 -0500
rename ExternalSchemaTransform to ExternalTransform (#30114)
---
..._provider.py => external_transform_provider.py} | 34 +++++++++++-----------
...test.py => external_transform_provider_test.py} | 18 ++++++------
2 files changed, 26 insertions(+), 26 deletions(-)
diff --git
a/sdks/python/apache_beam/transforms/external_schematransform_provider.py
b/sdks/python/apache_beam/transforms/external_transform_provider.py
similarity index 90%
rename from
sdks/python/apache_beam/transforms/external_schematransform_provider.py
rename to sdks/python/apache_beam/transforms/external_transform_provider.py
index fd650087893..26cc31471e6 100644
--- a/sdks/python/apache_beam/transforms/external_schematransform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -29,7 +29,7 @@ from apache_beam.transforms.external import
SchemaTransformsConfig
from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.typehints.schemas import typing_from_runner_api
-__all__ = ['ExternalSchemaTransform', 'ExternalSchemaTransformProvider']
+__all__ = ['ExternalTransform', 'ExternalTransformProvider']
def snake_case_to_upper_camel_case(string):
@@ -84,7 +84,7 @@ def get_config_with_descriptions(
return fields_with_descriptions
-class ExternalSchemaTransform(PTransform):
+class ExternalTransform(PTransform):
"""Template for a wrapper class of an external SchemaTransform
This is a superclass for dynamically generated SchemaTransform wrappers and
@@ -93,7 +93,7 @@ class ExternalSchemaTransform(PTransform):
Experimental; no backwards compatibility guarantees."""
# These attributes need to be set when
- # creating an ExternalSchemaTransform type
+ # creating an ExternalTransform type
default_expansion_service = None
description: str = ""
identifier: str = ""
@@ -138,21 +138,21 @@ def infer_name_from_identifier(identifier: str, pattern:
str):
return ''.join(components)
-class ExternalSchemaTransformProvider:
+class ExternalTransformProvider:
"""Dynamically discovers Schema-aware external transforms from a given list
of expansion services and provides them as ready PTransforms.
- A :class:`ExternalSchemaTransform` subclass is generated for each external
+ A :class:`ExternalTransform` subclass is generated for each external
transform, and is named based on what can be inferred from the URN
(see :param urn_pattern).
- These classes are generated when :class:`ExternalSchemaTransformProvider` is
+ These classes are generated when :class:`ExternalTransformProvider` is
initialized. We need to give it one or more expansion service addresses that
are already up and running:
- >>> provider = ExternalSchemaTransformProvider(["localhost:12345",
+ >>> provider = ExternalTransformProvider(["localhost:12345",
... "localhost:12121"])
We can also give it the gradle target of a standard Beam expansion service:
- >>> provider = ExternalSchemaTransform(BeamJarExpansionService(
+ >>> provider = ExternalTransform(BeamJarExpansionService(
... "sdks:java:io:google-cloud-platform:expansion-service:shadowJar"))
Let's take a look at the output of :func:`get_available()` to know the
available transforms in the expansion service(s) we provided:
@@ -162,7 +162,7 @@ class ExternalSchemaTransformProvider:
...]
Then retrieve a transform by :func:`get()`, :func:`get_urn()`, or by directly
- accessing it as an attribute of :class:`ExternalSchemaTransformProvider`.
+ accessing it as an attribute of :class:`ExternalTransformProvider`.
All of the following commands do the same thing:
>>> provider.get('BigqueryStorageRead')
>>> provider.get_urn(
@@ -194,7 +194,7 @@ class ExternalSchemaTransformProvider:
Experimental; no backwards compatibility guarantees.
"""
def __init__(self, expansion_services, urn_pattern=STANDARD_URN_PATTERN):
- f"""Initialize an ExternalSchemaTransformProvider
+ f"""Initialize an ExternalTransformProvider
:param expansion_services:
A list of expansion services to discover transforms from.
@@ -207,7 +207,7 @@ class ExternalSchemaTransformProvider:
By default, the following pattern is used: [{STANDARD_URN_PATTERN}]
"""
self._urn_pattern = urn_pattern
- self._transforms: Dict[str, type(ExternalSchemaTransform)] = {}
+ self._transforms: Dict[str, type(ExternalTransform)] = {}
self._name_to_urn: Dict[str, str] = {}
if isinstance(expansion_services, set):
@@ -245,7 +245,7 @@ class ExternalSchemaTransformProvider:
continue
self._transforms[identifier] = type(
- name, (ExternalSchemaTransform, ),
+ name, (ExternalTransform, ),
dict(
identifier=identifier,
default_expansion_service=service,
@@ -265,13 +265,13 @@ class ExternalSchemaTransformProvider:
setattr(self, transform.__name__, transform)
def get_available(self) -> List[Tuple[str, str]]:
- """Get a list of available ExternalSchemaTransform names and identifiers"""
+ """Get a list of available ExternalTransform names and identifiers"""
return list(self._name_to_urn.items())
- def get(self, name) -> ExternalSchemaTransform:
- """Get an ExternalSchemaTransform by its inferred class name"""
+ def get(self, name) -> ExternalTransform:
+ """Get an ExternalTransform by its inferred class name"""
return self._transforms[self._name_to_urn[name]]
- def get_urn(self, identifier) -> ExternalSchemaTransform:
- """Get an ExternalSchemaTransform by its SchemaTransform identifier"""
+ def get_urn(self, identifier) -> ExternalTransform:
+ """Get an ExternalTransform by its SchemaTransform identifier"""
return self._transforms[identifier]
diff --git
a/sdks/python/apache_beam/transforms/external_schematransform_provider_test.py
b/sdks/python/apache_beam/transforms/external_transform_provider_test.py
similarity index 87%
rename from
sdks/python/apache_beam/transforms/external_schematransform_provider_test.py
rename to sdks/python/apache_beam/transforms/external_transform_provider_test.py
index bf951e671c2..36fe9b5c4bd 100644
---
a/sdks/python/apache_beam/transforms/external_schematransform_provider_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_test.py
@@ -25,12 +25,12 @@ from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.external import BeamJarExpansionService
-from apache_beam.transforms.external_schematransform_provider import
STANDARD_URN_PATTERN
-from apache_beam.transforms.external_schematransform_provider import
ExternalSchemaTransformProvider
-from apache_beam.transforms.external_schematransform_provider import
camel_case_to_snake_case
-from apache_beam.transforms.external_schematransform_provider import
infer_name_from_identifier
-from apache_beam.transforms.external_schematransform_provider import
snake_case_to_lower_camel_case
-from apache_beam.transforms.external_schematransform_provider import
snake_case_to_upper_camel_case
+from apache_beam.transforms.external_transform_provider import
STANDARD_URN_PATTERN
+from apache_beam.transforms.external_transform_provider import
ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import
camel_case_to_snake_case
+from apache_beam.transforms.external_transform_provider import
infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import
snake_case_to_lower_camel_case
+from apache_beam.transforms.external_transform_provider import
snake_case_to_upper_camel_case
class NameUtilsTest(unittest.TestCase):
@@ -101,12 +101,12 @@ class NameUtilsTest(unittest.TestCase):
@unittest.skipUnless(
os.environ.get('EXPANSION_PORT'),
"EXPANSION_PORT environment var is not provided.")
-class ExternalSchemaTransformProviderTest(unittest.TestCase):
+class ExternalTransformProviderTest(unittest.TestCase):
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
def test_generate_sequence_config_schema_and_description(self):
- provider = ExternalSchemaTransformProvider(
+ provider = ExternalTransformProvider(
BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
self.assertTrue((
@@ -125,7 +125,7 @@ class
ExternalSchemaTransformProviderTest(unittest.TestCase):
self.assertTrue(description_substring in GenerateSequence.description)
def test_run_generate_sequence(self):
- provider = ExternalSchemaTransformProvider(
+ provider = ExternalTransformProvider(
BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
with beam.Pipeline() as p: