This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 11f9bce485c Generate external transform wrappers using a script (#29834)
11f9bce485c is described below
commit 11f9bce485c4f6fe466ff4bf5073d2414e43678c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Feb 22 16:13:55 2024 -0500
Generate external transform wrappers using a script (#29834)
---
...=> beam_PostCommit_Python_Examples_Direct.json} | 0
.../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 1 +
...> beam_PostCommit_Python_Xlang_Gcp_Direct.json} | 0
... beam_PostCommit_Python_Xlang_IO_Dataflow.json} | 0
.github/workflows/README.md | 1 +
.../beam_PreCommit_Xlang_Generated_Transforms.yml | 114 ++++++
.gitignore | 1 +
CHANGES.md | 1 +
build.gradle.kts | 5 +
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 100 ++---
scripts/ci/release/test/resources/mass_comment.txt | 2 +
.../GenerateSequenceSchemaTransformProvider.java | 2 +-
sdks/python/MANIFEST.in | 1 +
sdks/python/apache_beam/io/__init__.py | 1 +
.../io/external/xlang_bigqueryio_it_test.py | 23 +-
.../apache_beam/io/gcp/bigtableio_it_test.py | 22 +-
.../transforms/external_transform_provider.py | 8 +-
.../external_transform_provider_it_test.py | 413 +++++++++++++++++++
.../transforms/external_transform_provider_test.py | 140 -------
.../transforms/xlang/__init__.py} | 15 +-
sdks/python/build.gradle | 19 +
sdks/python/gen_xlang_wrappers.py | 447 +++++++++++++++++++++
sdks/python/pyproject.toml | 6 +
sdks/python/pytest.ini | 1 +
sdks/python/python_xlang_wrapper.template | 36 ++
sdks/python/setup.py | 49 ++-
sdks/python/test-suites/dataflow/common.gradle | 5 +-
sdks/python/test-suites/direct/build.gradle | 7 +
sdks/python/test-suites/direct/common.gradle | 8 +-
sdks/python/test-suites/xlang/build.gradle | 56 +--
sdks/python/tox.ini | 4 +-
sdks/standard_expansion_services.yaml | 77 ++++
sdks/standard_external_transforms.yaml | 52 +++
33 files changed, 1330 insertions(+), 287 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Examples_Direct.json
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
index e69de29bb2d..8b137891791 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
@@ -0,0 +1 @@
+
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
similarity index 100%
copy from .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
copy to .github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f882553ef9b..42eaaaafdac 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -271,6 +271,7 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour
| [ PreCommit Website ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml) | N/A |`Run Website PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website.yml?query=event%3Aschedule) |
| [ PreCommit Website Stage GCS ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml) | N/A |`Run Website_Stage_GCS PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Website_Stage_GCS.yml?query=event%3Aschedule) |
| [ PreCommit Whitespace ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml) | N/A |`Run Whitespace PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Whitespace.yml?query=event%3Aschedule) |
+| [ PreCommit Xlang Generated Transforms ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml) | N/A |`Run Xlang_Generated_Transforms PreCommit`| [](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml?query=event%3Asch [...]
### PostCommit Jobs
diff --git a/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml
new file mode 100644
index 00000000000..f8d64eb5d4a
--- /dev/null
+++ b/.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PreCommit Xlang Generated Transforms
+
+on:
+ push:
+ tags: ['v*']
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - 'release/**'
+ - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
+ pull_request_target:
+ branches: ['master', 'release-*']
+ paths:
+ - 'model/**'
+ - 'sdks/python/**'
+ - 'sdks/java/expansion-service/**'
+ - 'sdks/java/core/**'
+ - 'sdks/java/io/**'
+ - 'sdks/java/extensions/sql/**'
+ - 'release/**'
+ - 'release/trigger_all_tests.json'
+ - '.github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml'
+ issue_comment:
+ types: [created]
+ schedule:
+ - cron: '30 2/6 * * *'
+ workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+ actions: write
+ pull-requests: read
+ checks: read
+ contents: read
+ deployments: read
+ id-token: none
+ issues: read
+ discussions: read
+ packages: read
+ pages: read
+ repository-projects: read
+ security-events: read
+ statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+ cancel-in-progress: true
+
+env:
+ GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+ GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+ GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+ beam_PreCommit_Xlang_Generated_Transforms:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }} ${{ matrix.python_version }})
+ timeout-minutes: 120
+ runs-on: ['self-hosted', ubuntu-20.04, main]
+ strategy:
+ fail-fast: false
+ matrix:
+ job_name: ['beam_PreCommit_Xlang_Generated_Transforms']
+ job_phrase: ['Run Xlang_Generated_Transforms PreCommit']
+ python_version: ['3.8']
+ if: |
+ github.event_name == 'push' ||
+ github.event_name == 'workflow_dispatch' ||
+ github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
startsWith(github.event.comment.body, 'Run Xlang_Generated_Transforms PreCommit')
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup repository
+ uses: ./.github/actions/setup-action
+ with:
+ comment_phrase: ${{ matrix.job_phrase }}
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}
+ - name: Setup environment
+ uses: ./.github/actions/setup-environment-action
+ with:
+ java-version: 8
+ python-version: ${{ matrix.python_version }}
+ - name: Set PY_VER_CLEAN
+ id: set_py_ver_clean
+ run: |
+ PY_VER=${{ matrix.python_version }}
+ PY_VER_CLEAN=${PY_VER//.}
+ echo "py_ver_clean=$PY_VER_CLEAN" >> $GITHUB_OUTPUT
+ - name: run Cross-Language Wrapper Validation script
+ uses: ./.github/actions/gradle-command-self-hosted-action
+ with:
gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit
\ No newline at end of file
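The workflow's "Set PY_VER_CLEAN" step above relies on Bash parameter expansion to strip the dots out of the Python version before later steps use it. A minimal standalone sketch of that substitution (the variable names mirror the workflow step; nothing else here is Beam-specific):

```shell
# Mirror of the workflow's "Set PY_VER_CLEAN" step: delete every '.' from the version.
PY_VER="3.8"
PY_VER_CLEAN=${PY_VER//.}   # ${var//pattern} with an empty replacement removes all matches
echo "$PY_VER_CLEAN"        # prints: 38
```

In the workflow, the result is written to $GITHUB_OUTPUT so subsequent steps can build py38-style suffixes from it.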
diff --git a/.gitignore b/.gitignore
index 3f7d7af44a8..c76441078f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,6 +52,7 @@ sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
+sdks/python/apache_beam/transforms/xlang/*
sdks/python/apache_beam/portability/api/*
sdks/python/apache_beam/yaml/docs/*
sdks/python/nosetests*.xml
diff --git a/CHANGES.md b/CHANGES.md
index 249897b1697..2e23d72e664 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+* The Python SDK will now include automatically generated wrappers for external Java transforms! ([#29834](https://github.com/apache/beam/pull/29834))
## I/Os
diff --git a/build.gradle.kts b/build.gradle.kts
index 67653e59c0d..788d9a48dc0 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -647,6 +647,11 @@ tasks.register("checkSetup") {
dependsOn(":examples:java:wordCount")
}
+// Generates external transform config
+project.tasks.register("generateExternalTransformsConfig") {
+ dependsOn(":sdks:python:generateExternalTransformsConfig")
+}
+
// Configure the release plugin to do only local work; the release manager determines what, if
// anything, to push. On failure, the release manager can reset the branch without pushing.
release {
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5b73940b99d..6434746fd3a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -321,19 +321,17 @@ class BeamModulePlugin implements Plugin<Project> {
// A class defining the common properties in a given suite of cross-language tests
// Properties are shared across runners and are used when creating a CrossLanguageUsingJavaExpansionConfiguration object
- static class CrossLanguageTaskCommon {
+ static class CrossLanguageTask {
// Used as the task name for cross-language
String name
- // The expansion service's project path (required)
- String expansionProjectPath
+ // List of project paths for required expansion services
+ List<String> expansionProjectPaths
// Collect Python pipeline tests with this marker
String collectMarker
- // Job server startup task.
- TaskProvider startJobServer
- // Job server cleanup task.
- TaskProvider cleanupJobServer
- // any additional environment variables specific to the suite of tests
+ // Additional environment variables to set before running tests
Map<String,String> additionalEnvs
+ // Additional Python dependencies to install before running tests
+ List<String> additionalDeps
}
// A class defining the configuration for CrossLanguageUsingJavaExpansion.
@@ -349,18 +347,16 @@ class BeamModulePlugin implements Plugin<Project> {
]
// Additional pytest options
List<String> pytestOptions = []
- // Job server startup task.
- TaskProvider startJobServer
- // Job server cleanup task.
- TaskProvider cleanupJobServer
// Number of parallel test runs.
Integer numParallelTests = 1
- // Project path for the expansion service to start up
- String expansionProjectPath
+ // List of project paths for required expansion services
+ List<String> expansionProjectPaths
// Collect Python pipeline tests with this marker
String collectMarker
// any additional environment variables to be exported
Map<String,String> additionalEnvs
+ // Additional Python dependencies to install before running tests
+ List<String> additionalDeps
}
// A class defining the configuration for CrossLanguageValidatesRunner.
@@ -2576,7 +2572,7 @@ class BeamModulePlugin implements Plugin<Project> {
/**
***********************************************************************************************/
// Method to create the createCrossLanguageUsingJavaExpansionTask.
// The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
-  // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+  // This method creates a task that runs Python SDK test-suites that use external Java transforms
project.ext.createCrossLanguageUsingJavaExpansionTask = {
// This task won't work if the python build file doesn't exist.
if (!project.project(":sdks:python").buildFile.exists()) {
@@ -2586,49 +2582,29 @@ class BeamModulePlugin implements Plugin<Project> {
def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
project.evaluationDependsOn(":sdks:python")
- project.evaluationDependsOn(config.expansionProjectPath)
+ for (path in config.expansionProjectPaths) {
+ project.evaluationDependsOn(path)
+ }
project.evaluationDependsOn(":sdks:java:extensions:python")
- // Setting up args to launch the expansion service
def pythonDir = project.project(":sdks:python").projectDir
- def javaExpansionPort = -1 // will be populated in setupTask
- def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
- def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
- def expansionServiceOpts = [
- "group_id": project.name,
- "java_expansion_service_jar": expansionJar,
- "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
- ]
def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") ||
config.pythonPipelineOptions.contains("--runner=DataflowRunner")
def javaContainerSuffix = getSupportedJavaVersion()
- // 1. Builds the chosen expansion service jar and launches it
- def setupTask = project.tasks.register(config.name+"Setup") {
- dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
- dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
- dependsOn 'installGcpTest'
+ // Sets up, collects, and runs Python pipeline tests
+ project.tasks.register(config.name+"PythonUsingJava") {
+ group = "Verification"
+ description = "Runs Python SDK pipeline tests that use a Java expansion service"
+ // Each expansion service we use needs to be built before running
these tests
+ // The built jars will be started up automatically using the BeamJarExpansionService utility
+ for (path in config.expansionProjectPaths) {
+ dependsOn project.project(path).shadowJar.getPath()
+ }
+ dependsOn ":sdks:java:container:$javaContainerSuffix:docker"
+ dependsOn "installGcpTest"
if (usesDataflowRunner) {
dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob"
}
- doLast {
- project.exec {
- // Prepare a port to use for the expansion service
- javaExpansionPort = getRandomPort()
- expansionServiceOpts.put("java_port", javaExpansionPort)
- // setup test env
- def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
- executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
- }
- }
- }
-
- // 2. Sets up, collects, and runs Python pipeline tests
- def pythonTask = project.tasks.register(config.name+"PythonUsingJava") {
- group = "Verification"
- description = "Runs Python SDK pipeline tests that use a Java expansion service"
- dependsOn setupTask
- dependsOn config.startJobServer
doLast {
def beamPythonTestPipelineOptions = [
"pipeline_opts": config.pythonPipelineOptions + (usesDataflowRunner ? [
@@ -2641,29 +2617,19 @@ class BeamModulePlugin implements Plugin<Project> {
def cmdArgs = project.project(':sdks:python').mapToArgString(beamPythonTestPipelineOptions)
project.exec {
- environment "EXPANSION_JAR", expansionJar
- environment "EXPANSION_PORT", javaExpansionPort
- for (envs in config.additionalEnvs){
- environment envs.getKey(), envs.getValue()
+ // environment variable to indicate that jars have been built
+ environment "EXPANSION_JARS", config.expansionProjectPaths
+ String additionalDependencyCmd = ""
+ if (config.additionalDeps != null && !config.additionalDeps.isEmpty()){
+ additionalDependencyCmd = "&& pip install ${config.additionalDeps.join(' ')} "
}
executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
+ args '-c', ". ${project.ext.envdir}/bin/activate " +
+ additionalDependencyCmd +
+ "&& cd $pythonDir && ./scripts/run_integration_test.sh $cmdArgs"
}
}
}
-
- // 3. Shuts down the expansion service
- def cleanupTask = project.tasks.register(config.name+'Cleanup', Exec) {
- // teardown test env
- executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name}"
- }
-
- setupTask.configure {finalizedBy cleanupTask}
- config.startJobServer.configure {finalizedBy config.cleanupJobServer}
-
- cleanupTask.configure{mustRunAfter pythonTask}
- config.cleanupJobServer.configure{mustRunAfter pythonTask}
}
/**
***********************************************************************************************/
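The rewritten task above folds everything into one `sh -c` string: activate the virtualenv, optionally `pip install` any additionalDeps, then run the integration-test script. A rough Python sketch of that string assembly, for illustration only (the real logic is the Groovy above; the paths and dependency name below are made up):

```python
def build_test_command(envdir, python_dir, cmd_args, additional_deps=None):
    """Simplified mirror of the Gradle task's shell-command assembly."""
    dep_cmd = ""
    if additional_deps:
        # Matches the task: extra pip installs happen between activation and the tests
        dep_cmd = "&& pip install %s " % " ".join(additional_deps)
    return (". %s/bin/activate " % envdir + dep_cmd +
            "&& cd %s && ./scripts/run_integration_test.sh %s" % (python_dir, cmd_args))

cmd = build_test_command(
    "/tmp/env", "/tmp/beam/sdks/python", "--suite=xlang", ["some-extra-dep"])
print(cmd)
```

With no additional_deps the pip fragment disappears entirely, which is why the Groovy builds it as an optional string rather than a fixed argument.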
diff --git a/scripts/ci/release/test/resources/mass_comment.txt b/scripts/ci/release/test/resources/mass_comment.txt
index 93468b0c961..1f6f340eb0b 100644
--- a/scripts/ci/release/test/resources/mass_comment.txt
+++ b/scripts/ci/release/test/resources/mass_comment.txt
@@ -79,6 +79,7 @@ Run PythonDocs PreCommit
Run PythonFormatter PreCommit
Run PythonLint PreCommit
Run Python_PVR_Flink PreCommit
+Run Python_Xlang_Gcp_Direct PostCommit
Run RAT PreCommit
Run SQL PostCommit
Run SQL PreCommit
@@ -94,6 +95,7 @@ Run Twister2 ValidatesRunner
Run Typescript PreCommit
Run ULR Loopback ValidatesRunner
Run Whitespace PreCommit
+Run Xlang_Generated_Transforms PreCommit
Run XVR_Direct PostCommit
Run XVR_Flink PostCommit
Run XVR_JavaUsingPython_Dataflow PostCommit
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
index 4b693f883fb..d9dfc2a90bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/providers/GenerateSequenceSchemaTransformProvider.java
@@ -66,7 +66,7 @@ public class GenerateSequenceSchemaTransformProvider
public String description() {
return String.format(
"Outputs a PCollection of Beam Rows, each containing a single INT64 "
- + "number called \"value\". The count is produced from the given \"start\""
+ + "number called \"value\". The count is produced from the given \"start\" "
+ "value and either up to the given \"end\" or until 2^63 - 1.%n"
+ "To produce an unbounded PCollection, simply do not specify an \"end\" value. "
+ "Unbounded sequences can specify a \"rate\" for output elements.%n"
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index 60b0989d9af..d40273a9340 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -16,6 +16,7 @@
#
include gen_protos.py
+include gen_xlang_wrappers.py
include README.md
include NOTICE
include LICENSE
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 4945da97d90..83d45d81a5a 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -36,6 +36,7 @@ try:
from apache_beam.io.gcp.bigquery import *
from apache_beam.io.gcp.pubsub import *
from apache_beam.io.gcp import gcsio
+ from apache_beam.transforms.xlang.io import *
except ImportError:
pass
# pylint: enable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index c1e9754526e..cfbb411b4e5 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -53,6 +53,10 @@ _LOGGER = logging.getLogger(__name__)
@pytest.mark.uses_gcp_java_expansion_service
+@unittest.skipUnless(
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
class BigQueryXlangStorageWriteIT(unittest.TestCase):
BIGQUERY_DATASET = 'python_xlang_storage_write'
@@ -112,10 +116,6 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
_LOGGER.info(
"Created dataset %s in project %s", self.dataset_id, self.project)
- self.assertTrue(
- os.environ.get('EXPANSION_PORT'), "Expansion service port not found!")
- self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
-
def tearDown(self):
try:
_LOGGER.info(
@@ -162,8 +162,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
table=table_id,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
schema=schema,
- use_at_least_once=use_at_least_once,
- expansion_service=self.expansion_service))
+ use_at_least_once=use_at_least_once))
hamcrest_assert(p, bq_matcher)
def test_all_types(self):
@@ -243,8 +242,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
_ = (
p
| beam.Create(row_elements)
- | StorageWriteToBigQuery(
- table=table_id, expansion_service=self.expansion_service))
+ | StorageWriteToBigQuery(table=table_id))
hamcrest_assert(p, bq_matcher)
def test_write_to_dynamic_destinations(self):
@@ -268,8 +266,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
table=lambda record: spec_with_project + str(record['int']),
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
schema=self.ALL_TYPES_SCHEMA,
- use_at_least_once=False,
- expansion_service=self.expansion_service))
+ use_at_least_once=False))
hamcrest_assert(p, all_of(*bq_matchers))
def test_write_to_dynamic_destinations_with_beam_rows(self):
@@ -303,8 +300,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
| beam.io.WriteToBigQuery(
table=lambda record: spec_with_project + str(record.my_int),
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
- use_at_least_once=False,
- expansion_service=self.expansion_service))
+ use_at_least_once=False))
hamcrest_assert(p, all_of(*bq_matchers))
def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
@@ -335,8 +331,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
triggering_frequency=1,
with_auto_sharding=auto_sharding,
num_storage_api_streams=num_streams,
- use_at_least_once=use_at_least_once,
- expansion_service=self.expansion_service))
+ use_at_least_once=use_at_least_once))
hamcrest_assert(p, bq_matcher)
def skip_if_not_dataflow_runner(self) -> bool:
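The recurring edit in these test files replaces the per-test EXPANSION_PORT assertion with a class-level skipUnless gate on EXPANSION_JARS, so an entire suite is skipped when the expansion jars were never built. A self-contained sketch of that gating pattern (the env var name comes from the diff; the test class itself is invented):

```python
import io
import os
import unittest

os.environ.pop('EXPANSION_JARS', None)  # ensure the gate is closed for this demo

@unittest.skipUnless(
    os.environ.get('EXPANSION_JARS'),
    "EXPANSION_JARS environment var is not provided, "
    "indicating that jars have not been built")
class ExampleXlangIT(unittest.TestCase):  # hypothetical suite, not from the diff
  def test_uses_java_expansion(self):
    self.assertTrue(True)

suite = unittest.TestLoader().loadTestsFromTestCase(ExampleXlangIT)
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
# With the variable unset, the test is reported as skipped rather than run and failed.
```

This pairs with the Gradle change that exports EXPANSION_JARS before invoking pytest: the variable doubles as the signal that the required jars exist.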
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 867dca9a5e7..13909cded1f 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -58,10 +58,11 @@ def instance_prefix(instance):
@pytest.mark.uses_gcp_java_expansion_service
@pytest.mark.uses_transform_service
-@unittest.skipUnless(
- os.environ.get('EXPANSION_PORT'),
- "EXPANSION_PORT environment var is not provided.")
@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
+@unittest.skipUnless(
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
class TestReadFromBigTableIT(unittest.TestCase):
INSTANCE = "bt-read-tests"
TABLE_ID = "test-table"
@@ -70,7 +71,6 @@ class TestReadFromBigTableIT(unittest.TestCase):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.project = self.test_pipeline.get_option('project')
- self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
instance_id = instance_prefix(self.INSTANCE)
@@ -141,8 +141,7 @@ class TestReadFromBigTableIT(unittest.TestCase):
| bigtableio.ReadFromBigtable(
project_id=self.project,
instance_id=self.instance.instance_id,
- table_id=self.table.table_id,
- expansion_service=self.expansion_service)
+ table_id=self.table.table_id)
| "Extract cells" >> beam.Map(lambda row: row._cells))
assert_that(cells, equal_to(expected_cells))
@@ -150,10 +149,11 @@ class TestReadFromBigTableIT(unittest.TestCase):
@pytest.mark.uses_gcp_java_expansion_service
@pytest.mark.uses_transform_service
-@unittest.skipUnless(
- os.environ.get('EXPANSION_PORT'),
- "EXPANSION_PORT environment var is not provided.")
@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
+@unittest.skipUnless(
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
class TestWriteToBigtableXlangIT(unittest.TestCase):
# These are integration tests for the cross-language write transform.
INSTANCE = "bt-write-xlang"
@@ -164,7 +164,6 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
cls.test_pipeline = TestPipeline(is_integration_test=True)
cls.project = cls.test_pipeline.get_option('project')
cls.args = cls.test_pipeline.get_full_options_as_args()
- cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
instance_id = instance_prefix(cls.INSTANCE)
@@ -215,8 +214,7 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
project_id=self.project,
instance_id=self.instance.instance_id,
table_id=self.table.table_id,
- use_cross_language=True,
- expansion_service=self.expansion_service))
+ use_cross_language=True))
def test_set_mutation(self):
row1: DirectRow = DirectRow('key-1')
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 26cc31471e6..2799bd1b9e9 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -144,7 +144,7 @@ class ExternalTransformProvider:
A :class:`ExternalTransform` subclass is generated for each external
transform, and is named based on what can be inferred from the URN
- (see :param urn_pattern).
+ (see the `urn_pattern` parameter).
These classes are generated when :class:`ExternalTransformProvider` is
initialized. We need to give it one or more expansion service addresses that
@@ -256,7 +256,7 @@ class ExternalTransformProvider:
if skipped_urns:
logging.info(
- "Skipped URN(s) in %s that don't follow the pattern [%s]: %s",
+ "Skipped URN(s) in %s that don't follow the pattern \"%s\": %s",
target,
self._urn_pattern,
skipped_urns)
@@ -268,6 +268,10 @@ class ExternalTransformProvider:
"""Get a list of available ExternalTransform names and identifiers"""
return list(self._name_to_urn.items())
+ def get_all(self) -> Dict[str, ExternalTransform]:
+ """Get all ExternalTransform"""
+ return self._transforms
+
def get(self, name) -> ExternalTransform:
"""Get an ExternalTransform by its inferred class name"""
return self._transforms[self._name_to_urn[name]]
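The provider's naming scheme turns a schema-transform URN into a Python class name by matching it against a pattern and upper-camel-casing the captured group, appending the version when it is not v1. A simplified, self-contained re-implementation of that idea — the helper names here (snake_to_upper_camel, infer_name) are stand-ins for the real camel-case and infer_name_from_identifier utilities in this module, and the regex only approximates STANDARD_URN_PATTERN:

```python
import re

# Approximation of the pattern for standard Beam schema-transform URNs
STANDARD_URN_PATTERN = r'^beam:schematransform:org\.apache\.beam:([\w-]+):(\w+)$'

def snake_to_upper_camel(s):
  return ''.join(part.capitalize() for part in s.split('_') if part)

def infer_name(urn):
  """Return a wrapper class name for the URN, or None if it doesn't match."""
  match = re.match(STANDARD_URN_PATTERN, urn)
  if not match:
    return None
  name, version = match.groups()
  inferred = snake_to_upper_camel(name)
  # v1 is implied; later versions are appended, e.g. MyTransformV2
  return inferred if version == 'v1' else inferred + version.upper()

print(infer_name('beam:schematransform:org.apache.beam:generate_sequence:v1'))  # GenerateSequence
print(infer_name('beam:schematransform:org.apache.beam:my_transform:v2'))       # MyTransformV2
```

URNs outside the expected namespace (such as the bad_match case in the new test file) simply yield None and are skipped.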
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
new file mode 100644
index 00000000000..a53001c85fd
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -0,0 +1,413 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import importlib
+import logging
+import os
+import secrets
+import shutil
+import time
+import typing
+import unittest
+from os.path import dirname
+
+import numpy
+import pytest
+import yaml
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
+from apache_beam.transforms.external_transform_provider import ExternalTransform
+from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
+from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
+from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
+from apache_beam.transforms.xlang.io import GenerateSequence
+
+
+class NameAndTypeUtilsTest(unittest.TestCase):
+ def test_snake_case_to_upper_camel_case(self):
+ test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"),
+ ("test_double_underscore", "TestDoubleUnderscore"),
+ ("TEST_CAPITALIZED", "TestCapitalized"),
+ ("_prepended_underscore", "PrependedUnderscore"),
+ ("appended_underscore_", "AppendedUnderscore")]
+ for case in test_cases:
+ self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
+
+ def test_snake_case_to_lower_camel_case(self):
+ test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
+ ("test_double_underscore", "testDoubleUnderscore"),
+ ("TEST_CAPITALIZED", "testCapitalized"),
+ ("_prepended_underscore", "prependedUnderscore"),
+ ("appended_underscore_", "appendedUnderscore")]
+ for case in test_cases:
+ self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
+
+ def test_camel_case_to_snake_case(self):
+ test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
+ ("TestDoubleUnderscore",
+ "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
+ ("BEGINNINGAllCaps",
+ "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
+ ("AllCapsMIDDLEWord", "all_caps_middle_word"),
+ ("lowerCamelCase", "lower_camel_case")]
+ for case in test_cases:
+ self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
+
+ def test_infer_name_from_identifier(self):
+ standard_test_cases = [
+ ("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
+ ("beam:schematransform:org.apache.beam:my_transform:v1",
+ "MyTransform"), (
+ "beam:schematransform:org.apache.beam:my_transform:v2",
+ "MyTransformV2"),
+ ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"),
+ ("beam:schematransform:bad_match:my_transform:v1", None)
+ ]
+ for case in standard_test_cases:
+ self.assertEqual(
+ case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN))
+
+ custom_pattern_cases = [
+ # (<pattern>, <urn>, <expected output>)
+ (
+ r"^custom:transform:([\w-]+):(\w+)$",
+ "custom:transform:my_transform:v1",
+ "MyTransformV1"),
+ (
+ r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$",
+ "org.user:some:custom_transform:we_made:external",
+ "SomeCustomTransformWeMade"),
+ (
+ r"^([\w-]+):user.transforms",
+ "my_eXTErnal:user.transforms",
+ "MyExternal"),
+ (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None),
+ ]
+ for case in custom_pattern_cases:
+ self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0]))
+
+
[email protected]_io_java_expansion_service
[email protected](
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
+class ExternalTransformProviderIT(unittest.TestCase):
+ def test_generate_sequence_config_schema_and_description(self):
+ provider = ExternalTransformProvider(
+ BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
+
+ self.assertTrue((
+ 'GenerateSequence',
+ 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+ ) in provider.get_available())
+
+ GenerateSequence = provider.get('GenerateSequence')
+ config_schema = GenerateSequence.configuration_schema
+ for param in ['start', 'end', 'rate']:
+ self.assertTrue(param in config_schema)
+
+ description_substring = (
+ "Outputs a PCollection of Beam Rows, each "
+ "containing a single INT64")
+ self.assertTrue(description_substring in GenerateSequence.description)
+
+ def test_run_generate_sequence(self):
+ provider = ExternalTransformProvider(
+ BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
+
+ with beam.Pipeline() as p:
+ numbers = p | provider.GenerateSequence(
+ start=0, end=10) | beam.Map(lambda row: row.value)
+
+ assert_that(numbers, equal_to([i for i in range(10)]))
+
+
[email protected]_wrapper_generation
[email protected](
+ os.environ.get('EXPANSION_JARS'),
+ "EXPANSION_JARS environment var is not provided, "
+ "indicating that jars have not been built")
+class AutoGenerationScriptIT(unittest.TestCase):
+ """
+ This class tests the generation and regeneration operations in
+ `gen_xlang_wrappers.py`.
+ """
+
+ # test cases will use GenerateSequence
+ GEN_SEQ_IDENTIFIER = \
+ 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+
+ def setUp(self):
+ # import script from top-level sdks/python directory
+ self.sdk_dir = os.path.abspath(dirname(dirname(dirname(__file__))))
+ spec = importlib.util.spec_from_file_location(
+ 'gen_xlang_wrappers',
+ os.path.join(self.sdk_dir, 'gen_xlang_wrappers.py'))
+ self.script = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(self.script)
+ args = TestPipeline(is_integration_test=True).get_full_options_as_args()
+ runner = PipelineOptions(args).get_all_options()['runner']
+ if runner and "direct" not in runner.lower():
+ self.skipTest(
+ "It is sufficient to run this test in the DirectRunner "
+ "test suite only.")
+
+ self.test_dir_name = 'test_gen_script_%d_%s' % (
+ int(time.time()), secrets.token_hex(3))
+ self.test_dir = os.path.join(
+ os.path.abspath(dirname(__file__)), self.test_dir_name)
+ self.service_config_path = os.path.join(
+ self.test_dir, "test_expansion_service_config.yaml")
+ self.transform_config_path = os.path.join(
+ self.test_dir, "test_transform_config.yaml")
+ os.mkdir(self.test_dir)
+
+ def tearDown(self):
+ shutil.rmtree(self.test_dir, ignore_errors=False)
+
+ def delete_and_validate(self):
+ self.script.delete_generated_files(self.test_dir)
+ self.assertEqual(len(os.listdir(self.test_dir)), 0)
+
+ def test_script_fails_with_invalid_destinations(self):
+ expansion_service_config = {
+ "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+ 'destinations': {
+ 'python': 'apache_beam/some_nonexistent_dir'
+ }
+ }
+ with self.assertRaises(ValueError):
+ self.create_and_check_transforms_config_exists(expansion_service_config)
+
+ def test_pretty_types(self):
+ types = [
+ typing.Optional[typing.List[str]],
+ numpy.int16,
+ str,
+ typing.Dict[str, numpy.float64],
+ typing.Optional[typing.Dict[str, typing.List[numpy.int64]]],
+ typing.Dict[int, typing.Optional[str]]
+ ]
+
+ expected_type_names = [('List[str]', True), ('numpy.int16', False),
+ ('str', False), ('Dict[str, numpy.float64]', False),
+ ('Dict[str, List[numpy.int64]]', True),
+ ('Dict[int, Union[str, NoneType]]', False)]
+
+ for i in range(len(types)):
+ self.assertEqual(
+ self.script.pretty_type(types[i]), expected_type_names[i])
+
+ def create_and_check_transforms_config_exists(self, expansion_service_config):
+ with open(self.service_config_path, 'w') as f:
+ yaml.dump([expansion_service_config], f)
+
+ self.script.generate_transforms_config(
+ self.service_config_path, self.transform_config_path)
+ self.assertTrue(os.path.exists(self.transform_config_path))
+
+ def create_and_validate_transforms_config(
+ self, expansion_service_config, expected_name, expected_destination):
+ self.create_and_check_transforms_config_exists(expansion_service_config)
+
+ with open(self.transform_config_path) as f:
+ configs = yaml.safe_load(f)
+ gen_seq_config = None
+ for config in configs:
+ if config['identifier'] == self.GEN_SEQ_IDENTIFIER:
+ gen_seq_config = config
+ self.assertIsNotNone(gen_seq_config)
+ self.assertEqual(
+ gen_seq_config['default_service'],
+ expansion_service_config['gradle_target'])
+ self.assertEqual(gen_seq_config['name'], expected_name)
+ self.assertEqual(
+ gen_seq_config['destinations']['python'], expected_destination)
+ self.assertIn("end", gen_seq_config['fields'])
+ self.assertIn("start", gen_seq_config['fields'])
+ self.assertIn("rate", gen_seq_config['fields'])
+
+ def get_module(self, dest):
+ module_name = dest.replace('apache_beam/', '').replace('/', '_')
+ module = 'apache_beam.transforms.%s.%s' % (self.test_dir_name, module_name)
+ return importlib.import_module(module)
+
+ def write_wrappers_to_destinations_and_validate(
+ self, destinations: typing.List[str]):
+ """
+ Generate wrappers from the config path and validate all destinations are
+ included.
+ Then write wrappers to destinations and validate all destination paths
+ exist.
+
+ :return: Generated wrappers grouped by destination
+ """
+ grouped_wrappers = self.script.get_wrappers_from_transform_configs(
+ self.transform_config_path)
+ for dest in destinations:
+ self.assertIn(dest, grouped_wrappers)
+
+ # write to our test directory to avoid messing with other files
+ self.script.write_wrappers_to_destinations(
+ grouped_wrappers, self.test_dir, format_code=False)
+
+ for dest in destinations:
+ self.assertTrue(
+ os.path.exists(
+ os.path.join(
+ self.test_dir,
+ dest.replace('apache_beam/', '').replace('/', '_') + ".py")))
+ return grouped_wrappers
+
+ def test_script_workflow(self):
+ expected_destination = 'apache_beam/transforms'
+ expansion_service_config = {
+ "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+ 'destinations': {
+ 'python': expected_destination
+ }
+ }
+
+ self.create_and_validate_transforms_config(
+ expansion_service_config, 'GenerateSequence', expected_destination)
+ grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+ [expected_destination])
+ # at least the GenerateSequence wrapper is set to this destination
+ self.assertGreaterEqual(len(grouped_wrappers[expected_destination]), 1)
+
+ # check the wrapper exists in this destination and has correct properties
+ output_module = self.get_module(expected_destination)
+ self.assertTrue(hasattr(output_module, 'GenerateSequence'))
+ # Since our config isn't skipping any transforms,
+ # it should include these two Kafka IOs as well
+ self.assertTrue(hasattr(output_module, 'KafkaWrite'))
+ self.assertTrue(hasattr(output_module, 'KafkaRead'))
+ self.assertTrue(
+ isinstance(output_module.GenerateSequence(start=0), ExternalTransform))
+ self.assertEqual(
+ output_module.GenerateSequence.identifier, self.GEN_SEQ_IDENTIFIER)
+
+ self.delete_and_validate()
+
+ def test_script_workflow_with_modified_transforms(self):
+ modified_name = 'ModifiedSequence'
+ modified_dest = 'apache_beam/io/gcp'
+ expansion_service_config = {
+ "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+ 'destinations': {
+ 'python': 'apache_beam/transforms'
+ },
+ 'transforms': {
+ 'beam:schematransform:org.apache.beam:generate_sequence:v1': {
+ 'name': modified_name,
+ 'destinations': {
+ 'python': modified_dest
+ }
+ }
+ }
+ }
+
+ self.create_and_validate_transforms_config(
+ expansion_service_config, modified_name, modified_dest)
+
+ grouped_wrappers = self.write_wrappers_to_destinations_and_validate(
+ [modified_dest])
+ self.assertIn(modified_name, grouped_wrappers[modified_dest][0])
+ self.assertEqual(len(grouped_wrappers[modified_dest]), 1)
+
+ # check the modified wrapper exists in the modified destination
+ # and check it has the correct properties
+ output_module = self.get_module(modified_dest)
+ self.assertTrue(
+ isinstance(output_module.ModifiedSequence(start=0), ExternalTransform))
+ self.assertEqual(
+ output_module.ModifiedSequence.identifier, self.GEN_SEQ_IDENTIFIER)
+
+ self.delete_and_validate()
+
+ def test_script_workflow_with_skipped_transform(self):
+ expansion_service_config = {
+ "gradle_target": 'sdks:java:io:expansion-service:shadowJar',
+ 'destinations': {
+ 'python': f'apache_beam/transforms/{self.test_dir_name}'
+ },
+ 'skip_transforms': [
+ 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+ ]
+ }
+
+ with open(self.service_config_path, 'w') as f:
+ yaml.dump([expansion_service_config], f)
+
+ self.script.generate_transforms_config(
+ self.service_config_path, self.transform_config_path)
+
+ # gen sequence shouldn't exist in the transform config
+ with open(self.transform_config_path) as f:
+ transforms = yaml.safe_load(f)
+ gen_seq_config = None
+ for transform in transforms:
+ if transform['identifier'] == self.GEN_SEQ_IDENTIFIER:
+ gen_seq_config = transform
+ self.assertIsNone(gen_seq_config)
+
+ def test_run_pipeline_with_generated_transform(self):
+ with beam.Pipeline() as p:
+ numbers = (
+ p | GenerateSequence(start=0, end=10)
+ | beam.Map(lambda row: row.value))
+ assert_that(numbers, equal_to([i for i in range(10)]))
+
+ def test_check_standard_external_transforms_config_in_sync(self):
+ """
+ This test creates a transforms config file and checks it against
+ `sdks/standard_external_transforms.yaml`. Fails if the two configs don't
+ match.
+
+ Fix by running `./gradlew generateExternalTransformsConfig` and
+ committing the changes.
+ """
+ sdks_dir = os.path.abspath(dirname(self.sdk_dir))
+ self.script.generate_transforms_config(
+ os.path.join(sdks_dir, 'standard_expansion_services.yaml'),
+ self.transform_config_path)
+ with open(self.transform_config_path) as f:
+ test_config = yaml.safe_load(f)
+ with open(os.path.join(sdks_dir, 'standard_external_transforms.yaml'),
+ 'r') as f:
+ standard_config = yaml.safe_load(f)
+
+ self.assertEqual(
+ test_config,
+ standard_config,
+ "The standard xlang transforms config file "
+ "\"standard_external_transforms.yaml\" is out of sync! Please update "
+ "by running './gradlew generateExternalTransformsConfig' "
+ "and committing the changes.")
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
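The name utilities exercised by `NameAndTypeUtilsTest` above are small enough to sketch on their own. The following is a standalone approximation (not the Beam implementation itself) that reproduces the conversions the test cases expect, including the all-caps-run handling for inputs like `BEGINNINGAllCaps`:

```python
def camel_case_to_snake_case(string):
    # A new word starts at an upper-case letter that follows a lower-case
    # letter, or that precedes one (which ends an all-caps run, e.g.
    # "BEGINNINGAllCaps" -> "beginning_all_caps").
    words, word = [], []
    for i, ch in enumerate(string):
        boundary = ch.isupper() and (
            (i > 0 and string[i - 1].islower()) or
            (i + 1 < len(string) and string[i + 1].islower()))
        if boundary:
            words.append(''.join(word))
            word = [ch.lower()]
        else:
            word.append(ch.lower())
    words.append(''.join(word))
    return '_'.join(words).strip('_')


def snake_case_to_upper_camel_case(string):
    # Empty segments from leading/trailing/double underscores vanish,
    # since ''.capitalize() is ''.
    return ''.join(w.capitalize() for w in string.split('_'))
```

For example, `camel_case_to_snake_case("MyToLoFo")` yields `"my_to_lo_fo"` and `snake_case_to_upper_camel_case("_prepended_underscore")` yields `"PrependedUnderscore"`, matching the table of cases in the test above.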
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_test.py
deleted file mode 100644
index 36fe9b5c4bd..00000000000
--- a/sdks/python/apache_beam/transforms/external_transform_provider_test.py
+++ /dev/null
@@ -1,140 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import logging
-import os
-import unittest
-
-import pytest
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.transforms.external import BeamJarExpansionService
-from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
-from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
-from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
-from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
-from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
-from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
-
-
-class NameUtilsTest(unittest.TestCase):
- def test_snake_case_to_upper_camel_case(self):
- test_cases = [("", ""), ("test", "Test"), ("test_name", "TestName"),
- ("test_double_underscore", "TestDoubleUnderscore"),
- ("TEST_CAPITALIZED", "TestCapitalized"),
- ("_prepended_underscore", "PrependedUnderscore"),
- ("appended_underscore_", "AppendedUnderscore")]
- for case in test_cases:
- self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
-
- def test_snake_case_to_lower_camel_case(self):
- test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
- ("test_double_underscore", "testDoubleUnderscore"),
- ("TEST_CAPITALIZED", "testCapitalized"),
- ("_prepended_underscore", "prependedUnderscore"),
- ("appended_underscore_", "appendedUnderscore")]
- for case in test_cases:
- self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
-
- def test_camel_case_to_snake_case(self):
- test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
- ("TestDoubleUnderscore",
- "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
- ("BEGINNINGAllCaps",
- "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
- ("AllCapsMIDDLEWord", "all_caps_middle_word"),
- ("lowerCamelCase", "lower_camel_case")]
- for case in test_cases:
- self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
-
- def test_infer_name_from_identifier(self):
- standard_test_cases = [
- ("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
- ("beam:schematransform:org.apache.beam:my_transform:v1",
- "MyTransform"), (
- "beam:schematransform:org.apache.beam:my_transform:v2",
- "MyTransformV2"),
- ("beam:schematransform:org.apache.beam:fe_fi_fo_fum:v2", "FeFiFoFumV2"),
- ("beam:schematransform:bad_match:my_transform:v1", None)
- ]
- for case in standard_test_cases:
- self.assertEqual(
- case[1], infer_name_from_identifier(case[0], STANDARD_URN_PATTERN))
-
- custom_pattern_cases = [
- # (<pattern>, <urn>, <expected output>)
- (
- r"^custom:transform:([\w-]+):(\w+)$",
- "custom:transform:my_transform:v1",
- "MyTransformV1"),
- (
- r"^org.user:([\w-]+):([\w-]+):([\w-]+):external$",
- "org.user:some:custom_transform:we_made:external",
- "SomeCustomTransformWeMade"),
- (
- r"^([\w-]+):user.transforms",
- "my_eXTErnal:user.transforms",
- "MyExternal"),
- (r"^([\w-]+):user.transforms", "my_external:badinput.transforms", None),
- ]
- for case in custom_pattern_cases:
- self.assertEqual(case[2], infer_name_from_identifier(case[1], case[0]))
-
-
[email protected]_io_java_expansion_service
[email protected](
- os.environ.get('EXPANSION_PORT'),
- "EXPANSION_PORT environment var is not provided.")
-class ExternalTransformProviderTest(unittest.TestCase):
- def setUp(self):
- self.test_pipeline = TestPipeline(is_integration_test=True)
-
- def test_generate_sequence_config_schema_and_description(self):
- provider = ExternalTransformProvider(
- BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
-
- self.assertTrue((
- 'GenerateSequence',
- 'beam:schematransform:org.apache.beam:generate_sequence:v1'
- ) in provider.get_available())
-
- GenerateSequence = provider.get('GenerateSequence')
- config_schema = GenerateSequence.configuration_schema
- for param in ['start', 'end', 'rate']:
- self.assertTrue(param in config_schema)
-
- description_substring = (
- "Outputs a PCollection of Beam Rows, each "
- "containing a single INT64")
- self.assertTrue(description_substring in GenerateSequence.description)
-
- def test_run_generate_sequence(self):
- provider = ExternalTransformProvider(
- BeamJarExpansionService(":sdks:java:io:expansion-service:shadowJar"))
-
- with beam.Pipeline() as p:
- numbers = p | provider.GenerateSequence(
- start=0, end=10) | beam.Map(lambda row: row.value)
-
- assert_that(numbers, equal_to([i for i in range(10)]))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
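The `infer_name_from_identifier` behavior tested in both the new and deleted test files can be sketched independently. The exact `STANDARD_URN_PATTERN` regex is an assumption here (the real constant lives in `apache_beam.transforms.external_transform_provider`), and this sketch handles only the standard pattern, where the last capture group is a version suffix:

```python
import re

# Assumed approximation of Beam's STANDARD_URN_PATTERN.
STANDARD_URN_PATTERN = r'^beam:schematransform:org\.apache\.beam:([\w-]+):(\w+)$'


def snake_to_upper_camel(s):
    return ''.join(w.capitalize() for w in s.split('_'))


def infer_name_from_identifier(identifier, pattern):
    """Derive a wrapper class name from a transform URN, or None on no match."""
    match = re.match(pattern, identifier)
    if not match:
        return None
    groups = match.groups()
    # Every captured group except the trailing version contributes to the name.
    name = ''.join(snake_to_upper_camel(g) for g in groups[:-1])
    version = groups[-1]
    # v1 is implied; later versions are appended (MyTransform vs. MyTransformV2).
    return name if version == 'v1' else name + version.upper()
```

This reproduces the standard cases in the tests: `generate_sequence:v1` infers `GenerateSequence`, `my_transform:v2` infers `MyTransformV2`, and a URN outside the `org.apache.beam` namespace returns `None`.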
diff --git a/sdks/python/MANIFEST.in b/sdks/python/apache_beam/transforms/xlang/__init__.py
similarity index 67%
copy from sdks/python/MANIFEST.in
copy to sdks/python/apache_beam/transforms/xlang/__init__.py
index 60b0989d9af..ba896615cbc 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/apache_beam/transforms/xlang/__init__.py
@@ -15,8 +15,13 @@
# limitations under the License.
#
-include gen_protos.py
-include README.md
-include NOTICE
-include LICENSE
-include LICENSE.python
+"""
+This package contains autogenerated Python wrappers for cross-language
+transforms available in other Beam SDKs.
+
+For documentation on creating and using cross-language transforms, see:
+https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms
+
+For more information on transform wrapper generation, see the top level script:
+`gen_xlang_wrappers.py`
+"""
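The mapping from a configured destination package to a module in this `xlang` package follows a simple rule, used both by the generator script later in this diff and by the test helper `get_module`. A minimal sketch of that rule:

```python
def destination_to_module(dest):
    # 'apache_beam/io'     -> apache_beam/transforms/xlang/io.py
    # 'apache_beam/io/gcp' -> apache_beam/transforms/xlang/io_gcp.py
    # Strip the leading package prefix, then flatten the path with underscores.
    return dest.replace('apache_beam/', '').replace('/', '_')
```

The resulting module name is what a package's `__init__.py` re-exports, e.g. `from apache_beam.transforms.xlang.io import *`.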
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 7f2bc7f5d42..85b26ca3a46 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -66,6 +66,25 @@ artifacts {
distTarBall file: file("${buildDir}/${tarball}"), builtBy: sdist
}
+tasks.register("generateExternalTransformsConfig") {
+ description "Discovers external transforms and regenerates the config at sdks/standard_external_transforms.yaml"
+
+ dependsOn buildPython
+ // Need to build all expansion services listed in sdks/standard_expansion_services.yaml
+ dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
+ dependsOn ":sdks:java:io:expansion-service:build"
+ // Keep this in-sync with pyproject.toml
+ def PyYaml = "'pyyaml>=3.12,<7.0.0'"
+
+ doLast {
+ exec {
+ executable 'sh'
+ args '-c', "pip install $PyYaml && " +
+ "python gen_xlang_wrappers.py --cleanup --generate-config-only"
+ }
+ }
+}
+
// Create Python wheels for given platform and Python version
// build identifiers for cibuildwheel
def platform_identifiers_map = [
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
new file mode 100644
index 00000000000..a75fc05cba7
--- /dev/null
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -0,0 +1,447 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Generates Python wrappers for external transforms (specifically,
+SchemaTransforms)
+"""
+
+import argparse
+import datetime
+import logging
+import os
+import shutil
+import subprocess
+import typing
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Union
+
+import yaml
+
+from gen_protos import LICENSE_HEADER
+from gen_protos import PROJECT_ROOT
+from gen_protos import PYTHON_SDK_ROOT
+
+SUPPORTED_SDK_DESTINATIONS = ['python']
+PYTHON_SUFFIX = "_et.py"
+PY_WRAPPER_OUTPUT_DIR = os.path.join(
+ PYTHON_SDK_ROOT, 'apache_beam', 'transforms', 'xlang')
+
+
+def generate_transforms_config(input_services, output_file):
+ """
+ Generates a YAML file containing a list of transform configurations.
+
+ Takes an input YAML file containing a list of expansion service gradle
+ targets. Each service must provide a `destinations` field that specifies the
+ default package (relative path) that generated wrappers should be imported
+ to. A default destination package is specified for each SDK, like so::
+
+ - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+ destinations:
+ python: 'apache_beam/io'
+
+ We use :class:`ExternalTransformProvider` to discover external
+ transforms. Then, we extract the necessary details of each transform and
+ compile them into a new YAML file, which is later used to generate wrappers.
+
+ Importing generated transforms to an existing package
+ -----------------------------------------------------
+ When running the script on the config above, a new module will be created at
+ `apache_beam/transforms/xlang/io.py`. This contains all
+ generated wrappers that are set to destination 'apache_beam/io'. Finally,
+ to make these available to the `apache_beam.io` package (or any package
+ really), just add the following line to the package's `__init__.py` file::
+ from apache_beam.transforms.xlang.io import *
+
+ Modifying a transform's name and destination
+ --------------------------------------------
+ Each service may also specify modifications for a particular transform.
+ Currently, one can modify the generated wrapper's **name** and
+ **destination** package:
+ - By default, the transform's identifier is used to generate the wrapper
+ class name. This can be overridden by manually providing a name.
+ - By default, generated wrappers are made available to the package provided
+ by their respective expansion service. This can be overridden by
+ providing a relative path to a different package.
+
+ See the following example for what such modifications can look like::
+
+ - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+ destinations:
+ python: 'apache_beam/io'
+ transforms:
+ 'beam:schematransform:org.apache.beam:my_transform:v1':
+ name: 'MyCustomTransformName'
+ destinations:
+ python: 'apache_beam/io/gcp'
+
+ For the above example, we would take the transform with identifier
+ `beam:schematransform:org.apache.beam:my_transform:v1` and by default infer
+ a wrapper class name of `MyTransform`, then write it to the module
+ `apache_beam/transforms/xlang/io.py`. With these modifications,
+ however, we instead use the provided name `MyCustomTransformName` and write
+ it to `apache_beam/transforms/xlang/io_gcp.py`.
+ Similar to above, this can be made available by importing it in the
+ `__init__.py` file like so::
+ from apache_beam.transforms.xlang.io_gcp import *
+
+ Skipping transforms
+ -------------------
+ To skip a particular transform, simply list its identifier in the
+ `skip_transforms` field, like so::
+
+ - gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+ destinations:
+ python: 'apache_beam/io'
+ skip_transforms:
+ - 'beam:schematransform:org.apache.beam:some_transform:v1'
+ """
+ from apache_beam.transforms.external import BeamJarExpansionService
+ from apache_beam.transforms.external_transform_provider import ExternalTransform
+ from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+
+ transform_list: List[Dict[str, Any]] = []
+
+ with open(input_services) as f:
+ services = yaml.safe_load(f)
+ for service in services:
+ target = service['gradle_target']
+
+ if "destinations" not in service:
+ raise ValueError(
+ f"Expansion service with target '{target}' does not "
+ "specify any default destinations.")
+ service_destinations: Dict[str, str] = service['destinations']
+ for sdk, dest in service_destinations.items():
+ validate_sdks_destinations(sdk, dest, target)
+
+ transforms_to_skip = service.get('skip_transforms', [])
+
+ # use dynamic provider to discover and populate wrapper details
+ provider = ExternalTransformProvider(BeamJarExpansionService(target))
+ discovered: Dict[str, ExternalTransform] = provider.get_all()
+ for identifier, wrapper in discovered.items():
+ if identifier in transforms_to_skip:
+ continue
+
+ transform_destinations = service_destinations.copy()
+
+ # apply any modifications
+ modified_transform = {}
+ if 'transforms' in service and identifier in service['transforms']:
+ modified_transform = service['transforms'][identifier]
+ for sdk, dest in modified_transform.get('destinations', {}).items():
+ validate_sdks_destinations(sdk, dest, target, identifier)
+ transform_destinations[sdk] = dest # override the destination
+ name = modified_transform.get('name', wrapper.__name__)
+
+ fields = {}
+ for param in wrapper.configuration_schema.values():
+ (tp, nullable) = pretty_type(param.type)
+ field_info = {
+ 'type': str(tp),
+ 'description': param.description,
+ 'nullable': nullable
+ }
+ fields[param.original_name] = field_info
+
+ transform = {
+ 'identifier': identifier,
+ 'name': name,
+ 'destinations': transform_destinations,
+ 'default_service': target,
+ 'fields': fields,
+ 'description': wrapper.description
+ }
+ transform_list.append(transform)
+
+ with open(output_file, 'w') as f:
+ f.write(LICENSE_HEADER.lstrip())
+ f.write(
+ "# NOTE: This file is autogenerated and should "
+ "not be edited by hand.\n")
+ f.write(
+ "# Configs are generated based on the expansion service\n"
+ f"# configuration in {input_services.replace(PROJECT_ROOT, '')}.\n")
+ f.write("# Refer to gen_xlang_wrappers.py for more info.\n")
+ dt = datetime.datetime.now().date()
+ f.write(f"#\n# Last updated on: {dt}\n\n")
+ yaml.dump(transform_list, f)
+ logging.info("Successfully wrote transform configs to file: %s", output_file)
+
+
+def validate_sdks_destinations(sdk, dest, service, identifier=None):
+ if identifier:
+ message = f"Identifier '{identifier}'"
+ else:
+ message = f"Service '{service}'"
+ if sdk not in SUPPORTED_SDK_DESTINATIONS:
+ raise ValueError(
+ message + " specifies a destination for an invalid SDK:"
+ f" '{sdk}'. The supported SDKs are {SUPPORTED_SDK_DESTINATIONS}")
+ if not os.path.isdir(os.path.join(PYTHON_SDK_ROOT, *dest.split('/'))):
+ raise ValueError(
+ message + f" specifies an invalid destination '{dest}'."
+ " Please make sure the destination is an existing directory.")
+
+
+def pretty_type(tp):
+ """
+ Takes a type and returns a tuple containing a pretty string representing it
+ and a bool signifying if it is nullable or not.
+
+ For optional types, the contained type is unwrapped and returned. This does
+ not recurse however, so inner Optional types are not affected.
+ E.g. the input typing.Optional[typing.Dict[int, typing.Optional[str]]] will
+ return (Dict[int, Union[str, NoneType]], True)
+ """
+ nullable = False
+ if (typing.get_origin(tp) is Union and type(None) in typing.get_args(tp)):
+ nullable = True
+ # only unwrap if it's a single nullable type. if the type is truly a union
+ # of multiple types, leave it alone.
+ args = typing.get_args(tp)
+ if len(args) == 2:
+ tp = list(filter(lambda t: t is not type(None), args))[0]
+
+ # TODO(ahmedabu98): Make this more generic to support other remote SDKs
+ # Potentially use Runner API types
+ if tp.__module__ == 'builtins':
+ tp = tp.__name__
+ elif tp.__module__ == 'typing':
+ tp = str(tp).replace("typing.", "")
+ elif tp.__module__ == 'numpy':
+ tp = "%s.%s" % (tp.__module__, tp.__name__)
+
+ return (tp, nullable)
+
+
+def camel_case_to_snake_case(string):
+ """Convert camelCase to snake_case"""
+ arr = []
+ word = []
+ for i, n in enumerate(string):
+ # If seeing an upper letter after a lower letter, we just witnessed a word
+ # If seeing an upper letter and the next letter is lower, we may have just
+ # witnessed an all caps word
+ if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+ (i + 1 < len(string) and string[i + 1].islower())):
+ arr.append(''.join(word))
+ word = [n.lower()]
+ else:
+ word.append(n.lower())
+ arr.append(''.join(word))
+ return '_'.join(arr).strip('_')
+
+
+def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
+ """
+ Generates code for external transform wrapper classes (subclasses of
+ :class:`ExternalTransform`).
+
+ Takes a YAML file containing a list of SchemaTransform configurations. For
+ each configuration, the code for a wrapper class is generated, along with any
+ documentation that may be included.
+
+ Each configuration must include a destination file that the generated class
+ will be written to.
+
+ Returns the generated classes, grouped by destination.
+ """
+ from jinja2 import Environment
+ from jinja2 import FileSystemLoader
+
+ env = Environment(loader=FileSystemLoader(PYTHON_SDK_ROOT))
+ python_wrapper_template = env.get_template("python_xlang_wrapper.template")
+
+ # maintain a list of wrappers to write in each file. if modified destinations
+ # are used, we may end up with multiple wrappers in one file.
+ destinations: Dict[str, List[str]] = {}
+
+ with open(config_file) as f:
+ transforms = yaml.safe_load(f)
+ for config in transforms:
+ default_service = config['default_service']
+ description = config['description']
+ destination = config['destinations']['python']
+ name = config['name']
+ fields = config['fields']
+ identifier = config['identifier']
+
+ parameters = []
+ for param, info in fields.items():
+ pythonic_name = camel_case_to_snake_case(param)
+ param_details = {
+ "name": pythonic_name,
+ "type": info['type'],
+ "description": info['description'],
+ }
+
+ if info['nullable']:
+ param_details["default"] = None
+ parameters.append(param_details)
+
+ # Python syntax requires function definitions to have
+ # non-default parameters first
+ parameters = sorted(parameters, key=lambda p: 'default' in p)
+ default_service = f"BeamJarExpansionService(\"{default_service}\")"
+
+ python_wrapper_class = python_wrapper_template.render(
+ class_name=name,
+ identifier=identifier,
+ parameters=parameters,
+ description=description,
+ default_expansion_service=default_service)
+
+ if destination not in destinations:
+ destinations[destination] = []
+ destinations[destination].append(python_wrapper_class)
+
+ return destinations
+
+
+def write_wrappers_to_destinations(
+ grouped_wrappers: Dict[str, List[str]],
+ output_dir=PY_WRAPPER_OUTPUT_DIR,
+ format_code=True):
+ """
+ Takes a dictionary of generated wrapper code, grouped by destination.
+ For each destination, create a new file containing the respective wrapper
+ classes. Each file includes the Apache License header and relevant imports.
+ Note: the Jinja template should already follow linting and formatting rules.
+ """
+ written_files = []
+ for dest, wrappers in grouped_wrappers.items():
+ module_name = dest.replace('apache_beam/', '').replace('/', '_')
+ module_path = os.path.join(output_dir, module_name) + ".py"
+ with open(module_path, "w") as file:
+ file.write(LICENSE_HEADER.lstrip())
+ file.write(
+ "\n# NOTE: This file contains autogenerated external transform(s)\n"
+ "# and should not be edited by hand.\n"
+ "# Refer to gen_xlang_wrappers.py for more info.\n\n")
+ file.write(
+ "\"\"\""
+ "Cross-language transforms in this module can be imported from the\n"
+ f":py:mod:`{dest.replace('/', '.')}` package."
+ "\"\"\"\n\n")
+ file.write(
+ "# pylint:disable=line-too-long\n\n"
+ "from apache_beam.transforms.external import "
+ "BeamJarExpansionService\n"
+ "from apache_beam.transforms.external_transform_provider "
+ "import ExternalTransform\n")
+ for wrapper in wrappers:
+ file.write(wrapper + "\n")
+ written_files.append(module_path)
+
+ logging.info("Created external transform wrapper modules: %s", written_files)
+
+ if format_code:
+ formatting_cmd = ['yapf', '--in-place', *written_files]
+ subprocess.run(formatting_cmd, capture_output=True, check=True)
+
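The destination-to-filename mapping above is compact enough to sketch in isolation (the output directory name `apache_beam/transforms/xlang` is taken from elsewhere in this patch; `module_file_for` is a hypothetical name for illustration):

```python
# Sketch of the destination -> module file mapping used by
# write_wrappers_to_destinations: strip the package prefix, then flatten
# remaining path separators into underscores.
def module_file_for(dest: str) -> str:
    return dest.replace('apache_beam/', '').replace('/', '_') + '.py'

print(module_file_for('apache_beam/io'))                # io.py
print(module_file_for('apache_beam/transforms/xlang'))  # transforms_xlang.py
```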
+
+def delete_generated_files(root_dir):
+  """Scans for and deletes generated wrapper files."""
+  logging.info("Deleting external transform wrappers from dir %s", root_dir)
+  deleted_files = []
+  for file in os.listdir(root_dir):
+    if file == '__init__.py':
+      continue
+    path = os.path.join(root_dir, file)
+    if os.path.isfile(path) or os.path.islink(path):
+      os.unlink(path)
+    else:
+      shutil.rmtree(path)
+    deleted_files.append(file)
+  logging.info("Successfully deleted files: %s", deleted_files)
+
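The `__init__.py` skip above is the one subtle part of the cleanup: removing entries from a Python list while iterating over it silently skips elements. A standalone sketch of the safe pattern, using a throwaway temp directory:

```python
import os
import shutil
import tempfile

# Create a directory holding an __init__.py plus two "generated" modules.
root = tempfile.mkdtemp()
for name in ('__init__.py', 'io.py', 'gcp.py'):
    open(os.path.join(root, name), 'w').close()

# Iterate over a snapshot (os.listdir returns a fresh list), deleting
# everything except __init__.py and recording what was removed.
deleted = []
for name in os.listdir(root):
    if name == '__init__.py':
        continue
    os.unlink(os.path.join(root, name))
    deleted.append(name)

remaining = os.listdir(root)
shutil.rmtree(root)
print(sorted(deleted))  # ['gcp.py', 'io.py']
print(remaining)        # ['__init__.py']
```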
+
+def run_script(
+ cleanup,
+ generate_config_only,
+ input_expansion_services,
+ transforms_config_source):
+ # Cleanup first if requested. This is needed to remove outdated wrappers.
+ if cleanup:
+ delete_generated_files(PY_WRAPPER_OUTPUT_DIR)
+
+  # This step requires the expansion service.
+  # Only generate a transforms config file if none is provided.
+ if not transforms_config_source:
+ output_transforms_config = os.path.join(
+ PROJECT_ROOT, 'sdks', 'standard_external_transforms.yaml')
+ generate_transforms_config(
+ input_services=input_expansion_services,
+ output_file=output_transforms_config)
+
+ transforms_config_source = output_transforms_config
+ else:
+ if not os.path.exists(transforms_config_source):
+ raise RuntimeError(
+ "Could not find the provided transforms config "
+ f"source: {transforms_config_source}")
+
+ if generate_config_only:
+ return
+
+ wrappers_grouped_by_destination = get_wrappers_from_transform_configs(
+ transforms_config_source)
+
+ write_wrappers_to_destinations(wrappers_grouped_by_destination)
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--cleanup',
+ dest='cleanup',
+ action='store_true',
+ help="Whether to cleanup existing generated wrappers first.")
+ parser.add_argument(
+ '--generate-config-only',
+ dest='generate_config_only',
+ action='store_true',
+ help="If set, will generate the transform config only without generating"
+ "any wrappers.")
+ parser.add_argument(
+ '--input-expansion-services',
+ dest='input_expansion_services',
+ default=os.path.join(
+ PROJECT_ROOT, 'sdks', 'standard_expansion_services.yaml'),
+    help=(
+        "Absolute path to the input YAML file that contains "
+        "expansion service configs. Ignored if a transforms config "
+        "source is provided."))
+ parser.add_argument(
+ '--transforms-config-source',
+ dest='transforms_config_source',
+ help=(
+ "Absolute path to a source transforms config YAML file to "
+ "generate wrapper modules from. If not provided, one will be "
+ "created by this script."))
+ args = parser.parse_args()
+
+ run_script(
+ args.cleanup,
+ args.generate_config_only,
+ args.input_expansion_services,
+ args.transforms_config_source)
diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml
index f1a65c842d5..9829671ccb7 100644
--- a/sdks/python/pyproject.toml
+++ b/sdks/python/pyproject.toml
@@ -29,6 +29,12 @@ requires = [
"numpy>=1.14.3,<1.27", # Update setup.py as well.
# having cython here will create wheels that are platform dependent.
"cython==0.29.36",
+ ## deps for generating external transform wrappers:
+ # also update PyYaml bounds in sdks:python:generateExternalTransformsConfig
+ 'pyyaml>=3.12,<7.0.0',
+  # also update Jinja2 bounds in test-suites/xlang/build.gradle (look for xlangWrapperValidation task)
+ "jinja2>=2.7.1,<4.0.0",
+ 'yapf==0.29.0'
]
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index c95aa5974da..e78697169bb 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -34,6 +34,7 @@ markers =
uses_java_expansion_service: collect Cross Language Java transforms test runs
uses_python_expansion_service: collect Cross Language Python transforms test runs
uses_io_java_expansion_service: collect Cross Language IO Java transform test runs (with Kafka bootstrap server)
+ xlang_wrapper_generation: collect tests that validate Cross Language wrapper generation
uses_transform_service: collect Cross Language test runs that uses the Transform Service
xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
it_postcommit: collect for post-commit integration test runs
diff --git a/sdks/python/python_xlang_wrapper.template b/sdks/python/python_xlang_wrapper.template
new file mode 100644
index 00000000000..f3d3728aace
--- /dev/null
+++ b/sdks/python/python_xlang_wrapper.template
@@ -0,0 +1,36 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+class {{ class_name }}(ExternalTransform):
+{% if description %}  """
+  {{ description | wordwrap(78) | replace('\n', '\n  ') }}
+  """{% endif %}
+  identifier = "{{ identifier }}"
+
+  def __init__(
+      self,{% if parameters %}{% for param in parameters%}
+      {{ param.name }}{% if 'default' in param %}={{ param.default }}{% endif %},{% endfor %}{% endif %}
+      expansion_service=None):
+    {% if parameters %}"""{% for param in parameters %}
+    :param {{ param.name }}: ({{ param.type }}){% if param.description %}
+      {{ param.description | wordwrap(72) | replace('\n', '\n      ') }} {% endif %}{% endfor %}
+    """{% endif %}
+    self.default_expansion_service = {{ default_expansion_service }}
+    super().__init__(
+        {% for param in parameters %}{{ param.name }}={{ param.name }},
+        {% endfor %}expansion_service=expansion_service)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a5a14c035dc..409951cbc41 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -16,8 +16,8 @@
#
"""Apache Beam SDK for Python setup file."""
-
import glob
+import logging
import os
import shutil
import subprocess
@@ -227,6 +227,50 @@ def copy_tests_from_docs():
f'Could not locate yaml docs in {docs_src} or {docs_dest}.')
+def generate_external_transform_wrappers():
+ try:
+ sdk_dir = os.path.abspath(os.path.dirname(__file__))
+ script_exists = os.path.exists(
+ os.path.join(sdk_dir, 'gen_xlang_wrappers.py'))
+ config_exists = os.path.exists(
+ os.path.join(os.path.dirname(sdk_dir),
+ 'standard_external_transforms.yaml'))
+ # we need both the script and the standard transforms config file.
+ # at build time, we don't have access to apache_beam to discover and
+ # retrieve external transforms, so the config file has to already exist
+ if not script_exists or not config_exists:
+ generated_transforms_dir = os.path.join(
+ sdk_dir, 'apache_beam', 'transforms', 'xlang')
+
+ # if exists, this directory will have at least its __init__.py file
+ if (not os.path.exists(generated_transforms_dir) or
+ len(os.listdir(generated_transforms_dir)) <= 1):
+      message = 'External transform wrappers have not been generated '
+      if not script_exists:
+        message += 'and the generation script `gen_xlang_wrappers.py` '
+      if not config_exists:
+        message += 'and the standard external transforms config '
+      message += 'could not be found'
+      raise RuntimeError(message)
+ else:
+ logging.info(
+ 'Skipping external transform wrapper generation as they '
+ 'are already generated.')
+ return
+ subprocess.run([
+ sys.executable,
+ os.path.join(sdk_dir, 'gen_xlang_wrappers.py'),
+ '--cleanup',
+ '--transforms-config-source',
+ os.path.join(os.path.dirname(sdk_dir),
+ 'standard_external_transforms.yaml')
+ ], capture_output=True, check=True)
+  except subprocess.CalledProcessError as err:
+    raise RuntimeError(
+        'Could not generate external transform wrappers due to '
+        f'error: {err.stderr.decode()}')
+
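The error path above hinges on how `subprocess.run` behaves with `check=True` and `capture_output=True`: a failing child raises `CalledProcessError` carrying the captured stderr as bytes, which is why the message should interpolate `err.stderr` explicitly rather than pass it as a spare argument. A self-contained sketch:

```python
import subprocess
import sys

# sys.exit("generation failed") writes the message to stderr and exits
# non-zero; check=True then raises CalledProcessError in the parent.
try:
    subprocess.run(
        [sys.executable, '-c', 'import sys; sys.exit("generation failed")'],
        capture_output=True, check=True)
except subprocess.CalledProcessError as err:
    # err.stderr is bytes because capture_output was used without text=True.
    message = err.stderr.decode().strip()

print(message)  # generation failed
```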
+
def get_portability_package_data():
files = []
portability_dir = Path(__file__).parent / 'apache_beam' / \
@@ -253,6 +297,8 @@ if __name__ == '__main__':
# executes below.
generate_protos_first()
+ generate_external_transform_wrappers()
+
# These data files live elsewhere in the full Beam repository.
copy_tests_from_docs()
@@ -385,7 +431,6 @@ if __name__ == '__main__':
'testcontainers[mysql]>=3.0.3,<4.0.0',
'cryptography>=41.0.2',
'hypothesis>5.0.0,<=7.0.0',
- 'pyyaml>=3.12,<7.0.0',
],
'gcp': [
'cachetools>=3.1.0,<6',
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index d29d85af301..cadf3a6ae2c 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -536,10 +536,8 @@ def dataflowRegion = project.findProperty('dataflowRegion') ?: 'us-central1'
project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
createCrossLanguageUsingJavaExpansionTask(
name: taskMetadata.name,
- expansionProjectPath: taskMetadata.expansionProjectPath,
+ expansionProjectPaths: taskMetadata.expansionProjectPaths,
collectMarker: taskMetadata.collectMarker,
- startJobServer: taskMetadata.startJobServer,
- cleanupJobServer: taskMetadata.cleanupJobServer,
pythonPipelineOptions: [
"--runner=TestDataflowRunner",
"--project=${dataflowProject}",
@@ -548,6 +546,7 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
"--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
],
pytestOptions: basicPytestOpts,
+ additionalDeps: taskMetadata.additionalDeps,
additionalEnvs: taskMetadata.additionalEnvs
)
}
diff --git a/sdks/python/test-suites/direct/build.gradle b/sdks/python/test-suites/direct/build.gradle
index ea643c3303a..4b102534398 100644
--- a/sdks/python/test-suites/direct/build.gradle
+++ b/sdks/python/test-suites/direct/build.gradle
@@ -42,3 +42,10 @@ task ioCrossLanguagePostCommit {
dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava")
}
}
+
+task crossLanguageWrapperValidationPreCommit {
+ // Different python versions may output types that look different and lead to
+ // false failures. To be consistent, we test on the lowest version only
+  def lowestSupportedVersion = getVersionsAsList('cross_language_validates_py_versions')[0]
+  dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(lowestSupportedVersion)}:xlangWrapperValidationPythonUsingJava")
+}
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 657f7adf801..b5680c2e1e9 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -412,10 +412,8 @@ def gcpProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing'
project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
createCrossLanguageUsingJavaExpansionTask(
name: taskMetadata.name,
- expansionProjectPath: taskMetadata.expansionProjectPath,
+ expansionProjectPaths: taskMetadata.expansionProjectPaths,
collectMarker: taskMetadata.collectMarker,
- startJobServer: taskMetadata.startJobServer,
- cleanupJobServer: taskMetadata.cleanupJobServer,
numParallelTests: 1,
pythonPipelineOptions: [
"--runner=TestDirectRunner",
@@ -426,6 +424,8 @@ project(":sdks:python:test-suites:xlang").ext.xlangTasks.each { taskMetadata ->
"--timeout=4500", // timeout of whole command execution
"--color=yes", // console color
"--log-cli-level=INFO" //log level info
- ]
+ ],
+ additionalDeps: taskMetadata.additionalDeps,
+ additionalEnvs: taskMetadata.additionalEnvs
)
}
diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle
index 5a124ac20ce..3065ad8377e 100644
--- a/sdks/python/test-suites/xlang/build.gradle
+++ b/sdks/python/test-suites/xlang/build.gradle
@@ -16,57 +16,43 @@
* limitations under the License.
*/
// This is a base file to set up cross language tests for different runners
-import org.apache.beam.gradle.BeamModulePlugin
-import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTask
project.evaluationDependsOn(":sdks:python")
-// Set up cross language tests
-def envDir = project.project(":sdks:python").envdir
-def jobPort = BeamModulePlugin.getRandomPort()
-def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
-def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
-
-def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
- dependsOn ':sdks:python:installGcpTest'
-
- executable 'sh'
- args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
-}
-
-def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
- executable 'sh'
- args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
-}
-
-// List of objects representing task metadata to create cross-language tasks from.
-// Each object contains the minimum relevant metadata.
-def xlangTasks = []
-
// ******** Java GCP expansion service ********
// Note: this only runs cross-language tests that use the Java GCP expansion service
-// To run tests that use another expansion service, create a new CrossLanguageTaskCommon with the
+// To run tests that use another expansion service, create a new CrossLanguageTask with the
// relevant fields as done here, then add it to `xlangTasks`.
-def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
+def gcpExpansionPath = project.project(':sdks:java:io:google-cloud-platform:expansion-service').getPath()
+def ioExpansionPath = project.project(':sdks:java:io:expansion-service').getPath()
// Properties that are common across runners.
// Used to launch the expansion service, collect the right tests, and cleanup afterwards
-def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
+def gcpXlang = new CrossLanguageTask().tap {
name = "gcpCrossLanguage"
- expansionProjectPath = gcpExpansionProject.getPath()
+ expansionProjectPaths = [gcpExpansionPath]
collectMarker = "uses_gcp_java_expansion_service"
- startJobServer = setupTask
- cleanupJobServer = cleanupTask
}
-def ioXlangCommon = new CrossLanguageTaskCommon().tap {
+def ioXlang = new CrossLanguageTask().tap {
name = "ioCrossLanguage"
-  expansionProjectPath = project.project(':sdks:java:io:expansion-service').getPath()
+ expansionProjectPaths = [ioExpansionPath]
collectMarker = "uses_io_java_expansion_service"
- startJobServer = setupTask
- cleanupJobServer = cleanupTask
//See .test-infra/kafka/bitnami/README.md for setup instructions
  additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')]
}
-xlangTasks.addAll(gcpXlangCommon, ioXlangCommon)
+// This list should include all expansion service targets in sdks/standard_expansion_services.yaml
+def servicesToGenerateFrom = [ioExpansionPath, gcpExpansionPath]
+def xlangWrapperValidation = new CrossLanguageTask().tap {
+ name = "xlangWrapperValidation"
+ expansionProjectPaths = servicesToGenerateFrom
+ collectMarker = "xlang_wrapper_generation"
+ // update Jinja2 bounds in pyproject.toml as well
+ additionalDeps = ['\"Jinja2>=2.7.1,<4.0.0\"']
+}
+
+// List of task metadata objects to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = [gcpXlang, ioXlang, xlangWrapperValidation]
ext.xlangTasks = xlangTasks
\ No newline at end of file
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index f8e2b63a509..ca35c383eea 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -220,7 +220,7 @@ commands =
# --azure_managed_identity_client_id "abc123"
[testenv:py3-yapf]
-# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
+# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml
deps =
yapf==0.29.0
commands =
@@ -228,7 +228,7 @@ commands =
time yapf --in-place --parallel --recursive apache_beam
[testenv:py3-yapf-check]
-# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
+# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml and pyproject.toml
deps =
yapf==0.29.0
commands =
diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml
new file mode 100644
index 00000000000..e9e6871be82
--- /dev/null
+++ b/sdks/standard_expansion_services.yaml
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This file enumerates the standard Apache Beam expansion services.
+# Each service must specify a package destination for each supported SDK, which
+# is where generated wrappers will go by default.
+#
+# Individual transforms can modify their destination module as well as their
+# generated wrapper class name.
+#
+# Transform identifiers listed in the `skip_transforms` field will be skipped.
+#
+# Any new gradle targets added here should also be added to:
+# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task)
+# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom')
+#
+# Refer to sdks/python/gen_xlang_wrappers.py for more info.
+
+- gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+ destinations:
+ python: 'apache_beam/io'
+ transforms:
+ 'beam:schematransform:org.apache.beam:kafka_write:v1':
+ name: 'WriteToKafka'
+ 'beam:schematransform:org.apache.beam:kafka_read:v1':
+ name: 'ReadFromKafka'
+ skip_transforms:
+ # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py
+ - 'beam:schematransform:org.apache.beam:kafka_write:v1'
+ - 'beam:schematransform:org.apache.beam:kafka_read:v1'
+
+# TODO(ahmedabu98): Enable this service in a future PR
+#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+# destinations:
+# python: 'apache_beam/io/gcp'
+# transforms:
+# 'beam:schematransform:org.apache.beam:spanner_cdc_read:v1':
+# name: 'ReadFromSpannerChangeStreams'
+# skip_transforms:
+# # generate_sequence is already included in the Java IO expansion service
+# - 'beam:schematransform:org.apache.beam:generate_sequence:v1'
+# # Handwritten wrappers exist in apache_beam/io/gcp/pubsublite/
+# - 'beam:schematransform:org.apache.beam:pubsublite_read:v1'
+# - 'beam:schematransform:org.apache.beam:pubsublite_write:v1'
+# # Handwritten wrapper exists in apache_beam/io/gcp/spanner.py
+# - 'beam:schematransform:org.apache.beam:spanner_write:v1'
+# # Native IO exists in apache_beam/io/gcp/pubsub.py
+# - 'beam:schematransform:org.apache.beam:pubsub_read:v1'
+# - 'beam:schematransform:org.apache.beam:pubsub_write:v1'
+# # Native IO exists in apache_beam/io/gcp/bigquery.py
+# - 'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1'
+# - 'beam:schematransform:org.apache.beam:bigquery_export_read:v1'
+# - 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2'
+# - 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1'
+# # Handwritten wrappers exists in apache_beam/io/jdbc.py
+# - 'beam:schematransform:org.apache.beam:jdbc_write:v1'
+# - 'beam:schematransform:org.apache.beam:jdbc_read:v1'
+# # Handwritten wrappers exist in apache_beam/io/gcp/bigtableio.py
+# - 'beam:schematransform:org.apache.beam:bigtable_write:v1'
+# - 'beam:schematransform:org.apache.beam:bigtable_read:v1'
diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml
new file mode 100644
index 00000000000..b43e93ab491
--- /dev/null
+++ b/sdks/standard_external_transforms.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# NOTE: This file is autogenerated and should not be edited by hand.
+# Configs are generated based on the expansion service
+# configuration in /sdks/standard_expansion_services.yaml.
+# Refer to gen_xlang_wrappers.py for more info.
+#
+# Last updated on: 2024-02-22
+
+- default_service: sdks:java:io:expansion-service:shadowJar
+  description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
+    number called "value". The count is produced from the given "start" value and
+    either up to the given "end" or until 2^63 - 1.
+
+    To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded
+    sequences can specify a "rate" for output elements.
+
+    In all cases, the sequence of numbers is generated in parallel, so there is no
+    inherent ordering between the generated values'
+  destinations:
+    python: apache_beam/io
+  fields:
+    end:
+      description: The maximum number to generate (exclusive). Will be an unbounded
+        sequence if left unspecified.
+      nullable: true
+      type: numpy.int64
+    rate:
+      description: Specifies the rate to generate a given number of elements per a
+        given number of seconds. Applicable only to unbounded sequences.
+      nullable: true
+      type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>)
+    start:
+      description: The minimum number to generate (inclusive).
+      nullable: false
+      type: numpy.int64
+ start:
+ description: The minimum number to generate (inclusive).
+ nullable: false
+ type: numpy.int64
+ identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
+ name: GenerateSequence
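For reference, entries shaped like the GenerateSequence config above are exactly what `gen_xlang_wrappers.py` consumes. A minimal sketch of loading such a config (assumes PyYAML, which this patch adds as a build dependency; the inline YAML below is a trimmed, hypothetical entry):

```python
import yaml

# A trimmed, illustrative entry mirroring the generated config's shape.
config_text = """
- default_service: sdks:java:io:expansion-service:shadowJar
  destinations:
    python: apache_beam/io
  fields:
    start:
      description: The minimum number to generate (inclusive).
      nullable: false
      type: numpy.int64
  identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
  name: GenerateSequence
"""

transforms = yaml.safe_load(config_text)
print(transforms[0]['name'])  # GenerateSequence
# 'nullable: false' parses to Python False; the script uses this flag to
# decide which parameters get a default of None.
print(transforms[0]['fields']['start']['nullable'])  # False
```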