[
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717672#comment-17717672
]
padavan commented on FLINK-31967:
---------------------------------
[~martijnvisser]
Wow, interesting: if I use DataGen, everything works fine:
{code:java}
private static void t1_LeadLag(StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    te.executeSql(
        "CREATE TABLE users (\n" +
        " userId INT,\n" +
        " `count` INT,\n" +
        " proctime AS PROCTIME()\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'fields.userId.min' = '1',\n" +
        " 'fields.userId.max' = '10',\n" +
        " 'fields.count.min' = '0',\n" +
        " 'fields.count.max' = '10'\n" +
        ");");
    /*
    Table t = te.fromDataStream(ds,
        Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
    t.printSchema();
    te.createTemporaryView("users", t);
    */
    Table res = te.sqlQuery("SELECT userId, `count`, proctime," +
        " LAG(`count`, 1, 1) OVER w AS prev_quantity" +
        " FROM users" +
        " WINDOW w AS (ORDER BY proctime)");
    te.toChangelogStream(res).print();
}
{code}
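For reference, here is a minimal plain-Java sketch of what I expect LAG to return for each row. It is not Flink's implementation and does not use any Flink API; the class name LagSketch and the method lag are hypothetical names made up for this illustration. It only mirrors the query semantics: each row gets the value from `offset` rows earlier, or the default when there is no earlier row.

```java
import java.util.ArrayList;
import java.util.List;

public class LagSketch {
    /**
     * For each position i, returns the value `offset` rows earlier,
     * or defaultValue when no such earlier row exists.
     * defaultValue may be null, which models LAG without an explicit default.
     */
    public static List<Integer> lag(List<Integer> values, int offset, Integer defaultValue) {
        List<Integer> result = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            result.add(i >= offset ? values.get(i - offset) : defaultValue);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> counts = List.of(5, 7, 2);
        // Models LAG(`count`, 1, 1): the first row falls back to the default 1.
        System.out.println(lag(counts, 1, 1));    // prints [1, 5, 7]
        // Models LAG(`count`): the first row has no predecessor, so it is null.
        System.out.println(lag(counts, 1, null)); // prints [null, 5, 7]
    }
}
```

Note that without an explicit default, the first row has no previous value and the result for it is null.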
I work with Kafka and have built various ETL jobs with it; window aggregations all worked fine.
But when I started using LAG with the code above against my Kafka data, it throws an exception.
What could be causing it?
If needed, I can put together a project archive with my example (you would need
Kafka to run it, for example in Docker).
> SQL with LAG function NullPointerException
> ------------------------------------------
>
> Key: FLINK-31967
> URL: https://issues.apache.org/jira/browse/FLINK-31967
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: padavan
> Priority: Major
> Attachments: image-2023-04-28-14-46-19-736.png,
> image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png,
> image-2023-04-28-15-17-49-144.png
>
>
> I want to run a query with the LAG function, but I get a job exception without
> any explanation.
>
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream<UserModel> ds, StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     Table t = te.fromDataStream(ds,
>         Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>         " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
>         " FROM users");
>     te.toChangelogStream(res).print();
> }{code}
>
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>
> *Exception:* I removed the generic JobExecutionException part and kept what I
> think is the important part:
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
> at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
> at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
> at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Thread.java:829){code}