[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717628#comment-17717628
 ] 

padavan edited comment on FLINK-31967 at 4/28/23 11:47 AM:
-----------------------------------------------------------

[~martijnvisser] 

/usr/lib/jvm/java-1.11.0-openjdk-amd64/

!image-2023-04-28-14-46-19-736.png!

 
{code:java}
 public static void main(String[] args) throws Exception {
  
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
  KafkaSource<UserModel> source = KafkaSource.<UserModel>builder()
                .setBootstrapServers(kafka)
                .setTopics("x1").setGroupId("flink_group")
                .setValueOnlyDeserializer(new JsonConverter()).build();
 
        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.              
 <UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((i, timestamp) -> {
                  long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    return time;
                });
        DataStream<UserModel> ds = env.fromSource(source, strategy, "Kafka 
Source");
        
        t1_LeadLag(ds, env);
        env.execute("Flink Java API Skeleton");
}
{code}
 
{code:java}
private static void t1_LeadLag(DataStream<UserModel> ds, 
StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    Table t = te.fromDataStream(ds, 
Schema.newBuilder().columnByExpression("proctime", "proctime()").build());

    te.createTemporaryView("users", t);

    Table res = te.sqlQuery("SELECT userId, `count`,\n" +
            " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS 
prev_quantity\n" +
            " FROM users");

    te.toChangelogStream(res).print();
}{code}
 

 

 


was (Author: JIRAUSER287909):
[~martijnvisser] 

/usr/lib/jvm/java-1.11.0-openjdk-amd64/

!image-2023-04-28-14-46-19-736.png!

 
{code:java}
 public static void main(String[] args) throws Exception {
  
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
  KafkaSource<UserModel> source = KafkaSource.<UserModel>builder()
                .setBootstrapServers(kafka)
                .setTopics("x1").setGroupId("flink_group")
                .setValueOnlyDeserializer(new JsonConverter()).build();
 
        WatermarkStrategy<UserModel> strategy = WatermarkStrategy.              
 <UserModel>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((i, timestamp) -> {
                  long time = i.dt.toInstant(ZoneOffset.UTC).toEpochMilli();
                    return time;
                });
        DataStream<UserModel> ds = env.fromSource(source, strategy, "Kafka 
Source");
        
        t1_LeadLag(ds, env)
        env.execute("Flink Java API Skeleton");
}
{code}
 
{code:java}
private static void t1_LeadLag(DataStream<UserModel> ds, 
StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    Table t = te.fromDataStream(ds, 
Schema.newBuilder().columnByExpression("proctime", "proctime()").build());

    te.createTemporaryView("users", t);

    Table res = te.sqlQuery("SELECT userId, `count`,\n" +
            " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS 
prev_quantity\n" +
            " FROM users");

    te.toChangelogStream(res).print();
}{code}
 

 

 

> SQL with LAG function NullPointerException
> ------------------------------------------
>
>                 Key: FLINK-31967
>                 URL: https://issues.apache.org/jira/browse/FLINK-31967
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: padavan
>            Priority: Major
>         Attachments: image-2023-04-28-14-46-19-736.png
>
>
> I want to run a query with the LAG function, but I get a job exception without 
> any explanation.
>  
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream<UserModel> ds, 
> StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     Table t = te.fromDataStream(ds, 
> Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>             " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS 
> prev_quantity\n" +
>             " FROM users");
>     te.toChangelogStream(res).print();
> }{code}
>  
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>  
> *Exception:* I removed the part about the basic JobExecutionException and kept the 
> important part (I think)
> {code:java}
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
>     at 
> org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
>     at 
> org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown
>  Source)
>     at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
>     at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
>     at 
> org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
>     at 
> org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
>     at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Thread.java:829){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to