[
https://issues.apache.org/jira/browse/FLINK-33549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
luoyuxia updated FLINK-33549:
-----------------------------
Description:
When run a job in batch, it throws the following exception
{code:java}
java.lang.NullPointerException: Factory does not implement interface
org.apache.flink.streaming.api.operators.YieldingOperatorFactory
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
at
org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
at
org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
at
org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
at
org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.<init>(MultipleInputStreamOperatorBase.java:88)
at
org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.<init>(BatchMultipleInputStreamOperator.java:48)
at
org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
at
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:834) {code}
When I disable multiple-input by setting table.optimizer.multiple-input-enabled
= false, it works then.
Should be introduced by FLINK-23621.
[In
here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
when the operator factory is instanceof YieldingOperatorFactory, it will set
mailbox executor. But when it's
BatchMultipleInputStreamOperatorFactory, althogh it'll still set mailbox
executor but it won't set the mailbox executor. for the operators wrapped by
the BatchMultipleInputStreamOperator. Then the exception is thrown.
> Exception "Factory does not implement interface YieldingOperatorFactory"
> thrown in batch mode
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-33549
> URL: https://issues.apache.org/jira/browse/FLINK-33549
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: luoyuxia
> Priority: Major
>
> When run a job in batch, it throws the following exception
> {code:java}
> java.lang.NullPointerException: Factory does not implement interface
> org.apache.flink.streaming.api.operators.YieldingOperatorFactory
> at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory.getMailboxExecutor(AbstractStreamOperatorFactory.java:67)
> at
> org.apache.flink.table.runtime.operators.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:79)
> at
> org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper.createOperator(TableOperatorWrapper.java:115)
> at
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.createAllOperators(MultipleInputStreamOperatorBase.java:259)
> at
> org.apache.flink.table.runtime.operators.multipleinput.MultipleInputStreamOperatorBase.<init>(MultipleInputStreamOperatorBase.java:88)
> at
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator.<init>(BatchMultipleInputStreamOperator.java:48)
> at
> org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory.createStreamOperator(BatchMultipleInputStreamOperatorFactory.java:51)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:756)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:743)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
> at java.lang.Thread.run(Thread.java:834) {code}
>
> When I disable multiple-input by setting
> table.optimizer.multiple-input-enabled = false, it works then.
> Should be introduced by FLINK-23621.
> [In
> here|https://github.com/apache/flink/blob/a1aed4f877099328d4833f8a2781d2edbaaddc70/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java#L60],
> when the operator factory is instanceof YieldingOperatorFactory, it will set
> mailbox executor. But when it's
> BatchMultipleInputStreamOperatorFactory, althogh it'll still set mailbox
> executor but it won't set the mailbox executor. for the operators wrapped by
> the BatchMultipleInputStreamOperator. Then the exception is thrown.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)