[ 
https://issues.apache.org/jira/browse/FLINK-33010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760852#comment-17760852
 ] 

Yunhong Zheng commented on FLINK-33010:
---------------------------------------

Hi, [~Sergey Nuyanzin], It looks like master branch also will throw this error. 
The test pattern:
{code:java}
@Test
public void test1() {
    String query1 =
            "SELECT\n"
                    + "SecurityId,\n"
                    + "ccy1,\n"
                    + "CAST(publishTimestamp AS TIMESTAMP(3)) as 
publishTimestamp\n"
                    + "FROM (VALUES\n"
                    + "(1, 'USD', '2022-01-01'),\n"
                    + "(2, 'GBP', '2022-02-02'),\n"
                    + "(3, 'GBX', '2022-03-03'),\n"
                    + "(4, 'GBX', '2022-04-4'))\n"
                    + "AS ccy(SecurityId, ccy1, publishTimestamp)";
    Table table = tEnv().sqlQuery(query1);
    tEnv().createTemporaryView("Positions", table);

    String query2 =
            "SELECT\n"
                    + "SecurityId,\n"
                    + "ccy1,\n"
                    + "CAST(publishTimestamp AS TIMESTAMP(3)) as 
publishTimestamp\n"
                    + "FROM (VALUES\n"
                    + "(3, 'USD', '2023-01-01'),\n"
                    + "(4, 'GBP', '2023-02-02'),\n"
                    + "(5, 'GBX', '2023-03-03'),\n"
                    + "(6, 'GBX', '2023-04-4'))\n"
                    + "AS ccy(SecurityId, ccy1, publishTimestamp)";
    Table table2 = tEnv().sqlQuery(query2);
    tEnv().createTemporaryView("Benchmarks", table2);

    String sqlQuery =
            "SELECT *,\n"
                    + "GREATEST(\n"
                    + "IFNULL(Positions.publishTimestamp,CAST('1970-1-1' AS 
TIMESTAMP(3))),\n"
                    + "IFNULL(Benchmarks.publishTimestamp,CAST('1970-1-1' AS 
TIMESTAMP(3)))\n"
                    + ")\n"
                    + "FROM Positions\n"
                    + "FULL JOIN Benchmarks ON Positions.SecurityId = 
Benchmarks.SecurityId ";
    List<String> actual =
            
CollectionUtil.iteratorToList(tEnv().executeSql(sqlQuery).collect()).stream()
                    .map(Object::toString)
                    .collect(Collectors.toList());
    actual.sort(String::compareTo);
    List<String> expected = Arrays.asList("");
    assertEquals(expected, actual);
} {code}
The error msg:
{code:java}
Caused by: java.lang.NullPointerException
    at StreamExecCalc$90.processElement_split3(Unknown Source)
    at StreamExecCalc$90.processElement(Unknown Source)
    at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
    at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
    at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:349)
    at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:234)
    at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processRight(StreamingJoinOperator.java:145)
    at 
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator.processElement2(AbstractStreamingJoinOperator.java:141)
    at 
org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor2$2(RecordProcessorUtils.java:116)
    at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:179)
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:143)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:89)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:613)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1059)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1008)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) {code}
The stack looks like FLINK-30018, it need deeply debug to find the problem.

 

> NPE when using GREATEST() in Flink SQL
> --------------------------------------
>
>                 Key: FLINK-33010
>                 URL: https://issues.apache.org/jira/browse/FLINK-33010
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.16.1, 1.16.2
>            Reporter: Hector Rios
>            Priority: Minor
>
> Hi,
> I see NPEs in flink 1.14 and flink 1.16 when running queries with GREATEST() 
> and timestamps. Below is an example to help in reproducing the issue.
> {code:java}
> CREATE TEMPORARY VIEW Positions AS
> SELECT
> SecurityId,
> ccy1,
> CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
> FROM (VALUES
> (1, 'USD', '2022-01-01'),
> (2, 'GBP', '2022-02-02'),
> (3, 'GBX', '2022-03-03'),
> (4, 'GBX', '2022-04-4'))
> AS ccy(SecurityId, ccy1, publishTimestamp);
> CREATE TEMPORARY VIEW Benchmarks AS
> SELECT
> SecurityId,
> ccy1,
> CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
> FROM (VALUES
> (3, 'USD', '2023-01-01'),
> (4, 'GBP', '2023-02-02'),
> (5, 'GBX', '2023-03-03'),
> (6, 'GBX', '2023-04-4'))
> AS ccy(SecurityId, ccy1, publishTimestamp);
> SELECT *,
> GREATEST(
> IFNULL(Positions.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3))),
> IFNULL(Benchmarks.publishTimestamp,CAST('1970-1-1' AS TIMESTAMP(3)))
> )
> FROM Positions
> FULL JOIN Benchmarks ON Positions.SecurityId = Benchmarks.SecurityId {code}
>  
> Using "IF" is a workaround at the moment instead of using "GREATEST"
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to