[ 
https://issues.apache.org/jira/browse/BEAM-14471?focusedWorklogId=772072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772072
 ]

ASF GitHub Bot logged work on BEAM-14471:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/22 18:36
            Start Date: 18/May/22 18:36
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17674:
URL: https://github.com/apache/beam/pull/17674#discussion_r876198486


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2283,6 +2275,7 @@ class BeamModulePlugin implements Plugin<Project> {
         "java_expansion_service_jar": expansionJar,
         "java_port": javaPort,
         "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+        "python_expansion_service_fqn_glob": "\\*",

Review Comment:
   "python_expansion_service_allowlist" ?



##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+  private PipelineResult pipelineResult;
+  private static String expansionAddr;
+
+  @BeforeClass
+  public static void setUpClass() {

Review Comment:
   Can these setup/teardown methods be moved to a common class instead of 
repeating in all Java x-lang tests ?



##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Can we add this to "examples/multi-language" instead ?
   https://github.com/apache/beam/tree/master/examples/multi-language



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -351,15 +351,6 @@ class BeamModulePlugin implements Plugin<Project> {
     Integer numParallelTests = 1
     // Whether the pipeline needs --sdk_location option
     boolean needsSdkLocation = false
-    // Categories for Java tests to run.
-    Closure javaTestCategories = {
-      includeCategories 
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
-      // Use the following to include / exclude categories:

Review Comment:
   Should this comment be removed ?



##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external 
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python 
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by 
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which 
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the 
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following 
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ * 
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest

Review Comment:
   In the following examples, I simplified by providing instructions for 
running with the latest released Beam version. Can we do the same here ? Users 
should be able to just run this example without additional setup (no need to 
startup an expansion service or push containers for example).
   
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/kafkataxi/README.md
   https://github.com/apache/beam/tree/master/examples/multi-language



##########
sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/DataframeTransformTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.python;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ConnectivityState;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannelBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataframeTransformTest {
+  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+  private PipelineResult pipelineResult;
+  private static String expansionAddr;
+
+  @BeforeClass
+  public static void setUpClass() {
+    expansionAddr =
+        String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort")));
+  }
+
+  @Before
+  public void setUp() {
+    waitForReady();
+  }
+
+  @After
+  public void tearDown() {
+    pipelineResult = testPipeline.run();
+    pipelineResult.waitUntilFinish();
+    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
+  }
+
+  private void waitForReady() {
+    try {
+      ManagedChannel channel = 
ManagedChannelBuilder.forTarget(expansionAddr).build();
+      ConnectivityState state = channel.getState(true);
+      for (int retry = 0; retry < 30 && state != ConnectivityState.READY; 
retry++) {
+        Thread.sleep(500);
+        state = channel.getState(true);
+      }
+      channel.shutdownNow();
+    } catch (InterruptedException e) {
+      throw new RuntimeException("interrupted.");
+    }
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+  public void testDataframeSum() {
+    Schema schema =
+        Schema.of(
+            Schema.Field.of("a", Schema.FieldType.INT64),
+            Schema.Field.of("b", Schema.FieldType.INT32));
+    Row foo1 = Row.withSchema(schema).withFieldValue("a", 
100L).withFieldValue("b", 1).build();
+    Row foo2 = Row.withSchema(schema).withFieldValue("a", 
100L).withFieldValue("b", 2).build();
+    Row foo3 = Row.withSchema(schema).withFieldValue("a", 
100L).withFieldValue("b", 3).build();
+    Row bar4 = Row.withSchema(schema).withFieldValue("a", 
200L).withFieldValue("b", 4).build();
+    PCollection<Row> col =
+        testPipeline
+            .apply(Create.of(foo1, foo2, bar4))
+            .setRowSchema(schema)
+            .apply(
+                PythonExternalTransform.<PCollection<Row>, 
PCollection<Row>>from(

Review Comment:
   I think we should add a Java wrapper to simplify this for users (even though 
that might end up being a small class). 



##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external 
transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python 
DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by 
--expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which 
can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the 
expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following 
two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ * 
--sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
+ * }</pre>
+ */
+public class PythonDataframeWordCount {
+
+  // Extract the words and create the rows for counting.
+  static class ExtractWordsFn extends DoFn<String, Row> {
+    public static final Schema SCHEMA =
+        Schema.of(
+            Schema.Field.of("word", Schema.FieldType.STRING),
+            Schema.Field.of("count", Schema.FieldType.INT32));
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, 
"emptyLines");
+    private final Distribution lineLenDist =
+        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<Row> 
receiver) {
+      lineLenDist.update(element.length());
+      if (element.trim().isEmpty()) {
+        emptyLines.inc();
+      }
+
+      // Split the line into words.
+      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(
+              Row.withSchema(SCHEMA)
+                  .withFieldValue("word", word)
+                  .withFieldValue("count", 1)
+                  .build());
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that converts a counted row into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<Row, String> {
+    @Override
+    public String apply(Row input) {
+      return input.getString("word") + ": " + input.getInt32("count");
+    }
+  }
+
+  /** Options supported by {@link PythonDataframeWordCount}. */
+  public interface WordCountOptions extends PipelineOptions {
+
+    /**
+     * By default, this example reads from a public dataset containing the 
text of King Lear. Set
+     * this option to choose a different input file or glob.
+     */
+    @Description("Path of the file to read from")
+    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+    String getInputFile();
+
+    void setInputFile(String value);
+
+    /** Set this required option to specify where to write the output. */
+    @Description("Path of the file to write to")
+    @Required
+    String getOutput();
+
+    void setOutput(String value);
+
+    /** Set this required option to specify Python expansion service URL. */
+    @Description("URL of Python expansion service")
+    @Required

Review Comment:
   Please make this option so that the example can just worth with the default 
expansion service.



##########
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java:
##########
@@ -287,7 +287,11 @@ protected void pythonDependenciesTest(Pipeline pipeline) {
   @RunWith(JUnit4.class)
   public static class SingleInputOutputTest extends 
ValidateRunnerXlangTestBase {
     @Test
-    @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
+    @Category({
+      ValidatesRunner.class,

Review Comment:
   Is there value in running both Java and Python pipelines with Java and 
Python expansion services ? I wonder if we can simplify (and hopefully reduce 
flakiness) by running Java tests only with Python expansion service and vice 
versa.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772072)
    Time Spent: 4h 40m  (was: 4.5h)

> Adding testcases and examples for xlang Python DataframeTransform 
> ------------------------------------------------------------------
>
>                 Key: BEAM-14471
>                 URL: https://issues.apache.org/jira/browse/BEAM-14471
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, testing
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Adding testcases and examples for xlang Python DataframeTransform 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to