Caizhi Weng created FLINK-28190:
-----------------------------------
Summary: NullPointerException is thrown if the intermediate result
of nested UDFs is used
Key: FLINK-28190
URL: https://issues.apache.org/jira/browse/FLINK-28190
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.14.5, 1.15.0
Reporter: Caizhi Weng
Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
{code:scala}
@Test
def myTest(): Unit = {
tEnv.executeSql("create temporary function myfun1 as 'MyFun1'")
tEnv.executeSql("create temporary function myfun2 as 'MyFun2'")
val data: Seq[Row] = Seq(
Row.of("Hi", "Hello")
)
tEnv.executeSql(
s"""
|create table T (
| a string,
| b string
|) with (
| 'connector' = 'values',
| 'data-id' = '${TestValuesTableFactory.registerData(data)}',
| 'bounded' = 'true'
|)
|""".stripMargin)
tEnv.executeSql("create temporary view my_view as select myfun1(a, b) as mp
from T")
tEnv.executeSql("select myfun2(mp), mp['Hi'] from my_view").print()
}
{code}
UDF classes are
{code:java}
import org.apache.flink.table.functions.ScalarFunction;
import java.util.HashMap;
import java.util.Map;
public class MyFun1 extends ScalarFunction {
public Map<String, String> eval(String k, String v) {
Map<String, String> returnMap = new HashMap<>();
returnMap.put(k, v);
return returnMap;
}
}
{code}
{code:java}
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;
public class MyFun2 extends ScalarFunction {
public String eval(Map<String, String> input) {
return String.valueOf(input);
}
}
{code}
The exception stack is
{code}
Caused by: java.lang.NullPointerException
at StreamExecCalc$25.processElement_split1(Unknown Source)
at StreamExecCalc$25.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
at
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at
org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions$FromElementSourceFunction.run(TestValuesRuntimeFunctions.java:530)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
{code}
The generated code is
{code}
public class ToBinary$0 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$0(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class ToBinary$1 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$1(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class ToBinary$2 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$2(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class ToBinary$3 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$3(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class StreamExecCalc$25 extends
org.apache.flink.table.runtime.operators.TableStreamOperator
implements
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
private final Object[] references;
private transient MyFun1 function_MyFun1;
private transient
org.apache.flink.table.data.conversion.StringStringConverter converter$6;
private transient
org.apache.flink.table.data.conversion.MapMapConverter converter$8;
private transient MyFun2 function_MyFun2;
private final org.apache.flink.table.data.binary.BinaryStringData
str$12 = org.apache.flink.table.data.binary.BinaryStringData.fromString("Hi");
org.apache.flink.table.data.BoxedWrapperRowData out = new
org.apache.flink.table.data.BoxedWrapperRowData(2);
private final
org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
public StreamExecCalc$25(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
processingTimeService) throws Exception {
this.references = references;
function_MyFun1 = (((MyFun1) references[0]));
converter$6 =
(((org.apache.flink.table.data.conversion.StringStringConverter)
references[1]));
converter$8 =
(((org.apache.flink.table.data.conversion.MapMapConverter) references[2]));
function_MyFun2 = (((MyFun2) references[3]));
this.setup(task, config, output);
if (this instanceof
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator)
this)
.setProcessingTimeService(processingTimeService);
}
}
@Override
public void open() throws Exception {
super.open();
function_MyFun1.open(new
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
converter$6.open(getRuntimeContext().getUserCodeClassLoader());
converter$8.open(getRuntimeContext().getUserCodeClassLoader());
function_MyFun2.open(new
org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
}
@Override
public void
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
element) throws Exception {
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) element.getValue();
org.apache.flink.table.data.binary.BinaryStringData field$4;
boolean isNull$4;
org.apache.flink.table.data.binary.BinaryStringData field$5;
boolean isNull$5;
java.util.Map externalResult$7;
org.apache.flink.table.data.MapData result$9;
boolean isNull$9;
java.lang.String externalResult$10;
org.apache.flink.table.data.binary.BinaryStringData result$11;
boolean isNull$11;
boolean isNull$23 = false;
boolean result$24 = false;
isNull$4 = in1.isNullAt(0);
field$4 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$4) {
field$4 = ((org.apache.flink.table.data.binary.BinaryStringData)
in1.getString(0));
}
isNull$5 = in1.isNullAt(1);
field$5 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$5) {
field$5 = ((org.apache.flink.table.data.binary.BinaryStringData)
in1.getString(1));
}
out.setRowKind(in1.getRowKind());
externalResult$7 = (java.util.Map) function_MyFun1
.eval(isNull$4 ? null : ((java.lang.String)
converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData)
field$4)), isNull$5 ? null : ((java.lang.String)
converter$6.toExternal((org.apache.flink.table.data.binary.BinaryStringData)
field$5)));
externalResult$10 = (java.lang.String) function_MyFun2
.eval(externalResult$7);
isNull$11 = externalResult$10 == null;
result$11 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$11) {
result$11 = (org.apache.flink.table.data.binary.BinaryStringData)
converter$6.toInternalOrNull((java.lang.String) externalResult$10);
}
if (isNull$11) {
out.setNullAt(0);
} else {
out.setNonPrimitiveValue(0, result$11);
}
boolean isNull$13 = (isNull$9 || false);
org.apache.flink.table.data.binary.BinaryStringData result$13 =
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$13) {
if (result$9 instanceof
org.apache.flink.table.data.binary.BinaryMapData) {
org.apache.flink.table.data.binary.BinaryMapData binaryMap$21 =
(org.apache.flink.table.data.binary.BinaryMapData) result$9;
final int length$15 = binaryMap$21.size();
final org.apache.flink.table.data.binary.BinaryArrayData keys$16 =
binaryMap$21.keyArray();
final org.apache.flink.table.data.binary.BinaryArrayData values$17
= binaryMap$21.valueArray();
int index$18 = 0;
boolean found$19 = false;
if (false) {
while (index$18 < length$15 && !found$19) {
if (keys$16.isNullAt(index$18)) {
found$19 = true;
} else {
index$18++;
}
}
} else {
while (index$18 < length$15 && !found$19) {
final org.apache.flink.table.data.binary.BinaryStringData
key$14 = ((org.apache.flink.table.data.binary.BinaryStringData)
keys$16.getString(index$18));
isNull$23 = false || false;
result$24 = false;
if (!isNull$23) {
result$24 = ((org.apache.flink.table.data.binary.BinaryStringData)
str$12).equals(key$14);
}
if (result$24) {
found$19 = true;
} else {
index$18++;
}
}
}
if (!found$19 || values$17.isNullAt(index$18)) {
isNull$13 = true;
} else {
result$13 =
((org.apache.flink.table.data.binary.BinaryStringData)
values$17.getString(index$18));
}
} else {
org.apache.flink.table.data.GenericMapData genericMap$22 =
(org.apache.flink.table.data.GenericMapData) result$9;
org.apache.flink.table.data.binary.BinaryStringData value$20 =
(org.apache.flink.table.data.binary.BinaryStringData)
genericMap$22.get((org.apache.flink.table.data.binary.BinaryStringData)
((org.apache.flink.table.data.binary.BinaryStringData) str$12));
if (value$20 == null) {
isNull$13 = true;
} else {
result$13 = value$20;
}
}
}
if (isNull$13) {
out.setNullAt(1);
} else {
out.setNonPrimitiveValue(1, result$13);
}
output.collect(outElement.replace(out));
}
@Override
public void close() throws Exception {
super.close();
function_MyFun1.close();
function_MyFun2.close();
}
}
public class ToBinary$26 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$26(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class ToBinary$27 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$27(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
public class ToBinary$28 implements
org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
org.apache.flink.table.data.binary.BinaryRowData> {
org.apache.flink.table.data.binary.BinaryRowData out = new
org.apache.flink.table.data.binary.BinaryRowData(2);
org.apache.flink.table.data.writer.BinaryRowWriter outWriter = new
org.apache.flink.table.data.writer.BinaryRowWriter(out);
public ToBinary$28(Object[] references) throws Exception {
}
@Override
public org.apache.flink.table.data.binary.BinaryRowData
apply(org.apache.flink.table.data.RowData in1) {
if (in1 instanceof org.apache.flink.table.data.binary.BinaryRowData) {
return ((org.apache.flink.table.data.binary.BinaryRowData) in1);
}
innerApply(in1);
return out;
}
/* Fit into JavaCodeSplitter's void function limitation. */
private void innerApply(org.apache.flink.table.data.RowData in1) {
outWriter.reset();
if (in1.isNullAt(0)) {
outWriter.setNullAt(0);
} else {
outWriter.writeString(0,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0)));
}
if (in1.isNullAt(1)) {
outWriter.setNullAt(1);
} else {
outWriter.writeString(1,
((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1)));
}
outWriter.complete();
out.setRowKind(in1.getRowKind());
}
}
{code}
You can see that {{result$9}} (and its null flag {{isNull$9}}) is declared but never assigned a value, even though both are read later when evaluating {{mp['Hi']}}, causing this bug.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)