[
https://issues.apache.org/jira/browse/FLINK-24082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-24082:
-----------------------------------
Labels: pull-request-available (was: )
> Python UDTF throws exception when the result type is generator of Row
> ---------------------------------------------------------------------
>
> Key: FLINK-24082
> URL: https://issues.apache.org/jira/browse/FLINK-24082
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.13.0
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.3
>
>
> For job:
> {code}
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> import argparse
> import logging
> import sys
> from pyflink.common import Row
> from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes)
> from pyflink.table.expressions import lit, col
> from pyflink.table.udf import udtf
> word_count_data = ["To be, or not to be,--that is the question:--",
> "Whether 'tis nobler in the mind to suffer",
> "The slings and arrows of outrageous fortune",
> "Or to take arms against a sea of troubles,",
> "And by opposing end them?--To die,--to sleep,--",
> "No more; and by a sleep to say we end",
> "The heartache, and the thousand natural shocks",
> "That flesh is heir to,--'tis a consummation",
> "Devoutly to be wish'd. To die,--to sleep;--",
> "To sleep! perchance to dream:--ay, there's the rub;",
> "For in that sleep of death what dreams may come,",
> "When we have shuffled off this mortal coil,",
> "Must give us pause: there's the respect",
> "That makes calamity of so long life;",
> "For who would bear the whips and scorns of time,",
> "The oppressor's wrong, the proud man's contumely,",
> "The pangs of despis'd love, the law's delay,",
> "The insolence of office, and the spurns",
> "That patient merit of the unworthy takes,",
> "When he himself might his quietus make",
> "With a bare bodkin? who would these fardels bear,",
> "To grunt and sweat under a weary life,",
> "But that the dread of something after death,--",
> "The undiscover'd country, from whose bourn",
> "No traveller returns,--puzzles the will,",
> "And makes us rather bear those ills we have",
> "Than fly to others that we know not of?",
> "Thus conscience does make cowards of us all;",
> "And thus the native hue of resolution",
> "Is sicklied o'er with the pale cast of thought;",
> "And enterprises of great pith and moment,",
> "With this regard, their currents turn awry,",
> "And lose the name of action.--Soft you now!",
> "The fair Ophelia!--Nymph, in thy orisons",
> "Be all my sins remember'd."]
> def word_count(input_path, output_path):
> t_env =
> TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())
> # write all the data to one file
> t_env.get_config().get_configuration().set_string("parallelism.default",
> "1")
> # define the source
> if input_path is not None:
> t_env.execute_sql("""
> CREATE TABLE source (
> word STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = '{}',
> 'format' = 'csv'
> )
> """.format(input_path))
> tab = t_env.from_path('source')
> else:
> print("Executing word_count example with default input data set.")
> print("Use --input to specify file input.")
> tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
> DataTypes.ROW([DataTypes.FIELD('line',
> DataTypes.STRING())]))
> # define the sink
> if output_path is not None:
> t_env.execute_sql("""
> CREATE TABLE sink (
> word STRING,
> `count` BIGINT
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = '{}',
> 'format' = 'canal-json'
> )
> """.format(output_path))
> else:
> print("Printing result to stdout. Use --output to specify output
> path.")
> t_env.execute_sql("""
> CREATE TABLE sink (
> word STRING,
> `count` BIGINT
> ) WITH (
> 'connector' = 'print'
> )
> """)
> @udtf(result_types=[DataTypes.STRING()])
> def split(line: Row):
> for s in line[0].split():
> yield Row(s)
> # compute word count
> tab.flat_map(split).alias('word') \
> .group_by(col('word')) \
> .select(col('word'), lit(1).count) \
> .execute_insert('sink') \
> .wait()
> # remove .wait if submitting to a remote cluster, refer to
> #
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
> # for more details
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO,
> format="%(message)s")
> parser = argparse.ArgumentParser()
> parser.add_argument(
> '--input',
> dest='input',
> required=False,
> help='Input file to process.')
> parser.add_argument(
> '--output',
> dest='output',
> required=False,
> help='Output file to write results to.')
> argv = sys.argv[1:]
> known_args, _ = parser.parse_known_args(argv)
> word_count(known_args.input, known_args.output)
> {code}
> It will throw the following exception:
> {code}
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 1: Traceback (most recent call last):
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
> response = task()
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 607, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1000, in process_bundle
> element.data)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 228, in process_encoded
> self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 357, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 359, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam/runners/worker/operations.py", line 319, in
> apache_beam.runners.worker.operations.Operation.process
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py",
> line 75, in process
> self.process_element(value), output_stream, True)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 708, in encode_to_stream
> self._value_coder.encode_to_stream(value, out, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 226, in encode_to_stream
> self._flatten_row_coder.encode_to_stream(value, out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 96, in encode_to_stream
> self._encode_one_row(value, out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 120, in _encode_one_row
> field_coders[i].encode_to_stream(item, data_out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 535, in encode_to_stream
> bytes_value = value.encode("utf-8")
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py",
> line 196, in __getattr__
> idx = self._fields.index(item)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py",
> line 192, in __getattr__
> raise AttributeError(item)
> AttributeError: _fields
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
> ... 30 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 1: Traceback (most recent call last):
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 289, in _execute
> response = task()
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 362, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 607, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1000, in process_bundle
> element.data)
> File
> "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 228, in process_encoded
> self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 357, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 359, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 221, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam/runners/worker/operations.py", line 319, in
> apache_beam.runners.worker.operations.Operation.process
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py",
> line 75, in process
> self.process_element(value), output_stream, True)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 708, in encode_to_stream
> self._value_coder.encode_to_stream(value, out, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 226, in encode_to_stream
> self._flatten_row_coder.encode_to_stream(value, out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 96, in encode_to_stream
> self._encode_one_row(value, out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 120, in _encode_one_row
> field_coders[i].encode_to_stream(item, data_out_stream, nested)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 535, in encode_to_stream
> bytes_value = value.encode("utf-8")
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py",
> line 196, in __getattr__
> idx = self._fields.index(item)
> File
> "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py",
> line 192, in __getattr__
> raise AttributeError(item)
> AttributeError: _fields
> Process finished with exit code 1
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)