Timo Walther created FLINK-23373:
------------------------------------

             Summary: Support object reuse disabled in OperatorChain
                 Key: FLINK-23373
                 URL: https://issues.apache.org/jira/browse/FLINK-23373
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Task
            Reporter: Timo Walther


Currently, object reuse must be enabled in order to use chained sources.

Tests such as `HiveDialectQueryITCase` will fail with an exception:
{code}
2021-07-12T14:47:55.8233741Z Jul 12 14:47:55 [ERROR] testQueries(org.apache.flink.connectors.hive.HiveDialectQueryITCase)  Time elapsed: 12.283 s  <<< ERROR!
2021-07-12T14:47:55.8234433Z Jul 12 14:47:55 java.lang.RuntimeException: Failed to fetch next result
2021-07-12T14:47:55.8235133Z Jul 12 14:47:55    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
2021-07-12T14:47:55.8235958Z Jul 12 14:47:55    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
2021-07-12T14:47:55.8236774Z Jul 12 14:47:55    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
....
2021-07-12T14:47:55.8313594Z Jul 12 14:47:55 Caused by: java.lang.UnsupportedOperationException: Currently chained sources are supported only with objectReuse enabled
2021-07-12T14:47:55.8314356Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSourceOutput(OperatorChain.java:355)
2021-07-12T14:47:55.8315109Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedSources(OperatorChain.java:322)
2021-07-12T14:47:55.8315820Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:220)
2021-07-12T14:47:55.8316506Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:558)
2021-07-12T14:47:55.8317209Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:661)
2021-07-12T14:47:55.8317948Z Jul 12 14:47:55    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:547)
2021-07-12T14:47:55.8318626Z Jul 12 14:47:55    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
2021-07-12T14:47:55.8319205Z Jul 12 14:47:55    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
2021-07-12T14:47:55.8319725Z Jul 12 14:47:55    at java.lang.Thread.run(Thread.java:748)
2021-07-12T14:47:55.8320122Z Jul 12 1
{code}

The fix should look as follows:

This particular exception should be rather straightforward to fix. It is not 
implemented yet because the chained sources feature was implemented in the 
minimal scope required by the Blink planner, and it is missing roughly 50-100 
lines of production code to work with object reuse disabled.
In {{OperatorChain#createChainedSourceOutput}} we need to do something similar 
to what is done in {{OperatorChain#wrapOperatorIntoOutput}}, so something like:
{code}
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            return closer.register(new ChainingOutput(input, metricGroup, outputTag));
        } else {
            TypeSerializer<IN> inSerializer =
                    operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            return closer.register(
                    new CopyingChainingOutput(input, inSerializer, metricGroup, outputTag));
        }
{code}
The missing part is to make {{CopyingChainingOutput}} work with an Input 
instead of an Operator.
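
To illustrate why the copying variant matters when object reuse is disabled, 
here is a minimal, self-contained sketch. It does not use the real Flink 
classes: {{Input}}, {{Output}}, {{ChainingOutput}}, {{CopyingChainingOutput}} 
and {{createChainedSourceOutput}} below are simplified, hypothetical stand-ins 
that only borrow the names, and the {{UnaryOperator}} copier is assumed to play 
the role of {{TypeSerializer#copy}}. The real outputs also take a metric group 
and an output tag, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ChainedOutputSketch {

    /** Hypothetical stand-in for the downstream Input fed by a chained source. */
    interface Input<T> {
        void processElement(T record);
    }

    /** Hypothetical stand-in for an output inside the operator chain. */
    interface Output<T> {
        void collect(T record);
    }

    /** Object reuse enabled: the very same instance is handed downstream. */
    static final class ChainingOutput<T> implements Output<T> {
        private final Input<T> input;

        ChainingOutput(Input<T> input) {
            this.input = input;
        }

        @Override
        public void collect(T record) {
            input.processElement(record);
        }
    }

    /** Object reuse disabled: every record is copied before it is handed downstream. */
    static final class CopyingChainingOutput<T> implements Output<T> {
        private final Input<T> input;
        private final UnaryOperator<T> copier; // assumption: stands in for TypeSerializer#copy

        CopyingChainingOutput(Input<T> input, UnaryOperator<T> copier) {
            this.input = input;
            this.copier = copier;
        }

        @Override
        public void collect(T record) {
            input.processElement(copier.apply(record));
        }
    }

    /** Mirrors the branch proposed above, reduced to the object reuse decision. */
    static <T> Output<T> createChainedSourceOutput(
            boolean objectReuseEnabled, Input<T> input, UnaryOperator<T> copier) {
        if (objectReuseEnabled) {
            return new ChainingOutput<>(input);
        }
        return new CopyingChainingOutput<>(input, copier);
    }

    /** Mutable record type, as a source that reuses its objects might emit. */
    static final class Event {
        int value;

        Event(int value) {
            this.value = value;
        }
    }

    /** Emits 1, 2, 3 through one reused Event and reports what a buffering input holds. */
    static List<Integer> run(boolean objectReuseEnabled) {
        List<Event> buffered = new ArrayList<>();
        Input<Event> downstream = buffered::add;
        UnaryOperator<Event> copier = e -> new Event(e.value);
        Output<Event> out = createChainedSourceOutput(objectReuseEnabled, downstream, copier);

        Event reused = new Event(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i; // the source mutates and re-emits the same instance
            out.collect(reused);
        }

        List<Integer> seen = new ArrayList<>();
        for (Event e : buffered) {
            seen.add(e.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [3, 3, 3]
        System.out.println(run(false)); // prints [1, 2, 3]
    }
}
```

In this sketch a downstream input that buffers records only sees distinct 
values when the copying output is used, which is the guarantee users expect 
when object reuse is disabled.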



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
