[
https://issues.apache.org/jira/browse/BEAM-14471?focusedWorklogId=772072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772072
]
ASF GitHub Bot logged work on BEAM-14471:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/May/22 18:36
Start Date: 18/May/22 18:36
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17674:
URL: https://github.com/apache/beam/pull/17674#discussion_r876198486
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2283,6 +2275,7 @@ class BeamModulePlugin implements Plugin<Project> {
"java_expansion_service_jar": expansionJar,
"java_port": javaPort,
"java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ "python_expansion_service_fqn_glob": "\\*",
Review Comment:
"python_expansion_service_allowlist" ?
##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+ @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+ private PipelineResult pipelineResult;
+ private static String expansionAddr;
+
+ @BeforeClass
+ public static void setUpClass() {
Review Comment:
Can these setup/teardown methods be moved to a common class instead of
being repeated in all Java x-lang tests?
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
Can we add this to "examples/multi-language" instead ?
https://github.com/apache/beam/tree/master/examples/multi-language
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -351,15 +351,6 @@ class BeamModulePlugin implements Plugin<Project> {
Integer numParallelTests = 1
// Whether the pipeline needs --sdk_location option
boolean needsSdkLocation = false
- // Categories for Java tests to run.
- Closure javaTestCategories = {
- includeCategories
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
- // Use the following to include / exclude categories:
Review Comment:
Should this comment be removed ?
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ *
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
Review Comment:
In the following examples, I simplified things by providing instructions for
running with the latest released Beam version. Can we do the same here? Users
should be able to just run this example without additional setup (no need to
start up an expansion service or push containers, for example).
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/kafkataxi/README.md
https://github.com/apache/beam/tree/master/examples/multi-language
##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+ @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+ private PipelineResult pipelineResult;
+ private static String expansionAddr;
+
+ @BeforeClass
+ public static void setUpClass() {
+ expansionAddr =
+ String.format("localhost:%s",
Integer.valueOf(System.getProperty("expansionPort")));
+ }
+
+ @Before
+ public void setUp() {
+ waitForReady();
+ }
+
+ @After
+ public void tearDown() {
+ pipelineResult = testPipeline.run();
+ pipelineResult.waitUntilFinish();
+ assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
+ }
+
+ private void waitForReady() {
+ try {
+ ManagedChannel channel =
ManagedChannelBuilder.forTarget(expansionAddr).build();
+ ConnectivityState state = channel.getState(true);
+ for (int retry = 0; retry < 30 && state != ConnectivityState.READY;
retry++) {
+ Thread.sleep(500);
+ state = channel.getState(true);
+ }
+ channel.shutdownNow();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("interrupted.");
+ }
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+ public void testDataframeSum() {
+ Schema schema =
+ Schema.of(
+ Schema.Field.of("a", Schema.FieldType.INT64),
+ Schema.Field.of("b", Schema.FieldType.INT32));
+ Row foo1 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 1).build();
+ Row foo2 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 2).build();
+ Row foo3 = Row.withSchema(schema).withFieldValue("a",
100L).withFieldValue("b", 3).build();
+ Row bar4 = Row.withSchema(schema).withFieldValue("a",
200L).withFieldValue("b", 4).build();
+ PCollection<Row> col =
+ testPipeline
+ .apply(Create.of(foo1, foo2, bar4))
+ .setRowSchema(schema)
+ .apply(
+ PythonExternalTransform.<PCollection<Row>,
PCollection<Row>>from(
Review Comment:
I think we should add a Java wrapper to simplify this for users (even though
that might end up being a small class).
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ *
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
+ * }</pre>
+ */
+public class PythonDataframeWordCount {
+
+ // Extract the words and create the rows for counting.
+ static class ExtractWordsFn extends DoFn<String, Row> {
+ public static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("word", Schema.FieldType.STRING),
+ Schema.Field.of("count", Schema.FieldType.INT32));
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class,
"emptyLines");
+ private final Distribution lineLenDist =
+ Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<Row>
receiver) {
+ lineLenDist.update(element.length());
+ if (element.trim().isEmpty()) {
+ emptyLines.inc();
+ }
+
+ // Split the line into words.
+ String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ receiver.output(
+ Row.withSchema(SCHEMA)
+ .withFieldValue("word", word)
+ .withFieldValue("count", 1)
+ .build());
+ }
+ }
+ }
+ }
+
+ /** A SimpleFunction that converts a counted row into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<Row, String> {
+ @Override
+ public String apply(Row input) {
+ return input.getString("word") + ": " + input.getInt32("count");
+ }
+ }
+
+ /** Options supported by {@link PythonDataframeWordCount}. */
+ public interface WordCountOptions extends PipelineOptions {
+
+ /**
+ * By default, this example reads from a public dataset containing the
text of King Lear. Set
+ * this option to choose a different input file or glob.
+ */
+ @Description("Path of the file to read from")
+ @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+
+ void setInputFile(String value);
+
+ /** Set this required option to specify where to write the output. */
+ @Description("Path of the file to write to")
+ @Required
+ String getOutput();
+
+ void setOutput(String value);
+
+ /** Set this required option to specify Python expansion service URL. */
+ @Description("URL of Python expansion service")
+ @Required
Review Comment:
Please make this optional so that the example can just work with the default
expansion service.
##########
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java:
##########
@@ -287,7 +287,11 @@ protected void pythonDependenciesTest(Pipeline pipeline) {
@RunWith(JUnit4.class)
public static class SingleInputOutputTest extends
ValidateRunnerXlangTestBase {
@Test
- @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
+ @Category({
+ ValidatesRunner.class,
Review Comment:
Is there value in running both Java and Python pipelines with Java and
Python expansion services ? I wonder if we can simplify (and hopefully reduce
flakiness) by running Java tests only with Python expansion service and vice
versa.
Issue Time Tracking
-------------------
Worklog Id: (was: 772072)
Time Spent: 4h 40m (was: 4.5h)
> Adding testcases and examples for xlang Python DataframeTransform
> ------------------------------------------------------------------
>
> Key: BEAM-14471
> URL: https://issues.apache.org/jira/browse/BEAM-14471
> Project: Beam
> Issue Type: Improvement
> Components: cross-language, testing
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> Adding testcases and examples for xlang Python DataframeTransform
--
This message was sent by Atlassian Jira
(v8.20.7#820007)