[ https://issues.apache.org/jira/browse/FLINK-26504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo resolved FLINK-26504.
----------------------------------
    Resolution: Fixed

Merged into master via 8746017cc5c3f2f69a0c5cf09d01462ded470a14
Merged into release-1.14 via 6d3aa1839b5e4a9a099d4c59384430965e80bc00
Merged into release-1.13 via 812e674ee00cbca78ad8db26d5e2ccd739a36975

> Fix the incorrect type error in unbounded Python UDAF
> -----------------------------------------------------
>
>                 Key: FLINK-26504
>                 URL: https://issues.apache.org/jira/browse/FLINK-26504
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.14.0, 1.13.2, 1.15.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.13.7, 1.14.5
>
>
> The stack trace is 
> {code:java}
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 84, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 115, in process_element
>     return self.func(value)
>   File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 384, in process_element_or_timer
>     self.group_agg_function.on_timer(input_data[3])
> TypeError: Argument 'key' has incorrect type (expected pyflink.fn_execution.coder_impl_fast.InternalRow, got Row)
>       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>       at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>       at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
> {code}
> This bug occurs only when state cleanup is triggered, i.e. when the state
> TTL timer fires and the aggregation's on_timer is invoked. The workaround
> is to disable table.exec.state.ttl.
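> Below is a minimal sketch (not the original reproducer; the job, data and
> TTL value are illustrative) of the setup that exercises this code path: an
> unbounded group aggregation over a general Python UDAF with
> table.exec.state.ttl enabled, so that the state-cleanup timer eventually
> calls on_timer. A short bounded source may finish before the TTL timer
> fires, so this shows the shape of the setup rather than a guaranteed
> reproduction.
> {code:python}
> # Sketch only: streaming (unbounded) group aggregation with a general Python
> # UDAF and state TTL enabled. TTL value and input data are illustrative.
> from pyflink.common import Row
> from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
> from pyflink.table.udf import AggregateFunction, udaf
>
>
> class SumAgg(AggregateFunction):
>
>     def create_accumulator(self):
>         # Row-typed accumulator holding the running sum.
>         return Row(0)
>
>     def get_value(self, accumulator):
>         return accumulator[0]
>
>     def accumulate(self, accumulator, value):
>         accumulator[0] += value
>
>     def get_accumulator_type(self):
>         return DataTypes.ROW([DataTypes.FIELD("s", DataTypes.BIGINT())])
>
>     def get_result_type(self):
>         return DataTypes.BIGINT()
>
>
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_streaming_mode().build())
> # Enabling state TTL arms the state-cleanup timer behind on_timer; the
> # workaround above amounts to leaving this option unset.
> t_env.get_config().get_configuration().set_string("table.exec.state.ttl", "1 min")
>
> sum_agg = udaf(SumAgg())
> t = t_env.from_elements([(1, 2), (1, 3), (2, 5)], ['k', 'v'])
> t.group_by(t.k).select(t.k, sum_agg(t.v)).execute().print()
> {code}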



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
