robertwb commented on code in PR #29834: URL: https://github.com/apache/beam/pull/29834#discussion_r1486885504
########## sdks/standard_expansion_services.yaml: ########## @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file enumerates the standard Apache Beam expansion services. +# Each service must specify a package destination for each supported SDK, which +# is where generated wrappers will go by default. +# +# Individual transforms can modify their destination module as well as their +# generated wrapper class name. +# +# Transform identifiers listed in the `skip_transforms` field will be skipped. +# +# Any new gradle targets added here should also be added to: +# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task) +# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom') +# +# Refer to sdks/python/gen_xlang_wrappers.py for more info. 
+ +- gradle_target: 'sdks:java:io:expansion-service:shadowJar' + destinations: + python: 'apache_beam/io' + transforms: + 'beam:schematransform:org.apache.beam:kafka_write:v1': + name: 'WriteToKafka' + 'beam:schematransform:org.apache.beam:kafka_read:v1': + name: 'ReadFromKafka' + skip_transforms: + # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py + - 'beam:schematransform:org.apache.beam:kafka_write:v1' + - 'beam:schematransform:org.apache.beam:kafka_read:v1' + +# TODO(ahmedabu98): Enable this service in a future PR +#- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' +# destinations: +# python: 'apache_beam/io/gcp' +# transforms: +# 'beam:schematransform:org.apache.beam:bigquery_storage_write:v2': +# name: 'StorageWriteToBigQuery' +# 'beam:schematransform:org.apache.beam:bigquery_storage_read:v1': +# name: 'StorageReadFromBigQuery' +# 'beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1': +# name: 'FileLoadsToBigQuery' +# 'beam:schematransform:org.apache.beam:bigquery_export_read:v1': +# name: 'ExportReadFromBigQuery' +# 'beam:schematransform:org.apache.beam:bigtable_write:v1': +# name: 'WriteToBigtable' +# 'beam:schematransform:org.apache.beam:bigtable_read:v1': +# name: 'ReadFromBigtable' +# 'beam:schematransform:org.apache.beam:pubsub_read:v1': +# name: 'ReadFromPubSub' +# 'beam:schematransform:org.apache.beam:pubsub_write:v1': +# name: 'WriteToPubSub' Review Comment: FWIW, I'm not sure we want to offer these redundantly with what is already available in Python (wrapped or native), at least not under the same name. 
########## sdks/python/setup.py: ########## @@ -205,6 +204,51 @@ def generate_protos_first(): err.stderr) +def generate_external_transform_wrappers(): + try: + sdks_dir = os.path.realpath( + os.path.join(os.path.realpath(__file__), '..', '..')) + script_exists = os.path.exists( + os.path.join(sdks_dir, 'python', 'gen_xlang_wrappers.py')) + config_exists = os.path.exists( + os.path.join(sdks_dir, 'standard_external_transforms.yaml')) + # we need both the script and the standard transforms config file. + # at build time, we don't have access to apache_beam to discover and + # retrieve external transforms, so the config file has to already exist + if not script_exists or not config_exists: + generated_transforms_dir = os.path.join( + sdks_dir, 'python', 'apache_beam', + 'transforms', '_external_transforms') + + # if exists, this directory will have at least its __init__.py file + if (not os.path.exists(generated_transforms_dir) or + len(os.listdir(generated_transforms_dir)) <= 1): + message = 'External transform wrappers have not been generated ' + if not script_exists: + message += 'and the generation script `gen_xlang_wrappers.py`' + if not config_exists: + message += 'and the standard external transforms config' + message += ' could not be found' + warnings.warn(message) + else: + warnings.warn( + 'Skipping external transform wrapper generation as they ' + 'are already generated.') + return + out = subprocess.run([ Review Comment: This will make pretty much every run of setup.py slow, right? What if we check the timestamp of the jar files vs. that of the yaml file? ########## buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy: ########## @@ -2587,40 +2587,49 @@ class BeamModulePlugin implements Plugin<Project> { def config = it ? 
it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration() project.evaluationDependsOn(":sdks:python") - project.evaluationDependsOn(config.expansionProjectPath) + for (path in config.expansionProjectPaths) { + project.evaluationDependsOn(path) + } project.evaluationDependsOn(":runners:core-construction-java") project.evaluationDependsOn(":sdks:java:extensions:python") // Setting up args to launch the expansion service def pythonDir = project.project(":sdks:python").projectDir - def javaExpansionPort = -1 // will be populated in setupTask - def expansionJar = project.project(config.expansionProjectPath).shadowJar.archivePath - def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath() - def expansionServiceOpts = [ - "group_id": project.name, - "java_expansion_service_jar": expansionJar, - "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile, - ] + // initialize all expansion ports to -1. Will be populated in setupTask + def javaExpansionPorts = config.expansionProjectPaths.inject([:]) { map, k -> map[k] = -1; map } + def usesDataflowRunner = config.pythonPipelineOptions.contains("--runner=TestDataflowRunner") || config.pythonPipelineOptions.contains("--runner=DataflowRunner") def javaContainerSuffix = getSupportedJavaVersion() // 1. 
Builds the chosen expansion service jar and launches it def setupTask = project.tasks.register(config.name+"Setup") { dependsOn ':sdks:java:container:' + javaContainerSuffix + ':docker' - dependsOn project.project(config.expansionProjectPath).shadowJar.getPath() + for (path in config.expansionProjectPaths) { + dependsOn project.project(path).shadowJar.getPath() + } dependsOn 'installGcpTest' if (usesDataflowRunner) { dependsOn ":sdks:python:test-suites:dataflow:py${project.ext.pythonVersion.replace('.', '')}:initializeForDataflowJob" } doLast { - project.exec { - // Prepare a port to use for the expansion service - javaExpansionPort = getRandomPort() - expansionServiceOpts.put("java_port", javaExpansionPort) - // setup test env - def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts) - executable 'sh' - args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs" + // iterate through list of expansion service paths and build each jar + for (path in config.expansionProjectPaths) { + project.exec { + def expansionJar = project.project(path).shadowJar.archivePath + def javaClassLookupAllowlistFile = project.project(path).projectDir.getPath() + def expansionServiceOpts = [ + "group_id": project.name, + "java_expansion_service_jar": expansionJar, + "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile, + ] + // Prepare a port to use for the expansion service + javaExpansionPorts[path] = getRandomPort() + expansionServiceOpts.put("java_port", javaExpansionPorts[path]) + // setup test env + def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts) + executable 'sh' + args '-c', ". 
${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs" Review Comment: If we don't do this now, could you at least file a bug and drop a TODO? ########## .github/workflows/beam_PreCommit_Xlang_Generated_Transforms.yml: ########## @@ -0,0 +1,176 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PreCommit Xlang Generated Transforms + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: + - 'model/**' + - 'sdks/python/**' + - 'sdks/java/expansion-service/**' + - 'sdks/java/core/**' + - 'sdks/java/io/**' + - 'sdks/java/extensions/sql/**' + - '!sdks/java/io/amazon-web-services/**' Review Comment: Is there a reason to explicitly exclude these directories? (This seems like a long list that could easily get out of date, and the risk of some transform service depending on these in the future might be non-obvious.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
