YufeiLiu created FLINK-21208:
--------------------------------
Summary: pyarrow exception when using window with pandas udaf
Key: FLINK-21208
URL: https://issues.apache.org/jira/browse/FLINK-21208
Project: Flink
Issue Type: Improvement
Components: API / Python
Affects Versions: 1.12.0
Reporter: YufeiLiu
I wrote a PyFlink demo and ran it in a local environment. The logic is simple: generate records and aggregate them in a 100-second tumbling window using a pandas UDAF.
However, the job failed after several minutes. I don't think it's a resource problem, because the amount of data is small (the source generates 100 rows per second). Here is the error trace:
{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1108)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1082)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1213)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:302)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Failed to close remote bundle}
    ... 11 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    ... 10 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
    for value in o.value:
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369)
    ... 15 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
    for value in o.value:
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
{code}
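The last line of both Python tracebacks is raised by pyarrow's Arrow IPC stream reader. An Arrow IPC stream starts with a schema message followed by record-batch messages, and `pa.ipc.open_stream` fails with this error when the first message it reads is a record batch instead of a schema. The following sketch reproduces that error with plain pyarrow, no Flink involved, by opening a reader partway through a stream. It only illustrates what the message means; whether PyFlink's Arrow coder ends up in the same situation is an assumption, not something this trace confirms. All names in it (`batch1`, `try_open_mid_stream`, and so on) are illustrative.
{code:python}
import pyarrow as pa

# Two small single-column record batches sharing one schema (illustrative data only).
batch1 = pa.RecordBatch.from_arrays([pa.array([1, 2, 3], type=pa.int64())], names=["x"])
batch2 = pa.RecordBatch.from_arrays([pa.array([4, 5], type=pa.int64())], names=["x"])

# Write a single Arrow IPC stream: the schema message is written once,
# then one message per record batch, then an end-of-stream marker on close().
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch1.schema)
writer.write_batch(batch1)
offset_after_first_batch = sink.tell()  # byte offset where batch2's message begins
writer.write_batch(batch2)
writer.close()
buf = sink.getvalue()

# Reading from the beginning works because the reader sees the schema first.
full_table = pa.ipc.open_stream(buf).read_all()


def try_open_mid_stream(data, offset):
    """Open a stream reader at `offset` and return the error text, or None on success."""
    try:
        pa.ipc.open_stream(data.slice(offset))
    except (OSError, pa.ArrowException) as exc:
        # Depending on the pyarrow version this surfaces as OSError (as in the
        # trace above) or as a pyarrow ArrowException subclass.
        return str(exc)
    return None


# Starting at batch2 means the first message is a record batch, not a schema.
error_message = try_open_mid_stream(buf, offset_after_first_batch)
print(full_table.num_rows)  # 5
print(error_message)
{code}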
And here is my test code:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, Row, StreamTableEnvironment
from pyflink.table.udf import udaf, AggregateFunction
from pyflink.table.window import Tumble


class MyTestAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        return Row(0)

    def accumulate(self, accumulator, *args):
        # Store the number of `header` values received for the window.
        accumulator[0] = len(args[0])

    def get_result_type(self):
        return DataTypes.BIGINT()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    f_s_settings = EnvironmentSettings.new_instance() \
        .use_blink_planner().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, None, f_s_settings)

    # Register the aggregate function as a pandas (vectorized) UDAF.
    my_udaf = udaf(MyTestAggregateFunction(), func_type="pandas")
    t_env.register_function('my_udaf', my_udaf)

    # Source: random strings at 100 rows/second, with a processing-time attribute.
    t_env.sql_update("""
        CREATE TABLE `source_table` (
            `header` STRING,
            ts AS PROCTIME()
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '100'
        )
    """)

    # Sink: print the per-window result and the window start.
    t_env.sql_update("""
        CREATE TABLE `sink_table` (
            `content` BIGINT,
            `wstart` TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)

    # Aggregate in 100-second tumbling processing-time windows.
    t_env.scan("source_table") \
        .window(Tumble.over("100.second").on("ts").alias("w")) \
        .group_by('w') \
        .select("my_udaf(header), w.start") \
        .insert_into("sink_table")
    t_env.execute("test_job")
{code}
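For comparison, the per-window result this job is expected to print can be sketched in plain pandas, without Flink. The data is a hypothetical stand-in for the `datagen` source (a constant `header` value, one row every 10 ms to match `'rows-per-second' = '100'`), and grouping with `pd.Grouper(freq="100s")` only approximates `Tumble.over("100.second")`. It assumes, as for PyFlink pandas UDAFs, that `args[0]` in `accumulate` is the window's `header` column as a `pandas.Series`, so each window's value is its row count.
{code:python}
import pandas as pd

# Hypothetical stand-in for the datagen source: 250 seconds of data at 100 rows/second.
ts = pd.date_range("2021-01-01", periods=25_000, freq="10ms")
df = pd.DataFrame({"ts": ts, "header": ["h"] * len(ts)})

# Bucket rows into 100-second tumbling windows and apply the same logic as
# MyTestAggregateFunction.accumulate: len() of the window's `header` values.
counts = df.groupby(pd.Grouper(key="ts", freq="100s"))["header"].agg(len)

# Mirrors the sink columns: (`wstart`, `content`).
for window_start, content in counts.items():
    print(window_start, content)
{code}
With these inputs the two full windows hold 10,000 rows each and the last, partial window holds 5,000, so a small data volume like this should not stress the pandas side of the aggregation.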