dylanhz created FLINK-39094:
-------------------------------

             Summary: Avoid creating duplicate function instances in code generation
                 Key: FLINK-39094
                 URL: https://issues.apache.org/jira/browse/FLINK-39094
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: dylanhz


When the same function is called multiple times within the same code-generation
context, a separate function instance is created for each call. However, all of
them are assigned to the same field, so only the last one is actually used.
Removing the duplicates would make the generated code cleaner and avoid redundant
object serialization/deserialization overhead.

Here is an example query and the code generated for it:
{code:sql}
select split('a,b,c', ','), split('x,y', ',')
{code}

In the generated code below, the function field is assigned twice in the
constructor (from references[0] and then references[1]). Only the last
assignment (references[1]) takes effect, so the instance in references[0] is
never used.
{code:java}
/**
 * Operator generated for {@code select split('a,b,c', ','), split('x,y', ',')}, quoted here to
 * illustrate FLINK-39094.
 *
 * <p>Both {@code split(...)} calls share the single SplitFunction field declared below. However,
 * the constructor assigns that field twice: first from {@code references[0]}, then from
 * {@code references[1]}. Only the second assignment survives, so the instance passed in
 * {@code references[0]} is never used.
 */
public class BatchExecCalc$7 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
        implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {

    // Objects handed to the generated class at construction time; the constructor reads
    // function instances from indices 0 and 1.
    private final Object[] references;

    // String literals from the query. Note that str$1 (",") is reused as the delimiter
    // argument of both split(...) calls.
    private final org.apache.flink.table.data.binary.BinaryStringData str$0 =
            
org.apache.flink.table.data.binary.BinaryStringData.fromString("a,b,c");

    private final org.apache.flink.table.data.binary.BinaryStringData str$1 =
            org.apache.flink.table.data.binary.BinaryStringData.fromString(",");

    // The single function field used by both split(...) calls in processElement().
    private transient 
org.apache.flink.table.runtime.functions.scalar.SplitFunction
            
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425;

    private final org.apache.flink.table.data.binary.BinaryStringData str$4 =
            
org.apache.flink.table.data.binary.BinaryStringData.fromString("x,y");

    // Two-field output row, one field per projected split(...) expression. It is a field,
    // so the same row object is refilled for every incoming record.
    org.apache.flink.table.data.BoxedWrapperRowData out =
            new org.apache.flink.table.data.BoxedWrapperRowData(2);
    // Reused record wrapper; its value is replaced with `out` before each collect().
    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
outElement =
            new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

    /**
     * Stores the references array, initializes the shared function field from it, calls
     * setup(task, config, output), and passes the processing-time service on when this
     * operator is an AbstractStreamOperator.
     */
    public BatchExecCalc$7(
            Object[] references,
            org.apache.flink.streaming.runtime.tasks.StreamTask task,
            org.apache.flink.streaming.api.graph.StreamConfig config,
            org.apache.flink.streaming.api.operators.Output output,
            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService)
            throws Exception {
        this.references = references;
        // Redundant (the subject of FLINK-39094): this value is overwritten by the very
        // next assignment, so the instance in references[0] is never used.
        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
 =
                
(((org.apache.flink.table.runtime.functions.scalar.SplitFunction) 
references[0]));
        // Effective assignment: this references[1] instance is the one that open(),
        // processElement() and close() operate on.
        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
 =
                
(((org.apache.flink.table.runtime.functions.scalar.SplitFunction) 
references[1]));
        this.setup(task, config, output);
        if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) 
this)
                    .setProcessingTimeService(processingTimeService);
        }
    }

    // Opens the base operator, then opens the single shared SplitFunction instance
    // with a FunctionContext built from the runtime context.
    @Override
    public void open() throws Exception {
        super.open();

        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
                .open(new 
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    }

    // Evaluates both split(...) calls on literal arguments, writes the two results into
    // `out`, and emits it as one record.
    @Override
    public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element)
            throws Exception {
        // Input row; it is not referenced below because both calls take only literals.
        org.apache.flink.table.data.RowData in1 =
                (org.apache.flink.table.data.RowData) element.getValue();

        org.apache.flink.table.data.ArrayData externalResult$2;
        org.apache.flink.table.data.ArrayData result$3;
        boolean isNull$3;
        org.apache.flink.table.data.ArrayData externalResult$5;
        org.apache.flink.table.data.ArrayData result$6;
        boolean isNull$6;

        // split('a,b,c', ',') -> output field 0. The `false ? null : ...` guards are
        // generated null checks whose condition is the constant `false` here.
        externalResult$2 =
                (org.apache.flink.table.data.ArrayData)
                        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
                                .eval(
                                        false
                                                ? null
                                                : 
((org.apache.flink.table.data.binary
                                                                
.BinaryStringData)
                                                        
((org.apache.flink.table.data.binary
                                                                        
.BinaryStringData)
                                                                str$0)),
                                        false
                                                ? null
                                                : 
((org.apache.flink.table.data.binary
                                                                
.BinaryStringData)
                                                        
((org.apache.flink.table.data.binary
                                                                        
.BinaryStringData)
                                                                str$1)));

        isNull$3 = externalResult$2 == null;
        result$3 = null;
        if (!isNull$3) {
            result$3 = externalResult$2;
        }

        out.setNonPrimitiveValue(0, result$3);

        // split('x,y', ',') -> output field 1. It goes through the same function field
        // as the first call.
        externalResult$5 =
                (org.apache.flink.table.data.ArrayData)
                        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
                                .eval(
                                        false
                                                ? null
                                                : 
((org.apache.flink.table.data.binary
                                                                
.BinaryStringData)
                                                        
((org.apache.flink.table.data.binary
                                                                        
.BinaryStringData)
                                                                str$4)),
                                        false
                                                ? null
                                                : 
((org.apache.flink.table.data.binary
                                                                
.BinaryStringData)
                                                        
((org.apache.flink.table.data.binary
                                                                        
.BinaryStringData)
                                                                str$1)));

        isNull$6 = externalResult$5 == null;
        result$6 = null;
        if (!isNull$6) {
            result$6 = externalResult$5;
        }

        out.setNonPrimitiveValue(1, result$6);

        output.collect(outElement.replace(out));
    }

    // No generated finish logic; this only delegates to the base operator.
    @Override
    public void finish() throws Exception {

        super.finish();
    }

    // Closes the base operator first, then the single shared SplitFunction instance.
    @Override
    public void close() throws Exception {
        super.close();
        
function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
                .close();
    }
}

{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to