[ https://issues.apache.org/jira/browse/FLINK-24082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-24082.
---------------------------
    Resolution: Fixed

Fixed in release-1.13 via 62f3c9c9dbacf27e297dd2a64788a4d26efccf27
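
For versions without the fix, a workaround consistent with the declared result_types=[DataTypes.STRING()] is to yield the plain string instead of wrapping it in a Row, which avoids the Row-encoding path that fails below. A minimal sketch against the example in the description (the unwrapped yield is a suggested workaround, not the committed fix):

{code}
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    # With a single STRING result type, yield the raw value; wrapping
    # it in Row(s) is what hits the broken coder path on 1.13.0.
    for s in line[0].split():
        yield s
{code}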

> Python UDTF throws exception when the result type is generator of Row
> ---------------------------------------------------------------------
>
>                 Key: FLINK-24082
>                 URL: https://issues.apache.org/jira/browse/FLINK-24082
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.3
>
>
> For the following job:
> {code}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> import argparse
> import logging
> import sys
> from pyflink.common import Row
> from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes)
> from pyflink.table.expressions import lit, col
> from pyflink.table.udf import udtf
> word_count_data = ["To be, or not to be,--that is the question:--",
>                    "Whether 'tis nobler in the mind to suffer",
>                    "The slings and arrows of outrageous fortune",
>                    "Or to take arms against a sea of troubles,",
>                    "And by opposing end them?--To die,--to sleep,--",
>                    "No more; and by a sleep to say we end",
>                    "The heartache, and the thousand natural shocks",
>                    "That flesh is heir to,--'tis a consummation",
>                    "Devoutly to be wish'd. To die,--to sleep;--",
>                    "To sleep! perchance to dream:--ay, there's the rub;",
>                    "For in that sleep of death what dreams may come,",
>                    "When we have shuffled off this mortal coil,",
>                    "Must give us pause: there's the respect",
>                    "That makes calamity of so long life;",
>                    "For who would bear the whips and scorns of time,",
>                    "The oppressor's wrong, the proud man's contumely,",
>                    "The pangs of despis'd love, the law's delay,",
>                    "The insolence of office, and the spurns",
>                    "That patient merit of the unworthy takes,",
>                    "When he himself might his quietus make",
>                    "With a bare bodkin? who would these fardels bear,",
>                    "To grunt and sweat under a weary life,",
>                    "But that the dread of something after death,--",
>                    "The undiscover'd country, from whose bourn",
>                    "No traveller returns,--puzzles the will,",
>                    "And makes us rather bear those ills we have",
>                    "Than fly to others that we know not of?",
>                    "Thus conscience does make cowards of us all;",
>                    "And thus the native hue of resolution",
>                    "Is sicklied o'er with the pale cast of thought;",
>                    "And enterprises of great pith and moment,",
>                    "With this regard, their currents turn awry,",
>                    "And lose the name of action.--Soft you now!",
>                    "The fair Ophelia!--Nymph, in thy orisons",
>                    "Be all my sins remember'd."]
> def word_count(input_path, output_path):
>     t_env = TableEnvironment.create(
>         EnvironmentSettings.new_instance().in_streaming_mode().build())
>     # write all the data to one file
>     t_env.get_config().get_configuration().set_string("parallelism.default", "1")
>     # define the source
>     if input_path is not None:
>         t_env.execute_sql("""
>                 CREATE TABLE source (
>                     word STRING
>                 ) WITH (
>                     'connector' = 'filesystem',
>                     'path' = '{}',
>                     'format' = 'csv'
>                 )
>             """.format(input_path))
>         tab = t_env.from_path('source')
>     else:
>         print("Executing word_count example with default input data set.")
>         print("Use --input to specify file input.")
>         tab = t_env.from_elements(
>             map(lambda i: (i,), word_count_data),
>             DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
>     # define the sink
>     if output_path is not None:
>         t_env.execute_sql("""
>             CREATE TABLE sink (
>                 word STRING,
>                 `count` BIGINT
>             ) WITH (
>                 'connector' = 'filesystem',
>                 'path' = '{}',
>                 'format' = 'canal-json'
>             )
>         """.format(output_path))
>     else:
>         print("Printing result to stdout. Use --output to specify output 
> path.")
>         t_env.execute_sql("""
>                     CREATE TABLE sink (
>                         word STRING,
>                         `count` BIGINT
>                     ) WITH (
>                         'connector' = 'print'
>                     )
>                 """)
>     @udtf(result_types=[DataTypes.STRING()])
>     def split(line: Row):
>         for s in line[0].split():
>             yield Row(s)
>     # compute word count
>     tab.flat_map(split).alias('word') \
>        .group_by(col('word')) \
>        .select(col('word'), lit(1).count) \
>        .execute_insert('sink') \
>        .wait()
>     # remove .wait if submitting to a remote cluster, refer to
>     # https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
>     # for more details
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input',
>         dest='input',
>         required=False,
>         help='Input file to process.')
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     word_count(known_args.input, known_args.output)
> {code}
> It will throw the following exception:
> {code}
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 319, in apache_beam.runners.worker.operations.Operation.process
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py", line 75, in process
>     self.process_element(value), output_stream, True)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 708, in encode_to_stream
>     self._value_coder.encode_to_stream(value, out, nested)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 226, in encode_to_stream
>     self._flatten_row_coder.encode_to_stream(value, out_stream, nested)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 96, in encode_to_stream
>     self._encode_one_row(value, out_stream, nested)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 120, in _encode_one_row
>     field_coders[i].encode_to_stream(item, data_out_stream, nested)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 535, in encode_to_stream
>     bytes_value = value.encode("utf-8")
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py", line 196, in __getattr__
>     idx = self._fields.index(item)
>   File "/Users/dianfu/code/src/apache/flink/flink-python/pyflink/common/types.py", line 192, in __getattr__
>     raise AttributeError(item)
> AttributeError: _fields
>       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>       at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>       at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
>       ... 30 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   [... identical Python traceback as above, ending in AttributeError: _fields ...]
> Process finished with exit code 1
> {code}
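
From the traceback, the failure is in the STRING field coder: it calls value.encode("utf-8") on the yielded Row, Row.__getattr__ then touches self._fields to resolve 'encode', and since a positionally-constructed Row carries no _fields that lookup re-enters __getattr__ and raises AttributeError: _fields. A minimal sketch of just that attribute lookup (assuming a pre-fix PyFlink 1.13.x environment):

{code}
from pyflink.common import Row

# A Row built positionally has no _fields, so any unknown attribute
# access (here, the coder's value.encode) falls through to
# __getattr__, which itself touches self._fields and fails.
row = Row("hello")
try:
    row.encode("utf-8")  # what the STRING coder attempts on the Row
except AttributeError as e:
    print("AttributeError:", e)  # AttributeError: _fields
{code}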



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
