dylanhz created FLINK-39094:
-------------------------------
Summary: Avoid creating duplicate function instances in code
generation
Key: FLINK-39094
URL: https://issues.apache.org/jira/browse/FLINK-39094
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: dylanhz
When the same function is called multiple times within the same code-generation context, a separate
function instance is created for each call, even though only the last one is actually used.
Removing the duplicate instances would make the generated code cleaner and avoid redundant
object serialization/deserialization (ser/de) overhead.
Here is an example query and the code generated for it.
{code:sql}
select split('a,b,c', ','), split('x,y', ',')
{code}
In the generated code below, the single function field is assigned twice in the
constructor (first from references[0], then from references[1]). Only the last
assignment (references[1]) takes effect, so the instance passed in references[0] is never used.
{code:java}
// Code currently generated for: select split('a,b,c', ','), split('x,y', ',')
// Kept verbatim (apart from these comments) to illustrate the issue: both SPLIT
// calls share one function field, yet the constructor assigns that field twice.
public class BatchExecCalc$7 extends
org.apache.flink.table.runtime.operators.TableStreamOperator
implements
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
// Objects passed in from outside; elements 0 and 1 are cast to SplitFunction in
// the constructor below.
private final Object[] references;
// Literal arguments of the SPLIT calls; str$1 (",") is reused by both calls.
private final org.apache.flink.table.data.binary.BinaryStringData str$0 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("a,b,c");
private final org.apache.flink.table.data.binary.BinaryStringData str$1 =
org.apache.flink.table.data.binary.BinaryStringData.fromString(",");
// One field, with one generated name, serves BOTH SPLIT calls.
private transient
org.apache.flink.table.runtime.functions.scalar.SplitFunction
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425;
private final org.apache.flink.table.data.binary.BinaryStringData str$4 =
org.apache.flink.table.data.binary.BinaryStringData.fromString("x,y");
// Reused output row with two fields, one per SPLIT result.
org.apache.flink.table.data.BoxedWrapperRowData out =
new org.apache.flink.table.data.BoxedWrapperRowData(2);
// Reused record wrapper for emitting `out`.
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord
outElement =
new
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
public BatchExecCalc$7(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
processingTimeService)
throws Exception {
this.references = references;
// First assignment: the instance from references[0] ...
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
=
(((org.apache.flink.table.runtime.functions.scalar.SplitFunction)
references[0]));
// ... is immediately overwritten by the instance from references[1], so the
// references[0] instance is never used. This is the duplicate the issue proposes
// to remove.
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
=
(((org.apache.flink.table.runtime.functions.scalar.SplitFunction)
references[1]));
this.setup(task, config, output);
if (this instanceof
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator)
this)
.setProcessingTimeService(processingTimeService);
}
}
@Override
public void open() throws Exception {
super.open();
// Only the surviving instance (the one from references[1]) is opened.
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
.open(new
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
}
@Override
public void
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
element)
throws Exception {
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) element.getValue();
org.apache.flink.table.data.ArrayData externalResult$2;
org.apache.flink.table.data.ArrayData result$3;
boolean isNull$3;
org.apache.flink.table.data.ArrayData externalResult$5;
org.apache.flink.table.data.ArrayData result$6;
boolean isNull$6;
// split('a,b,c', ',') -> output field 0. The `false ? null : x` guards always
// yield x here.
externalResult$2 =
(org.apache.flink.table.data.ArrayData)
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
.eval(
false
? null
:
((org.apache.flink.table.data.binary
.BinaryStringData)
((org.apache.flink.table.data.binary
.BinaryStringData)
str$0)),
false
? null
:
((org.apache.flink.table.data.binary
.BinaryStringData)
((org.apache.flink.table.data.binary
.BinaryStringData)
str$1)));
isNull$3 = externalResult$2 == null;
result$3 = null;
if (!isNull$3) {
result$3 = externalResult$2;
}
out.setNonPrimitiveValue(0, result$3);
// split('x,y', ',') -> output field 1, evaluated on the same function field.
externalResult$5 =
(org.apache.flink.table.data.ArrayData)
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
.eval(
false
? null
:
((org.apache.flink.table.data.binary
.BinaryStringData)
((org.apache.flink.table.data.binary
.BinaryStringData)
str$4)),
false
? null
:
((org.apache.flink.table.data.binary
.BinaryStringData)
((org.apache.flink.table.data.binary
.BinaryStringData)
str$1)));
isNull$6 = externalResult$5 == null;
result$6 = null;
if (!isNull$6) {
result$6 = externalResult$5;
}
out.setNonPrimitiveValue(1, result$6);
output.collect(outElement.replace(out));
}
@Override
public void finish() throws Exception {
super.finish();
}
@Override
public void close() throws Exception {
super.close();
// Closes the single shared instance. The overwritten references[0] instance is
// never opened or closed by this operator.
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
.close();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)