[ https://issues.apache.org/jira/browse/FLINK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther reassigned FLINK-23373:
------------------------------------
Assignee: Timo Walther
> Support object reuse disabled in OperatorChain
> ----------------------------------------------
>
> Key: FLINK-23373
> URL: https://issues.apache.org/jira/browse/FLINK-23373
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
>
> Currently, object reuse must be enabled in order to use chained sources.
> Otherwise, tests such as {{HiveDialectQueryITCase}} fail with the following exception:
> {code}
> 2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase) Time elapsed: 12.283 s <<< ERROR!
> 2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed to fetch next result
> 2021-07-12T14:47:55.8235133Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> 2021-07-12T14:47:55.8235958Z Jul 12 14:47:55 at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-07-12T14:47:55.8236774Z Jul 12 14:47:55 at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> ....
> 2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: java.lang.UnsupportedOperationException: Currently chained sources are supported only with objectReuse enabled
> 2021-07-12T14:47:55.8314356Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
> 2021-07-12T14:47:55.8315109Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
> 2021-07-12T14:47:55.8315820Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
> 2021-07-12T14:47:55.8316506Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
> 2021-07-12T14:47:55.8317209Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
> 2021-07-12T14:47:55.8317948Z Jul 12 14:47:55 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
> 2021-07-12T14:47:55.8318626Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
> 2021-07-12T14:47:55.8319205Z Jul 12 14:47:55 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> 2021-07-12T14:47:55.8319725Z Jul 12 14:47:55 at java.lang.Thread.run(Thread.java:748)
> 2021-07-12T14:47:55.8320122Z Jul 12 1
> {code}
> The fix could look as follows.
> This particular exception should be rather straightforward to fix. It is not
> implemented yet because the chained sources feature was implemented in the
> minimal scope required by the Blink planner, and roughly 50-100 lines of
> production code are missing to make it work with object reuse disabled.
> In {{OperatorChain#createChainedSourceOutput}}, we need to do something
> similar to what is done in {{OperatorChain#wrapOperatorIntoOutput}}, so something
> like:
> {code}
> if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
>     // Object reuse enabled: hand each record to the chained input as-is.
>     return closer.register(new ChainingOutput(input, metricGroup, outputTag));
> } else {
>     // Object reuse disabled: copy each record with the input serializer first.
>     TypeSerializer<IN> inSerializer =
>             operatorConfig.getTypeSerializerIn1(userCodeClassloader);
>     return closer.register(
>             new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
> }
> {code}
> The missing part is to make {{CopyingChainingOutput}} work with an
> {{Input}} instead of an {{Operator}}.
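To illustrate why the copying branch matters, here is a minimal, self-contained sketch of the two output variants written against an {{Input}}-like consumer instead of an operator. It does not use Flink's classes: {{SketchInput}}, {{SketchOutput}}, {{SketchChainingOutput}}, {{SketchCopyingChainingOutput}}, {{MutableRecord}} and {{ChainedSourceSketch}} are hypothetical stand-ins, and a {{UnaryOperator}} plays the role of {{TypeSerializer#copy}}. The source mutates and re-emits a single record instance; without copying, an input that buffers references ends up holding three aliases of the last value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the Input that a chained source feeds.
interface SketchInput<T> {
    void processElement(T record);
}

// Hypothetical stand-in for the Output handed to a chained source.
interface SketchOutput<T> {
    void collect(T record);
}

// Object reuse enabled: forward the very same instance, no copy.
final class SketchChainingOutput<T> implements SketchOutput<T> {
    private final SketchInput<T> input;

    SketchChainingOutput(SketchInput<T> input) {
        this.input = input;
    }

    @Override
    public void collect(T record) {
        input.processElement(record);
    }
}

// Object reuse disabled: copy each record before handing it to the Input.
// The copier stands in for TypeSerializer#copy(T) (assumption for this sketch).
final class SketchCopyingChainingOutput<T> implements SketchOutput<T> {
    private final SketchInput<T> input;
    private final UnaryOperator<T> copier;

    SketchCopyingChainingOutput(SketchInput<T> input, UnaryOperator<T> copier) {
        this.input = input;
        this.copier = copier;
    }

    @Override
    public void collect(T record) {
        input.processElement(copier.apply(record));
    }
}

// Mutable record that the source reuses between emissions.
final class MutableRecord {
    int value;

    MutableRecord(int value) {
        this.value = value;
    }

    MutableRecord copy() {
        return new MutableRecord(value);
    }
}

final class ChainedSourceSketch {
    // Emits 1, 2, 3 through one reused MutableRecord and returns the values
    // the Input sees when it finally reads its buffered references.
    static List<Integer> run(boolean objectReuseEnabled) {
        List<MutableRecord> buffered = new ArrayList<>();
        SketchInput<MutableRecord> input = buffered::add; // keeps references
        SketchOutput<MutableRecord> output;
        if (objectReuseEnabled) {
            output = new SketchChainingOutput<>(input);
        } else {
            output = new SketchCopyingChainingOutput<>(input, MutableRecord::copy);
        }
        MutableRecord reused = new MutableRecord(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            output.collect(reused);
        }
        List<Integer> values = new ArrayList<>();
        for (MutableRecord r : buffered) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println("no copy: " + run(true));  // [3, 3, 3]
        System.out.println("copying: " + run(false)); // [1, 2, 3]
    }
}
```

The non-copying path is only safe when the consumer does not hold on to records, which is the contract object reuse implies; the copying path trades one copy per record for that safety.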
--
This message was sent by Atlassian Jira
(v8.3.4#803005)