robertwb commented on code in PR #29834:
URL: https://github.com/apache/beam/pull/29834#discussion_r1476360807
##########
.gitignore:
##########
@@ -52,6 +52,7 @@ sdks/python/**/*.egg
sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
+sdks/python/apache_beam/transforms/_external_transforms/*
Review Comment:
Maybe _generated_external_transforms, just to be very clear?
##########
sdks/python/apache_beam/io/__init__.py:
##########
@@ -36,6 +36,7 @@
from apache_beam.io.gcp.bigquery import *
from apache_beam.io.gcp.pubsub import *
from apache_beam.io.gcp import gcsio
+ from apache_beam.transforms._external_transforms.io import *
Review Comment:
One could automate this by having a file in _external_transforms that
injects these into the right modules. But the explicit import is less magical
and seems like a nice balance (it only needs to be done once per package).
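
For reference, a minimal sketch of what the automated alternative could look like. The helper name `inject_public_names` and the stand-in modules are hypothetical and are not part of this PR or of Beam; the real target would be a package such as `apache_beam.io`.

```python
import types


def inject_public_names(source, target):
  """Copy the public names of `source` into `target`; return the names copied."""
  names = getattr(source, '__all__', None)
  if names is None:
    # Fall back to every attribute that does not start with an underscore.
    names = [n for n in vars(source) if not n.startswith('_')]
  for name in names:
    setattr(target, name, getattr(source, name))
  return sorted(names)


# Stand-in modules so the sketch runs without apache_beam installed.
generated_io = types.ModuleType('generated_io')
generated_io.GenerateSequence = type('GenerateSequence', (), {})
target_io = types.ModuleType('target_io')

injected = inject_public_names(generated_io, target_io)
```

This hides where a name comes from, which is the "magic" the explicit star import avoids.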
##########
sdks/python/setup.py:
##########
@@ -205,6 +204,51 @@ def generate_protos_first():
err.stderr)
+def generate_external_transform_wrappers():
+  try:
+    sdks_dir = os.path.realpath(
+        os.path.join(os.path.realpath(__file__), '..', '..'))
+    script_exists = os.path.exists(
+        os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py'))
+    config_exists = os.path.exists(
+        os.path.join(sdks_dir, 'standard_external_transforms.yaml'))
+    # we need both the script and the standard transforms config file.
+    # at build time, we don't have access to apache_beam to discover and
+    # retrieve external transforms, so the config file has to already exist
+    if not script_exists or not config_exists:
+      generated_transforms_dir = os.path.join(
+          sdks_dir, 'python', 'apache_beam',
+          'transforms', '_external_transforms')
+
+      # if exists, this directory will have at least its __init__.py file
+      if (not os.path.exists(generated_transforms_dir) or
+          len(os.listdir(generated_transforms_dir)) <= 1):
+        message = 'External transform wrappers have not been generated '
+        if not script_exists:
+          message += 'and the generation script `gen_xlang_wrappers.py`'
+        if not config_exists:
+          message += 'and the standard external transforms config'
+        message += ' could not be found'
+        warnings.warn(message)
+      else:
+        warnings.warn(
Review Comment:
Will users see this every time?
##########
sdks/python/python_xlang_wrapper.template:
##########
@@ -0,0 +1,36 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
Review Comment:
Module-level docstring?
In particular, we should note (here or in each docstring) that users
should import these from their proper location in the Beam tree, not from this
module directly. (E.g., I'm thinking of how it will show up in the generated docs.)
##########
sdks/python/setup.py:
##########
@@ -205,6 +204,51 @@ def generate_protos_first():
err.stderr)
+def generate_external_transform_wrappers():
+  try:
+    sdks_dir = os.path.realpath(
+        os.path.join(os.path.realpath(__file__), '..', '..'))
+    script_exists = os.path.exists(
+        os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py'))
+    config_exists = os.path.exists(
+        os.path.join(sdks_dir, 'standard_external_transforms.yaml'))
+    # we need both the script and the standard transforms config file.
+    # at build time, we don't have access to apache_beam to discover and
+    # retrieve external transforms, so the config file has to already exist
+    if not script_exists or not config_exists:
+      generated_transforms_dir = os.path.join(
+          sdks_dir, 'python', 'apache_beam',
+          'transforms', '_external_transforms')
+
+      # if exists, this directory will have at least its __init__.py file
+      if (not os.path.exists(generated_transforms_dir) or
+          len(os.listdir(generated_transforms_dir)) <= 1):
+        message = 'External transform wrappers have not been generated '
+        if not script_exists:
+          message += 'and the generation script `gen_xlang_wrappers.py`'
+        if not config_exists:
+          message += 'and the standard external transforms config'
+        message += ' could not be found'
+        warnings.warn(message)
Review Comment:
Should this be an error?
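
A rough sketch of the error variant, assuming a failure is only wanted when no wrappers exist yet. The function names and the `wrappers_present` flag are hypothetical. It also builds the message from a list, since the concatenation above runs the two clauses together without a space when both files are missing.

```python
def missing_inputs_message(script_exists, config_exists):
  """Describe which generation inputs are missing."""
  missing = []
  if not script_exists:
    missing.append('the generation script `gen_xlang_wrappers.py`')
  if not config_exists:
    missing.append('the standard external transforms config')
  return (
      'External transform wrappers have not been generated and ' +
      ' and '.join(missing) + ' could not be found')


def require_generation_inputs(script_exists, config_exists, wrappers_present):
  """Raise if generation is impossible and nothing was generated earlier."""
  if script_exists and config_exists:
    return
  if wrappers_present:
    # Previously generated wrappers can still be used; nothing to do.
    return
  raise RuntimeError(missing_inputs_message(script_exists, config_exists))
```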
##########
sdks/python/apache_beam/transforms/_external_transforms/__init__.py:
##########
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This is a subdirectory for generated external transforms.
+Refer to sdks/python/gen_xlang_wrappers.py for more details.
Review Comment:
sdks/python won't be present for users who install this from PyPI. Bare
gen_xlang_wrappers.py should be unambiguous enough, or one could say "the
top-level gen_xlang_wrappers.py"
##########
sdks/python/apache_beam/transforms/external_transform_provider_test.py:
##########
@@ -14,26 +14,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import logging
import os
+import secrets
+import shutil
+import time
+import typing
import unittest
+from importlib import import_module
+import numpy
import pytest
+import yaml
import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.transforms._external_transforms.io import GenerateSequence
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
+from apache_beam.transforms.external_transform_provider import ExternalTransform
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
+try:
Review Comment:
Why was this made conditional again?
##########
sdks/python/setup.py:
##########
@@ -205,6 +204,51 @@ def generate_protos_first():
err.stderr)
+def generate_external_transform_wrappers():
+  try:
+    sdks_dir = os.path.realpath(
+        os.path.join(os.path.realpath(__file__), '..', '..'))
+    script_exists = os.path.exists(
+        os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py'))
+    config_exists = os.path.exists(
+        os.path.join(sdks_dir, 'standard_external_transforms.yaml'))
+    # we need both the script and the standard transforms config file.
+    # at build time, we don't have access to apache_beam to discover and
+    # retrieve external transforms, so the config file has to already exist
+    if not script_exists or not config_exists:
+      generated_transforms_dir = os.path.join(
+          sdks_dir, 'python', 'apache_beam',
+          'transforms', '_external_transforms')
+
+      # if exists, this directory will have at least its __init__.py file
+      if (not os.path.exists(generated_transforms_dir) or
+          len(os.listdir(generated_transforms_dir)) <= 1):
+        message = 'External transform wrappers have not been generated '
+        if not script_exists:
+          message += 'and the generation script `gen_xlang_wrappers.py`'
+        if not config_exists:
+          message += 'and the standard external transforms config'
+        message += ' could not be found'
+        warnings.warn(message)
+      else:
+        warnings.warn(
+            'Skipping external transform wrapper generation as they '
+            'are already generated.')
+      return
+    out = subprocess.run([
+        sys.executable,
+        os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py'),
+        '--cleanup',
+        '--transforms-config-source',
+        os.path.join(sdks_dir, 'standard_external_transforms.yaml')
+    ], capture_output=True, check=True)
+    print(out.stdout)
Review Comment:
Maybe don't capture the stdout if we are just going to print it?
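
To illustrate the difference, a small sketch using a stand-in child process instead of the real generation script (the `cmd` below is only an assumption for demonstration). With `capture_output=True` and no `text=True`, `stdout` is a bytes object, so `print(out.stdout)` shows a `b'...'` literal; without capturing, the child writes directly to the parent's stdout.

```python
import subprocess
import sys

# Stand-in for the generation script: a child process that prints one line.
cmd = [sys.executable, '-c', 'print("generated wrappers")']

# Captured: stdout is returned as bytes because text=True is not set.
captured = subprocess.run(cmd, capture_output=True, check=True)

# Not captured: the child inherits the parent's stdout, so its output
# appears as it is produced and the result's stdout attribute is None.
streamed = subprocess.run(cmd, check=True)
```

One trade-off: if the `except` branch reports `err.stderr`, stderr would still need to be captured (for example with `stderr=subprocess.PIPE`) for that attribute to be populated.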
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2587,40 +2587,49 @@ class BeamModulePlugin implements Plugin<Project> {
def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration :
new CrossLanguageUsingJavaExpansionConfiguration()
project.evaluationDependsOn(":sdks:python")
- project.evaluationDependsOn(config.expansionProjectPath)
+ for (path in config.expansionProjectPaths) {
+ project.evaluationDependsOn(path)
+ }
project.evaluationDependsOn(":runners:core-construction-java")
project.evaluationDependsOn(":sdks:java:extensions:python")
// Setting up args to launch the expansion service
def pythonDir = project.project(":sdks:python").projectDir
- def javaExpansionPort = -1 // will be populated in setupTask
- def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath
- def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
- def expansionServiceOpts = [
- "group_id": project.name,
- "java_expansion_service_jar": expansionJar,
- "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
- ]
+ // initialize all expansion ports to -1. Will be populated in setupTask
+ def javaExpansionPorts = config.expansionProjectPaths.inject([:]) { map, k -> map[k] = -1; map }
+
def usesDataflowRunner =
config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") ||
config.pythonPipelineOptions.contains("--runner=DataflowRunner")
def javaContainerSuffix = getSupportedJavaVersion()
// 1. Builds the chosen expansion service jar and launches it
def setupTask = project.tasks.register(config.name+"Setup") {
dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker'
- dependsOn project.project(config.expansionProjectPath).shadowJar.getPath()
+ for (path in config.expansionProjectPaths) {
+ dependsOn project.project(path).shadowJar.getPath()
+ }
dependsOn 'installGcpTest'
if (usesDataflowRunner) {
dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob"
}
doLast {
- project.exec {
- // Prepare a port to use for the expansion service
- javaExpansionPort = getRandomPort()
- expansionServiceOpts.put("java_port", javaExpansionPort)
- // setup test env
- def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
- executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
+ // iterate through list of expansion service paths and build each jar
+ for (path in config.expansionProjectPaths) {
+ project.exec {
+ def expansionJar = project.project(path).shadowJar.archivePath
+ def javaClassLookupAllowlistFile = project.project(path).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ // Prepare a port to use for the expansion service
+ javaExpansionPorts[path] = getRandomPort()
+ expansionServiceOpts.put("java_port", javaExpansionPorts[path])
+ // setup test env
+ def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ executable 'sh'
+ args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
Review Comment:
Maybe we could defer this to a future CL, but possibly we could still use
(and test) the auto-jar-expansion-service setup here rather than manually
starting the services and injecting all the ports? @chamikaramj
##########
sdks/python/setup.py:
##########
@@ -205,6 +204,51 @@ def generate_protos_first():
err.stderr)
+def generate_external_transform_wrappers():
+  try:
+    sdks_dir = os.path.realpath(
+        os.path.join(os.path.realpath(__file__), '..', '..'))
+    script_exists = os.path.exists(
+        os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py'))
+    config_exists = os.path.exists(
+        os.path.join(sdks_dir, 'standard_external_transforms.yaml'))
+    # we need both the script and the standard transforms config file.
+    # at build time, we don't have access to apache_beam to discover and
+    # retrieve external transforms, so the config file has to already exist
+    if not script_exists or not config_exists:
+      generated_transforms_dir = os.path.join(
+          sdks_dir, 'python', 'apache_beam',
+          'transforms', '_external_transforms')
+
+      # if exists, this directory will have at least its __init__.py file
+      if (not os.path.exists(generated_transforms_dir) or
+          len(os.listdir(generated_transforms_dir)) <= 1):
+        message = 'External transform wrappers have not been generated '
+        if not script_exists:
+          message += 'and the generation script `gen_xlang_wrappers.py`'
+        if not config_exists:
+          message += 'and the standard external transforms config'
+        message += ' could not be found'
+        warnings.warn(message)
+      else:
+        warnings.warn(
+            'Skipping external transform wrapper generation as they '
+            'are already generated.')
+      return
+    out = subprocess.run([
Review Comment:
Perhaps this should only be run if they're not up to date?
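
One possible shape for such a check, sketched with modification times. The helper `wrappers_up_to_date` is hypothetical, and the mtime comparison is an assumption about what "up to date" should mean here; timestamps are not reliable after a fresh checkout or when installing from an sdist, so a content hash may be a better signal.

```python
import os


def wrappers_up_to_date(input_paths, generated_dir):
  """True if all inputs exist and none is newer than the oldest generated file."""
  if not os.path.isdir(generated_dir):
    return False
  # Ignore __init__.py, which is checked in rather than generated.
  outputs = [
      os.path.join(generated_dir, name)
      for name in os.listdir(generated_dir)
      if name != '__init__.py'
  ]
  if not outputs or not all(os.path.exists(p) for p in input_paths):
    return False
  oldest_output = min(os.path.getmtime(p) for p in outputs)
  newest_input = max(os.path.getmtime(p) for p in input_paths)
  return newest_input <= oldest_output
```

The inputs here would be the generation script and `standard_external_transforms.yaml`; the subprocess call could then be skipped whenever the check returns True.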