[
https://issues.apache.org/jira/browse/BEAM-14471?focusedWorklogId=773087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-773087
]
ASF GitHub Bot logged work on BEAM-14471:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/May/22 01:53
Start Date: 21/May/22 01:53
Worklog Time Spent: 10m
Work Description: ihji commented on code in PR #17674:
URL: https://github.com/apache/beam/pull/17674#discussion_r878624245
##########
examples/java/src/main/java/org/apache/beam/examples/multilang/PythonDataframeWordCount.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilang;
+
+import org.apache.beam.examples.common.ExampleUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.PythonCallableSource;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An example that counts words in Shakespeare and utilizes a Python external transform.
+ *
+ * <p>This class, {@link PythonDataframeWordCount}, uses Python DataframeTransform to count words
+ * from the input text file. The Python expansion service provided by --expansionService must allow
+ * the expansion of apache_beam.dataframe.transforms.DataframeTransform (which can be done by
+ * passing --fully_qualified_name_glob commandline option when launching the expansion service).
+ *
+ * <p>Note that, for using Dataflow Runner, you should specify the following two additional
+ * arguments:
+ *
+ * <pre>{@code
+ * --experiments=use_runner_v2
+ * --sdkHarnessContainerImageOverrides=.*python.*,gcr.io/apache-beam-testing/beam-sdk/beam_python3.8_sdk:latest
+ * }</pre>
+ */
+public class PythonDataframeWordCount {
+
+ // Extract the words and create the rows for counting.
+ static class ExtractWordsFn extends DoFn<String, Row> {
+ public static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("word", Schema.FieldType.STRING),
+ Schema.Field.of("count", Schema.FieldType.INT32));
+ private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class,
"emptyLines");
+ private final Distribution lineLenDist =
+ Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
+
+ @ProcessElement
+ public void processElement(@Element String element, OutputReceiver<Row>
receiver) {
+ lineLenDist.update(element.length());
+ if (element.trim().isEmpty()) {
+ emptyLines.inc();
+ }
+
+ // Split the line into words.
+ String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+
+ // Output each word encountered into the output PCollection.
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ receiver.output(
+ Row.withSchema(SCHEMA)
+ .withFieldValue("word", word)
+ .withFieldValue("count", 1)
+ .build());
+ }
+ }
+ }
+ }
+
+ /** A SimpleFunction that converts a counted row into a printable string. */
+ public static class FormatAsTextFn extends SimpleFunction<Row, String> {
+ @Override
+ public String apply(Row input) {
+ return input.getString("word") + ": " + input.getInt32("count");
+ }
+ }
+
+ /** Options supported by {@link PythonDataframeWordCount}. */
+ public interface WordCountOptions extends PipelineOptions {
+
+ /**
+ * By default, this example reads from a public dataset containing the text of King Lear. Set
+ * this option to choose a different input file or glob.
+ */
+ @Description("Path of the file to read from")
+ @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+ String getInputFile();
+
+ void setInputFile(String value);
+
+ /** Set this required option to specify where to write the output. */
+ @Description("Path of the file to write to")
+ @Required
+ String getOutput();
+
+ void setOutput(String value);
+
+ /** Set this required option to specify Python expansion service URL. */
+ @Description("URL of Python expansion service")
+ @Required
Review Comment:
It's now optional but users need to specify it until we release a new SDK.
Issue Time Tracking
-------------------
Worklog Id: (was: 773087)
Time Spent: 5h 20m (was: 5h 10m)
> Adding testcases and examples for xlang Python DataframeTransform
> ------------------------------------------------------------------
>
> Key: BEAM-14471
> URL: https://issues.apache.org/jira/browse/BEAM-14471
> Project: Beam
> Issue Type: Improvement
> Components: cross-language, testing
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Adding testcases and examples for xlang Python DataframeTransform
--
This message was sent by Atlassian Jira
(v8.20.7#820007)