[
https://issues.apache.org/jira/browse/FLINK-39094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39094:
-----------------------------------
Labels: pull-request-available (was: )
> Avoid creating duplicate function instances in code generation
> --------------------------------------------------------------
>
> Key: FLINK-39094
> URL: https://issues.apache.org/jira/browse/FLINK-39094
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: dylanhz
> Priority: Minor
> Labels: pull-request-available
>
> When a function is called multiple times within the same code-generation
> context, a separate function instance is created for each call, but only the
> last one is actually used. Removing the duplicate instances would make the
> generated code cleaner and avoid redundant object serialization/deserialization
> overhead.
> Here is an example query and the code generated for it:
> {code:sql}
> select split('a,b,c', ','), split('x,y', ',')
> {code}
> In the generated code below, the function field is assigned twice in the
> constructor (from references[0] and references[1]), but only the last
> assignment (references[1]) takes effect; the instance in references[0] is
> never used.
> {code:java}
> public class BatchExecCalc$7 extends
> org.apache.flink.table.runtime.operators.TableStreamOperator
> implements
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> private final Object[] references;
> private final org.apache.flink.table.data.binary.BinaryStringData str$0 =
>
> org.apache.flink.table.data.binary.BinaryStringData.fromString("a,b,c");
> private final org.apache.flink.table.data.binary.BinaryStringData str$1 =
>
> org.apache.flink.table.data.binary.BinaryStringData.fromString(",");
> private transient
> org.apache.flink.table.runtime.functions.scalar.SplitFunction
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425;
> private final org.apache.flink.table.data.binary.BinaryStringData str$4 =
>
> org.apache.flink.table.data.binary.BinaryStringData.fromString("x,y");
> org.apache.flink.table.data.BoxedWrapperRowData out =
> new org.apache.flink.table.data.BoxedWrapperRowData(2);
> private final
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
> new
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> public BatchExecCalc$7(
> Object[] references,
> org.apache.flink.streaming.runtime.tasks.StreamTask task,
> org.apache.flink.streaming.api.graph.StreamConfig config,
> org.apache.flink.streaming.api.operators.Output output,
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
> processingTimeService)
> throws Exception {
> this.references = references;
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> =
>
> (((org.apache.flink.table.runtime.functions.scalar.SplitFunction)
> references[0]));
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> =
>
> (((org.apache.flink.table.runtime.functions.scalar.SplitFunction)
> references[1]));
> this.setup(task, config, output);
> if (this instanceof
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> .setProcessingTimeService(processingTimeService);
> }
> }
> @Override
> public void open() throws Exception {
> super.open();
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> .open(new
> org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
> }
> @Override
> public void
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> element)
> throws Exception {
> org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) element.getValue();
> org.apache.flink.table.data.ArrayData externalResult$2;
> org.apache.flink.table.data.ArrayData result$3;
> boolean isNull$3;
> org.apache.flink.table.data.ArrayData externalResult$5;
> org.apache.flink.table.data.ArrayData result$6;
> boolean isNull$6;
> externalResult$2 =
> (org.apache.flink.table.data.ArrayData)
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> .eval(
> false
> ? null
> :
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
>
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
> str$0)),
> false
> ? null
> :
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
>
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
> str$1)));
> isNull$3 = externalResult$2 == null;
> result$3 = null;
> if (!isNull$3) {
> result$3 = externalResult$2;
> }
> out.setNonPrimitiveValue(0, result$3);
> externalResult$5 =
> (org.apache.flink.table.data.ArrayData)
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> .eval(
> false
> ? null
> :
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
>
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
> str$4)),
> false
> ? null
> :
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
>
> ((org.apache.flink.table.data.binary
>
> .BinaryStringData)
> str$1)));
> isNull$6 = externalResult$5 == null;
> result$6 = null;
> if (!isNull$6) {
> result$6 = externalResult$5;
> }
> out.setNonPrimitiveValue(1, result$6);
> output.collect(outElement.replace(out));
> }
> @Override
> public void finish() throws Exception {
> super.finish();
> }
> @Override
> public void close() throws Exception {
> super.close();
>
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
> .close();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)