Caizhi Weng created FLINK-28190:
-----------------------------------

             Summary: NullPointerException is thrown if the intermediate result of nested UDFs is used
                 Key: FLINK-28190
                 URL: https://issues.apache.org/jira/browse/FLINK-28190
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.5, 1.15.0
            Reporter: Caizhi Weng


Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.

{code:scala}
// Reproduces FLINK-28190: the result of myfun1 is consumed both by another UDF
// (myfun2) and by a map element access (mp['Hi']) in the same projection.
@Test
def myTest(): Unit = {
  // Register the two UDFs under the names used by the queries below.
  tEnv.executeSql("create temporary function myfun1 as 'MyFun1'")
  tEnv.executeSql("create temporary function myfun2 as 'MyFun2'")

  // A single input row is enough to trigger the failure.
  val data: Seq[Row] = Seq(
    Row.of("Hi", "Hello")
  )
  tEnv.executeSql(
    s"""
      |create table T (
      |  a string,
      |  b string
      |) with (
      |  'connector' = 'values',
      |  'data-id' = '${TestValuesTableFactory.registerData(data)}',
      |  'bounded' = 'true'
      |)
      |""".stripMargin)
  // The statement must stay inside one string literal: a plain "..." literal
  // cannot span several source lines in Scala.
  tEnv.executeSql(
    "create temporary view my_view as select myfun1(a, b) as mp from T")
  // Uses the intermediate map twice: as the argument of myfun2 and as the
  // operand of the element access.
  tEnv.executeSql("select myfun2(mp), mp['Hi'] from my_view").print()
}
{code}

The UDF classes are:
{code:java}
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

/**
 * Scalar UDF used by the reproduction: wraps one key/value pair in a map.
 */
public class MyFun1 extends ScalarFunction {

    /**
     * Builds a fresh, mutable map that holds exactly one entry.
     *
     * @param k key of the only entry
     * @param v value stored under {@code k}
     * @return a new {@link HashMap} containing the mapping {@code k -> v}
     */
    public Map<String, String> eval(String k, String v) {
        final Map<String, String> singleEntry = new HashMap<>();
        singleEntry.put(k, v);
        return singleEntry;
    }
}
{code}

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Scalar UDF used by the reproduction: renders a map argument as text.
 */
public class MyFun2 extends ScalarFunction {

    /**
     * Converts the given map to its string form.
     *
     * @param input map to render; may be {@code null}
     * @return {@code input.toString()}, or the literal {@code "null"} when
     *     {@code input} is {@code null}
     */
    public String eval(Map<String, String> input) {
        if (input == null) {
            return "null";
        }
        return input.toString();
    }
}
{code}

The exception stack trace is:
{code}
Caused by: java.lang.NullPointerException
        at StreamExecCalc$25.processElement_split1(Unknown Source)
        at StreamExecCalc$25.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
        at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$FromElementSourceFunction.run(TestValuesRuntimeFunctions.java:530)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
{code}

The generated code is:
{code}
public class ToBinary$0 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$0(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

public class ToBinary$1 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$1(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

public class ToBinary$2 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$2(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

public class ToBinary$3 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$3(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

      public class StreamExecCalc$25 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
          implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {

        private final Object[] references;
        private transient MyFun1 function_MyFun1;
        private transient 
org.apache.flink.table.data.conversion.StringStringConverter converter$6;
        private transient 
org.apache.flink.table.data.conversion.MapMapConverter converter$8;
        private transient MyFun2 function_MyFun2;
        
        private final org.apache.flink.table.data.binary.BinaryStringData 
str$12 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Hi");
                   
        org.apache.flink.table.data.BoxedWrapperRowData out = new 
org.apache.flink.table.data.BoxedWrapperRowData(2);
        private final 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

        public StreamExecCalc$25(
            Object[] references,
            org.apache.flink.streaming.runtime.tasks.StreamTask task,
            org.apache.flink.streaming.api.graph.StreamConfig config,
            org.apache.flink.streaming.api.operators.Output output,
            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService) throws Exception {
          this.references = references;
          function_MyFun1 = (((MyFun1) references[0]));
          converter$6 = 
(((org.apache.flink.table.data.conversion.StringStringConverter) 
references[1]));
          converter$8 = 
(((org.apache.flink.table.data.conversion.MapMapConverter) references[2]));
          function_MyFun2 = (((MyFun2) references[3]));
          this.setup(task, config, output);
          if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) 
this)
              .setProcessingTimeService(processingTimeService);
          }
        }

        @Override
        public void open() throws Exception {
          super.open();
          
          function_MyFun1.open(new 
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
                 
          
          converter$6.open(getRuntimeContext().getUserCodeClassLoader());
                     
          
          converter$8.open(getRuntimeContext().getUserCodeClassLoader());
                     
          
          function_MyFun2.open(new 
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
                 
        }

        @Override
        public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
          org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) element.getValue();
          
          org.apache.flink.table.data.binary.BinaryStringData field$4;
          boolean isNull$4;
          org.apache.flink.table.data.binary.BinaryStringData field$5;
          boolean isNull$5;
          java.util.Map externalResult$7;
          org.apache.flink.table.data.MapData result$9;
          boolean isNull$9;
          java.lang.String externalResult$10;
          org.apache.flink.table.data.binary.BinaryStringData result$11;
          boolean isNull$11;
          boolean isNull$23 = false;
          boolean result$24 = false;
          
          
          isNull$4 = in1.isNullAt(0);
          field$4 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$4) {
            field$4 = ((org.apache.flink.table.data.binary.BinaryStringData) 
in1.getString(0));
          }
          isNull$5 = in1.isNullAt(1);
          field$5 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) 
in1.getString(1));
          }
          
          out.setRowKind(in1.getRowKind());
          
          
          
          
          
          
          
          
          externalResult$7 = (java.util.Map) function_MyFun1
            .eval(isNull$4 ? null : ((java.lang.String) 
converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
field$4)), isNull$5 ? null : ((java.lang.String) 
converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
field$5)));
          
          externalResult$10 = (java.lang.String) function_MyFun2
            .eval(externalResult$7);
          
          isNull$11 = externalResult$10 == null;
          result$11 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$11) {
            result$11 = (org.apache.flink.table.data.binary.BinaryStringData) 
converter$6.toInternalOrNull((java.lang.String) externalResult$10);
          }
          
          if (isNull$11) {
            out.setNullAt(0);
          } else {
            out.setNonPrimitiveValue(0, result$11);
          }
                    
          
          
          
          
          boolean isNull$13 = (isNull$9 || false);
          org.apache.flink.table.data.binary.BinaryStringData result$13 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$13) {
           
          if (result$9 instanceof 
org.apache.flink.table.data.binary.BinaryMapData) {
            org.apache.flink.table.data.binary.BinaryMapData binaryMap$21 = 
(org.apache.flink.table.data.binary.BinaryMapData) result$9;
            final int length$15 = binaryMap$21.size();
            final org.apache.flink.table.data.binary.BinaryArrayData keys$16 = 
binaryMap$21.keyArray();
            final org.apache.flink.table.data.binary.BinaryArrayData values$17 
= binaryMap$21.valueArray();
          
            int index$18 = 0;
            boolean found$19 = false;
            if (false) {
              while (index$18 < length$15 && !found$19) {
                if (keys$16.isNullAt(index$18)) {
                  found$19 = true;
                } else {
                  index$18++;
                }
              }
            } else {
              while (index$18 < length$15 && !found$19) {
                final org.apache.flink.table.data.binary.BinaryStringData 
key$14 = ((org.apache.flink.table.data.binary.BinaryStringData) 
keys$16.getString(index$18));
                
          
          
          isNull$23 = false || false;
          result$24 = false;
          if (!isNull$23) {
            
          
          result$24 = ((org.apache.flink.table.data.binary.BinaryStringData) 
str$12).equals(key$14);
          
            
          }
          
                if (result$24) {
                  found$19 = true;
                } else {
                  index$18++;
                }
              }
            }
          
            if (!found$19 || values$17.isNullAt(index$18)) {
              isNull$13 = true;
            } else {
              result$13 = 
((org.apache.flink.table.data.binary.BinaryStringData) 
values$17.getString(index$18));
            }
          } else {
            org.apache.flink.table.data.GenericMapData genericMap$22 = 
(org.apache.flink.table.data.GenericMapData) result$9;
            org.apache.flink.table.data.binary.BinaryStringData value$20 =
              (org.apache.flink.table.data.binary.BinaryStringData) 
genericMap$22.get((org.apache.flink.table.data.binary.BinaryStringData) 
((org.apache.flink.table.data.binary.BinaryStringData) str$12));
            if (value$20 == null) {
              isNull$13 = true;
            } else {
              result$13 = value$20;
            }
          }
                  
          }
                  
          if (isNull$13) {
            out.setNullAt(1);
          } else {
            out.setNonPrimitiveValue(1, result$13);
          }
                    
                  
          output.collect(outElement.replace(out));
          
          
        }

        

        @Override
        public void close() throws Exception {
           super.close();
          
          function_MyFun1.close();
                 
          
          function_MyFun2.close();
                 
        }

        
      }
    

public class ToBinary$26 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$26(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

public class ToBinary$27 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$27(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
        

public class ToBinary$28 implements 
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
 org.apache.flink.table.data.binary.BinaryRowData> {

  org.apache.flink.table.data.binary.BinaryRowData out = new 
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new 
org.apache.flink.table.data.writer.BinaryRowWriter(out);

  public ToBinary$28(Object[] references) throws Exception {
    
  }

  @Override
  public org.apache.flink.table.data.binary.BinaryRowData 
apply(org.apache.flink.table.data.RowData in1) {
    
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
  return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}

    innerApply(in1);
    return out;
  }

  /* Fit into JavaCodeSplitter's void function limitation. */
  private void innerApply(org.apache.flink.table.data.RowData in1) {
    
    
outWriter.reset();


if (in1.isNullAt(0)) {
  outWriter.setNullAt(0);
} else {
  outWriter.writeString(0, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
             


if (in1.isNullAt(1)) {
  outWriter.setNullAt(1);
} else {
  outWriter.writeString(1, 
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
             
outWriter.complete();
        out.setRowKind(in1.getRowKind());
  }
}
{code}

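You can see that {{result$9}} (together with its null flag {{isNull$9}}) is declared in {{StreamExecCalc$25.processElement}} but never assigned a value, yet the code generated for {{mp['Hi']}} reads both of them. This causes the bug.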



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to