Caizhi Weng created FLINK-28190: ----------------------------------- Summary: NullPointerException is thrown if the intermediate result of nesting UDFs is used Key: FLINK-28190 URL: https://issues.apache.org/jira/browse/FLINK-28190 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.14.5, 1.15.0 Reporter: Caizhi Weng
Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug. {code:scala} @Test def myTest(): Unit = { tEnv.executeSql("create temporary function myfun1 as 'MyFun1'") tEnv.executeSql("create temporary function myfun2 as 'MyFun2'") val data: Seq[Row] = Seq( Row.of("Hi", "Hello") ) tEnv.executeSql( s""" |create table T ( | a string, | b string |) with ( | 'connector' = 'values', | 'data-id' = '${TestValuesTableFactory.registerData(data)}', | 'bounded' = 'true' |) |""".stripMargin) tEnv.executeSql("create temporary view my_view as select myfun1(a, b) as mp from T") tEnv.executeSql("select myfun2(mp), mp['Hi'] from my_view").print() } {code} UDF classes are {code:java} import org.apache.flink.table.functions.ScalarFunction; import java.util.HashMap; import java.util.Map; public class MyFun1 extends ScalarFunction { public Map<String, String> eval(String k, String v) { Map<String, String> returnMap = new HashMap<>(); returnMap.put(k, v); return returnMap; } } {code} {code:java} import org.apache.flink.table.functions.ScalarFunction; import java.util.Map; public class MyFun2 extends ScalarFunction { public String eval(Map<String, String> input) { return String.valueOf(input); } } {code} The exception stack is {code} Caused by: java.lang.NullPointerException at StreamExecCalc$25.processElement_split1(Unknown Source) at StreamExecCalc$25.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) at org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$FromElementSourceFunction.run(TestValuesRuntimeFunctions.java:530) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332) {code} The generated code is {code} public class ToBinary$0 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$0(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class ToBinary$1 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$1(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class ToBinary$2 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$2(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class ToBinary$3 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$3(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class StreamExecCalc$25 extends org.apache.flink.table.runtime.operators.TableStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; private transient MyFun1 function_MyFun1; private transient org.apache.flink.table.data.conversion.StringStringConverter converter$6; private transient org.apache.flink.table.data.conversion.MapMapConverter converter$8; private transient MyFun2 function_MyFun2; private final org.apache.flink.table.data.binary.BinaryStringData str$12 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Hi"); org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(2); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public StreamExecCalc$25( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; function_MyFun1 = (((MyFun1) references[0])); converter$6 = (((org.apache.flink.table.data.conversion.StringStringConverter) references[1])); converter$8 = (((org.apache.flink.table.data.conversion.MapMapConverter) references[2])); function_MyFun2 = (((MyFun2) references[3])); 
this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); function_MyFun1.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext())); converter$6.open(getRuntimeContext().getUserCodeClassLoader()); converter$8.open(getRuntimeContext().getUserCodeClassLoader()); function_MyFun2.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext())); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue(); org.apache.flink.table.data.binary.BinaryStringData field$4; boolean isNull$4; org.apache.flink.table.data.binary.BinaryStringData field$5; boolean isNull$5; java.util.Map externalResult$7; org.apache.flink.table.data.MapData result$9; boolean isNull$9; java.lang.String externalResult$10; org.apache.flink.table.data.binary.BinaryStringData result$11; boolean isNull$11; boolean isNull$23 = false; boolean result$24 = false; isNull$4 = in1.isNullAt(0); field$4 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$4) { field$4 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)); } isNull$5 = in1.isNullAt(1); field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$5) { field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)); } out.setRowKind(in1.getRowKind()); externalResult$7 = (java.util.Map) function_MyFun1 .eval(isNull$4 ? null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$4)), isNull$5 ? 
null : ((java.lang.String) converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$5))); externalResult$10 = (java.lang.String) function_MyFun2 .eval(externalResult$7); isNull$11 = externalResult$10 == null; result$11 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$11) { result$11 = (org.apache.flink.table.data.binary.BinaryStringData) converter$6.toInternalOrNull((java.lang.String) externalResult$10); } if (isNull$11) { out.setNullAt(0); } else { out.setNonPrimitiveValue(0, result$11); } boolean isNull$13 = (isNull$9 || false); org.apache.flink.table.data.binary.BinaryStringData result$13 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$13) { if (result$9 instanceof org.apache.flink.table.data.binary.BinaryMapData) { org.apache.flink.table.data.binary.BinaryMapData binaryMap$21 = (org.apache.flink.table.data.binary.BinaryMapData) result$9; final int length$15 = binaryMap$21.size(); final org.apache.flink.table.data.binary.BinaryArrayData keys$16 = binaryMap$21.keyArray(); final org.apache.flink.table.data.binary.BinaryArrayData values$17 = binaryMap$21.valueArray(); int index$18 = 0; boolean found$19 = false; if (false) { while (index$18 < length$15 && !found$19) { if (keys$16.isNullAt(index$18)) { found$19 = true; } else { index$18++; } } } else { while (index$18 < length$15 && !found$19) { final org.apache.flink.table.data.binary.BinaryStringData key$14 = ((org.apache.flink.table.data.binary.BinaryStringData) keys$16.getString(index$18)); isNull$23 = false || false; result$24 = false; if (!isNull$23) { result$24 = ((org.apache.flink.table.data.binary.BinaryStringData) str$12).equals(key$14); } if (result$24) { found$19 = true; } else { index$18++; } } } if (!found$19 || values$17.isNullAt(index$18)) { isNull$13 = true; } else { result$13 = ((org.apache.flink.table.data.binary.BinaryStringData) values$17.getString(index$18)); } } else { org.apache.flink.table.data.GenericMapData 
genericMap$22 = (org.apache.flink.table.data.GenericMapData) result$9; org.apache.flink.table.data.binary.BinaryStringData value$20 = (org.apache.flink.table.data.binary.BinaryStringData) genericMap$22.get((org.apache.flink.table.data.binary.BinaryStringData) ((org.apache.flink.table.data.binary.BinaryStringData) str$12)); if (value$20 == null) { isNull$13 = true; } else { result$13 = value$20; } } } if (isNull$13) { out.setNullAt(1); } else { out.setNonPrimitiveValue(1, result$13); } output.collect(outElement.replace(out)); } @Override public void close() throws Exception { super.close(); function_MyFun1.close(); function_MyFun2.close(); } } public class ToBinary$26 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$26(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class ToBinary$27 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$27(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. 
*/ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } public class ToBinary$28 implements org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData, org.apache.flink.table.data.binary.BinaryRowData> { org.apache.flink.table.data.binary.BinaryRowData out = new org.apache.flink.table.data.binary.BinaryRowData(2); org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new org.apache.flink.table.data.writer.BinaryRowWriter(out); public ToBinary$28(Object[] references) throws Exception { } @Override public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) { if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) { return ((org.apache.flink.table.data.binary.BinaryRowData) in1); } innerApply(in1); return out; } /* Fit into JavaCodeSplitter's void function limitation. */ private void innerApply(org.apache.flink.table.data.RowData in1) { outWriter.reset(); if (in1.isNullAt(0)) { outWriter.setNullAt(0); } else { outWriter.writeString(0, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0))); } if (in1.isNullAt(1)) { outWriter.setNullAt(1); } else { outWriter.writeString(1, ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1))); } outWriter.complete(); out.setRowKind(in1.getRowKind()); } } {code} You can see that {{result$9}} and its null flag {{isNull$9}} are declared but never assigned a value, even though the generated code later reads them (e.g. {{isNull$13 = (isNull$9 || false)}} and the {{result$9}} map access), which causes this bug. -- This message was sent by Atlassian Jira (v8.20.7#820007)