This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b08a484 [BEAM-11592] Adding cross-language test using third-party Python dependencies
new 42826cd Merge pull request #13729 from ihji/BEAM-11592
b08a484 is described below
commit b08a4842cc666030ad4c963321ec838f5d54d18b
Author: Heejong Lee <[email protected]>
AuthorDate: Fri Jan 8 20:15:54 2021 -0800
[BEAM-11592] Adding cross-language test using third-party Python dependencies
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 29 ++++++++++++++++++---
.../core/construction/ValidateRunnerXlangTest.java | 24 +++++++++++++++++
runners/direct-java/build.gradle | 2 ++
runners/flink/job-server/flink_job_server.gradle | 1 +
runners/google-cloud-dataflow-java/build.gradle | 1 +
runners/spark/build.gradle | 2 ++
.../sdk/testing/UsesPythonExpansionService.java | 27 +++++++++++++++++++
.../runners/portability/artifact_service.py | 3 ++-
.../runners/portability/expansion_service_test.py | 30 ++++++++++++++++++++++
9 files changed, 115 insertions(+), 4 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 773fbbc..a78836c 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -2060,6 +2060,30 @@ class BeamModulePlugin implements Plugin<Project> {
cleanupTask.mustRunAfter pythonTask
config.cleanupJobServer.mustRunAfter pythonTask
}
+
+ // Task for running Python-only testcases in Java SDK
+ def javaUsingPythonOnlyTask = project.tasks.create(name:
config.name+"JavaUsingPythonOnly", type: Test) {
+ group = "Verification"
+ description = "Validates runner for cross-language capability of using
Python-only transforms from Java SDK"
+ systemProperty "beamTestPipelineOptions",
JsonOutput.toJson(config.javaPipelineOptions)
+ systemProperty "expansionJar", expansionJar
+ systemProperty "expansionPort", pythonPort
+ classpath = config.classpath
+ testClassesDirs =
project.files(project.project(":runners:core-construction-java").sourceSets.test.output.classesDirs)
+ maxParallelForks config.numParallelTests
+ useJUnit {
+ includeCategories
'org.apache.beam.sdk.testing.UsesPythonExpansionService'
+ }
+ // increase maxHeapSize as this is directly correlated to direct
memory,
+ // see https://issues.apache.org/jira/browse/BEAM-6698
+ maxHeapSize = '4g'
+ dependsOn setupTask
+ dependsOn config.startJobServer
+ }
+ mainTask.dependsOn javaUsingPythonOnlyTask
+ cleanupTask.mustRunAfter javaUsingPythonOnlyTask
+ config.cleanupJobServer.mustRunAfter javaUsingPythonOnlyTask
+
// Task for running testcases in Python SDK
def testOpts = [
"--attr=UsesSqlExpansionService"
@@ -2075,13 +2099,12 @@ class BeamModulePlugin implements Plugin<Project> {
description = "Validates runner for cross-language capability of using
Java's SqlTransform from Python SDK"
executable 'sh'
args '-c', ". $envDir/bin/activate && cd $pythonDir &&
./scripts/run_integration_test.sh $cmdArgs"
+ dependsOn setupTask
dependsOn config.startJobServer
- dependsOn ':sdks:java:container:java8:docker'
- dependsOn ':sdks:python:container:py'+pythonContainerSuffix+':docker'
dependsOn ':sdks:java:extensions:sql:expansion-service:shadowJar'
- dependsOn ":sdks:python:installGcpTest"
}
mainTask.dependsOn pythonSqlTask
+ cleanupTask.mustRunAfter pythonSqlTask
config.cleanupJobServer.mustRunAfter pythonSqlTask
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
index 66e0133..40a0946 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCrossLanguageTransforms;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
@@ -100,6 +101,7 @@ public class ValidateRunnerXlangTest implements Serializable {
private static final String TEST_COMPK_URN =
"beam:transforms:xlang:test:compk";
private static final String TEST_FLATTEN_URN =
"beam:transforms:xlang:test:flatten";
private static final String TEST_PARTITION_URN =
"beam:transforms:xlang:test:partition";
+ private static final String TEST_PYTHON_BS4_URN =
"beam:transforms:xlang:test:python_bs4";
private static String expansionAddr;
private static String expansionJar;
@@ -317,6 +319,28 @@ public class ValidateRunnerXlangTest implements Serializable {
PAssert.that(col.get("1")).containsInAnyOrder(1L, 3L, 5L);
}
+ @Test
+ @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+ public void pythonDependenciesTest() {
+ String html =
+ "<html><head><title>The Dormouse's story</title></head>\n"
+ + "<body>\n"
+ + "<p class=\"title\"><b>The Dormouse's story</b></p>\n"
+ + "\n"
+ + "<p class=\"story\">Once upon a time there were three little
sisters; and their names were\n"
+ + "<a href=\"http://example.com/elsie\" class=\"sister\"
id=\"link1\">Elsie</a>,\n"
+ + "<a href=\"http://example.com/lacie\" class=\"sister\"
id=\"link2\">Lacie</a> and\n"
+ + "<a href=\"http://example.com/tillie\" class=\"sister\"
id=\"link3\">Tillie</a>;\n"
+ + "and they lived at the bottom of a well.</p>\n"
+ + "\n"
+ + "<p class=\"story\">...</p>";
+ PCollection<String> col =
+ testPipeline
+ .apply(Create.of(html))
+ .apply(External.of(TEST_PYTHON_BS4_URN, new byte[] {},
expansionAddr));
+ PAssert.that(col).containsInAnyOrder("The Dormouse's story");
+ }
+
private byte[] toStringPayloadBytes(String data) throws IOException {
Row configRow =
Row.withSchema(Schema.of(Field.of("data", FieldType.STRING)))
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index eada181..89564cd 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -130,6 +130,7 @@ task needsRunnerTests(type: Test) {
// MetricsPusher isn't implemented in direct runner
excludeCategories "org.apache.beam.sdk.testing.UsesMetricsPusher"
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
+ excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService"
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
testLogging {
@@ -159,6 +160,7 @@ task validatesRunner(type: Test) {
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
+ excludeCategories "org.apache.beam.sdk.testing.UsesPythonExpansionService"
}
}
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index 3c83ce9..c12f84f 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -157,6 +157,7 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+ excludeCategories
'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index d8fe8ff..9cc104c 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -152,6 +152,7 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms',
+ 'org.apache.beam.sdk.testing.UsesPythonExpansionService',
'org.apache.beam.sdk.testing.UsesDistributionMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs',
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 0ed83db..d2f171f 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -185,6 +185,7 @@ task validatesRunnerBatch(type: Test) {
excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
jvmArgs '-Xmx3g'
@@ -255,6 +256,7 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
excludeCategories
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesPythonExpansionService'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
filter {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java
new file mode 100644
index 0000000..b92742e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesPythonExpansionService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Category tag for tests which use the expansion service in Python. Tests
tagged with {@link
+ * UsesPythonExpansionService} should be run for runners which support
cross-language transforms.
+ */
+@Internal
+public interface UsesPythonExpansionService {}
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 18537f4..d0c8f8e8 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -271,7 +271,8 @@ class BeamFilesystemHandler(object):
def file_writer(self, name=None):
full_path = filesystems.FileSystems.join(self._root, name)
- return filesystems.FileSystems.create(full_path), full_path
+ return filesystems.FileSystems.create(
+ full_path, compression_type=CompressionTypes.UNCOMPRESSED), full_path
def resolve_artifacts(artifacts, service, dest_dir):
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
index bd8f72e..11ade9e 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
@@ -31,10 +31,13 @@ import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.coders import RowCoder
from apache_beam.pipeline import PipelineOptions
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.portability.api.external_transforms_pb2 import
ExternalConfigurationPayload
+from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability import expansion_service
from apache_beam.transforms import ptransform
+from apache_beam.transforms.environments import PyPIArtifactRegistry
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
from apache_beam.utils import thread_pool_executor
@@ -51,6 +54,7 @@ TEST_COMGL_URN = "beam:transforms:xlang:test:comgl"
TEST_COMPK_URN = "beam:transforms:xlang:test:compk"
TEST_FLATTEN_URN = "beam:transforms:xlang:test:flatten"
TEST_PARTITION_URN = "beam:transforms:xlang:test:partition"
+TEST_PYTHON_BS4_URN = "beam:transforms:xlang:test:python_bs4"
@ptransform.PTransform.register_urn('beam:transforms:xlang:count', None)
@@ -226,6 +230,27 @@ class PartitionTransform(ptransform.PTransform):
return PartitionTransform()
+class ExtractHtmlTitleDoFn(beam.DoFn):
+ def process(self, element):
+ from bs4 import BeautifulSoup
+ soup = BeautifulSoup(element, 'html.parser')
+ return [soup.title.string]
+
+
[email protected]_urn(TEST_PYTHON_BS4_URN, None)
+class ExtractHtmlTitleTransform(ptransform.PTransform):
+ def expand(self, pcoll):
+ return pcoll |
beam.ParDo(ExtractHtmlTitleDoFn()).with_output_types(unicode)
+
+ def to_runner_api_parameter(self, unused_context):
+ return TEST_PYTHON_BS4_URN, None
+
+ @staticmethod
+ def from_runner_api_parameter(
+ unused_ptransform, unused_parameter, unused_context):
+ return ExtractHtmlTitleTransform()
+
+
@ptransform.PTransform.register_urn('payload', bytes)
class PayloadTransform(ptransform.PTransform):
def __init__(self, payload):
@@ -287,6 +312,7 @@ def cleanup(unused_signum, unused_frame):
def main(unused_argv):
+ PyPIArtifactRegistry.register_artifact('beautifulsoup4', '>=4.9,<5.0')
parser = argparse.ArgumentParser()
parser.add_argument(
'-p', '--port', type=int, help='port on which to serve the job api')
@@ -298,6 +324,10 @@ def main(unused_argv):
PipelineOptions(
["--experiments", "beam_fn_api", "--sdk_location",
"container"])),
server)
+ beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
+ artifact_service.ArtifactRetrievalService(
+ artifact_service.BeamFilesystemHandler(None).file_reader),
+ server)
server.add_insecure_port('localhost:{}'.format(options.port))
server.start()
_LOGGER.info('Listening for expansion requests at %d', options.port)