Alan Sheinberg created FLINK-36472:
--------------------------------------
Summary: Correlates with UDTF don't handle right-side conditions or projections
Key: FLINK-36472
URL: https://issues.apache.org/jira/browse/FLINK-36472
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.19.1
Reporter: Alan Sheinberg
Basic Table Function:
{code:java}
@FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
public static class Func extends TableFunction<Row> {
    // Emits two single-column rows for every input value.
    public void eval(Integer i) {
        collect(Row.of("blah " + i));
        collect(Row.of("foo " + i));
    }
}
{code}
Then a test case:
{code:java}
@Test
public void testTableFuncWithRightCalcWithSelect() {
    Table t1 = tEnv.fromValues(1, 2).as("f1");
    tEnv.createTemporaryView("t1", t1);
    tEnv.createTemporarySystemFunction("func", new Func());
    // The right side of the lateral join projects the UDTF output through CONCAT.
    TableResult result =
            tEnv.executeSql(
                    "select * FROM t1, LATERAL (SELECT CONCAT(foo, ' abc') "
                            + "FROM TABLE(func(f1)) as T(foo))");
    final List<Row> results = new ArrayList<>();
    result.collect().forEachRemaining(results::add);
    final List<Row> expectedRows =
            Arrays.asList(
                    Row.of(1, "blah 1 abc"),
                    Row.of(1, "foo 1 abc"),
                    Row.of(2, "blah 2 abc"),
                    Row.of(2, "foo 2 abc"));
    assertThat(results).containsSequence(expectedRows);
}
{code}
The query has a projection but no condition, and planning fails with an NPE:
{code:java}
...
Caused by: java.lang.NullPointerException
at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549)
at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convertToCorrelate$1(StreamPhysicalCorrelateRule.scala:84)
at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.convert(StreamPhysicalCorrelateRule.scala:98)
at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:172)
{code}
Even with the NPE fixed, the query does not return the expected rows. Looking
at the code, the rule seems to just discard the projections.
Running the test above with the NPE fixed, I get:
{code:java}
java.lang.AssertionError:
Expecting actual:
[+I[1, blah 1], +I[1, foo 1], +I[2, blah 2], +I[2, foo 2]]
to contain sequence:
[+I[1, blah 1 abc], +I[1, foo 1 abc], +I[2, blah 2 abc], +I[2, foo 2 abc]]
{code}
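To make the expected behavior concrete, here is a minimal plain-Java sketch that does not use Flink or Calcite. The names CorrelateSketch, RightCalc and apply are hypothetical and only model the shape of the problem, under the assumption that the right side of the correlate carries an optional condition and an optional projection. It is not the planner's actual code or a proposed patch. It shows a conversion that tolerates a missing condition and still applies the projection:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class CorrelateSketch {

    /** Hypothetical stand-in for the Calc sitting on top of the table function scan. */
    static final class RightCalc {
        final Predicate<String> condition;         // null when the subquery has no WHERE
        final Function<String, String> projection; // null when the subquery is SELECT *

        RightCalc(Predicate<String> condition, Function<String, String> projection) {
            this.condition = condition;
            this.projection = projection;
        }
    }

    /** Applies the optional condition and the optional projection to the UDTF output. */
    static List<String> apply(List<String> udtfRows, RightCalc calc) {
        List<String> out = new ArrayList<>();
        for (String row : udtfRows) {
            // A missing condition means "keep every row", not a dereference of null.
            if (calc.condition != null && !calc.condition.test(row)) {
                continue;
            }
            // The projection must be carried over, not dropped.
            out.add(calc.projection != null ? calc.projection.apply(row) : row);
        }
        return out;
    }

    public static void main(String[] args) {
        // Models the failing query for f1 = 1: no condition, CONCAT(foo, ' abc') projection.
        RightCalc calc = new RightCalc(null, foo -> foo + " abc");
        System.out.println(apply(List.of("blah 1", "foo 1"), calc));
        // prints [blah 1 abc, foo 1 abc]
    }
}
```

In this model, the NPE corresponds to dereferencing the null condition, and the wrong test output corresponds to skipping the projection step.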