[
https://issues.apache.org/jira/browse/FLINK-33010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760852#comment-17760852
]
Yunhong Zheng commented on FLINK-33010:
---------------------------------------
Hi [~Sergey Nuyanzin], it looks like the master branch will also throw this error.
The test pattern:
{code:java}
@Test
public void test1() {
String query1 =
"SELECT\n"
+ "SecurityId,\n"
+ "ccy1,\n"
+ "CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp\n"
+ "FROM (VALUES\n"
+ "(1, 'USD', '2022-01-01'),\n"
+ "(2, 'GBP', '2022-02-02'),\n"
+ "(3, 'GBX', '2022-03-03'),\n"
+ "(4, 'GBX', '2022-04-4'))\n"
+ "AS ccy(SecurityId, ccy1, publishTimestamp)";
Table table = tEnv().sqlQuery(query1);
tEnv().createTemporaryView("Positions", table);
String query2 =
"SELECT\n"
+ "SecurityId,\n"
+ "ccy1,\n"
+ "CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp\n"
+ "FROM (VALUES\n"
+ "(3, 'USD', '2023-01-01'),\n"
+ "(4, 'GBP', '2023-02-02'),\n"
+ "(5, 'GBX', '2023-03-03'),\n"
+ "(6, 'GBX', '2023-04-4'))\n"
+ "AS ccy(SecurityId, ccy1, publishTimestamp)";
Table table2 = tEnv().sqlQuery(query2);
tEnv().createTemporaryView("Benchmarks", table2);
String sqlQuery =
"SELECT *,\n"
+ "GREATEST(\n"
+ "IFNULL(Positions.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3))),\n"
+ "IFNULL(Benchmarks.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3)))\n"
+ ")\n"
+ "FROM Positions\n"
+ "FULL JOIN Benchmarks ON Positions.SecurityId = Benchmarks.SecurityId ";
List<String> actual =
CollectionUtil.iteratorToList(tEnv().executeSql(sqlQuery).collect()).stream()
.map(Object::toString)
.collect(Collectors.toList());
actual.sort(String::compareTo);
List<String> expected = Arrays.asList("");
assertEquals(expected, actual);
} {code}
The error msg:
{code:java}
Caused by: java.lang.NullPointerException
at StreamExecCalc$90.processElement_split3(Unknown Source)
at StreamExecCalc$90.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:349)
at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:234)
at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:145)
at org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:141)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor2$2(RecordProcessorUtils.java:116)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:143)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:89)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:613)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1059)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1008)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) {code}
The stack trace looks similar to the one in FLINK-30018; it needs deeper debugging to find the problem.
> NPE when using GREATEST() in Flink SQL
> --------------------------------------
>
> Key: FLINK-33010
> URL: https://issues.apache.org/jira/browse/FLINK-33010
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Planner
> Affects Versions: 1.16.1, 1.16.2
> Reporter: Hector Rios
> Priority: Minor
>
> Hi,
> I see NPEs in flink 1.14 and flink 1.16 when running queries with GREATEST()
> and timestamps. Below is an example to help in reproducing the issue.
> {code:java}
> CREATE TEMPORARY VIEW Positions AS
> SELECT
> SecurityId,
> ccy1,
> CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
> FROM (VALUES
> (1, 'USD', '2022-01-01'),
> (2, 'GBP', '2022-02-02'),
> (3, 'GBX', '2022-03-03'),
> (4, 'GBX', '2022-04-4'))
> AS ccy(SecurityId, ccy1, publishTimestamp);
> CREATE TEMPORARY VIEW Benchmarks AS
> SELECT
> SecurityId,
> ccy1,
> CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
> FROM (VALUES
> (3, 'USD', '2023-01-01'),
> (4, 'GBP', '2023-02-02'),
> (5, 'GBX', '2023-03-03'),
> (6, 'GBX', '2023-04-4'))
> AS ccy(SecurityId, ccy1, publishTimestamp);
> SELECT *,
> GREATEST(
> IFNULL(Positions.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3))),
> IFNULL(Benchmarks.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3)))
> )
> FROM Positions
> FULL JOIN Benchmarks ON Positions.SecurityId = Benchmarks.SecurityId {code}
>
> Using "IF" instead of "GREATEST" is a workaround at the moment.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)