[ 
https://issues.apache.org/jira/browse/FLINK-39094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39094:
-----------------------------------
    Labels: pull-request-available  (was: )

> Avoid creating duplicate function instances in code generation
> --------------------------------------------------------------
>
>                 Key: FLINK-39094
>                 URL: https://issues.apache.org/jira/browse/FLINK-39094
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: dylanhz
>            Priority: Minor
>              Labels: pull-request-available
>
> When a function is called multiple times with the same context, multiple 
> function instances will be created while only the last one is actually used. 
> We can remove the duplicate ones to make the generated code cleaner and avoid 
> redundant object ser/de overhead.
> Here is a query example and the generated code.
> {code:sql}
> select split('a,b,c', ','), split('x,y', ',')
> {code}
> In the generated code below, the function field is assigned twice in the 
> constructor (from references[0] and references[1]), but only the last 
> assignment (references[1]) is effective.
> {code:java}
> public class BatchExecCalc$7 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
>         implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>     private final Object[] references;
>     private final org.apache.flink.table.data.binary.BinaryStringData str$0 =
>             
> org.apache.flink.table.data.binary.BinaryStringData.fromString("a,b,c");
>     private final org.apache.flink.table.data.binary.BinaryStringData str$1 =
>             
> org.apache.flink.table.data.binary.BinaryStringData.fromString(",");
>     private transient 
> org.apache.flink.table.runtime.functions.scalar.SplitFunction
>             
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425;
>     private final org.apache.flink.table.data.binary.BinaryStringData str$4 =
>             
> org.apache.flink.table.data.binary.BinaryStringData.fromString("x,y");
>     org.apache.flink.table.data.BoxedWrapperRowData out =
>             new org.apache.flink.table.data.BoxedWrapperRowData(2);
>     private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
>             new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>     public BatchExecCalc$7(
>             Object[] references,
>             org.apache.flink.streaming.runtime.tasks.StreamTask task,
>             org.apache.flink.streaming.api.graph.StreamConfig config,
>             org.apache.flink.streaming.api.operators.Output output,
>             org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService)
>             throws Exception {
>         this.references = references;
>         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>  =
>                 
> (((org.apache.flink.table.runtime.functions.scalar.SplitFunction) 
> references[0]));
>         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>  =
>                 
> (((org.apache.flink.table.runtime.functions.scalar.SplitFunction) 
> references[1]));
>         this.setup(task, config, output);
>         if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>             
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>                     .setProcessingTimeService(processingTimeService);
>         }
>     }
>     @Override
>     public void open() throws Exception {
>         super.open();
>         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>                 .open(new 
> org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
>     }
>     @Override
>     public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element)
>             throws Exception {
>         org.apache.flink.table.data.RowData in1 =
>                 (org.apache.flink.table.data.RowData) element.getValue();
>         org.apache.flink.table.data.ArrayData externalResult$2;
>         org.apache.flink.table.data.ArrayData result$3;
>         boolean isNull$3;
>         org.apache.flink.table.data.ArrayData externalResult$5;
>         org.apache.flink.table.data.ArrayData result$6;
>         boolean isNull$6;
>         externalResult$2 =
>                 (org.apache.flink.table.data.ArrayData)
>                         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>                                 .eval(
>                                         false
>                                                 ? null
>                                                 : 
> ((org.apache.flink.table.data.binary
>                                                                 
> .BinaryStringData)
>                                                         
> ((org.apache.flink.table.data.binary
>                                                                         
> .BinaryStringData)
>                                                                 str$0)),
>                                         false
>                                                 ? null
>                                                 : 
> ((org.apache.flink.table.data.binary
>                                                                 
> .BinaryStringData)
>                                                         
> ((org.apache.flink.table.data.binary
>                                                                         
> .BinaryStringData)
>                                                                 str$1)));
>         isNull$3 = externalResult$2 == null;
>         result$3 = null;
>         if (!isNull$3) {
>             result$3 = externalResult$2;
>         }
>         out.setNonPrimitiveValue(0, result$3);
>         externalResult$5 =
>                 (org.apache.flink.table.data.ArrayData)
>                         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>                                 .eval(
>                                         false
>                                                 ? null
>                                                 : 
> ((org.apache.flink.table.data.binary
>                                                                 
> .BinaryStringData)
>                                                         
> ((org.apache.flink.table.data.binary
>                                                                         
> .BinaryStringData)
>                                                                 str$4)),
>                                         false
>                                                 ? null
>                                                 : 
> ((org.apache.flink.table.data.binary
>                                                                 
> .BinaryStringData)
>                                                         
> ((org.apache.flink.table.data.binary
>                                                                         
> .BinaryStringData)
>                                                                 str$1)));
>         isNull$6 = externalResult$5 == null;
>         result$6 = null;
>         if (!isNull$6) {
>             result$6 = externalResult$5;
>         }
>         out.setNonPrimitiveValue(1, result$6);
>         output.collect(outElement.replace(out));
>     }
>     @Override
>     public void finish() throws Exception {
>         super.finish();
>     }
>     @Override
>     public void close() throws Exception {
>         super.close();
>         
> function_org$apache$flink$table$runtime$functions$scalar$SplitFunction$3bfba7b1caccc3b7427da3dc1be41425
>                 .close();
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to