[ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-30511:
-------------------------------
    Description: 
* Code segment:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OnTimerDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");
        conf.setString("state.checkpoint-storage", "filesystem");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        conf.setString("execution.checkpointing.interval", "30s");

        // Anchor-A: un-comment and fill in the real checkpoint path to restore from state.
        //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.setParallelism(1);

        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);

        String sourceDDL = "CREATE TABLE orders (\n" +
                "  id           INT,\n" +
                "  app          INT,\n" +
                "  user_id      STRING\n" +
                ") WITH (\n" +
                "   'connector' = 'datagen',\n" +
                "   'rows-per-second'='1',\n" +
                "   'fields.app.min'='1',\n" +
                "   'fields.app.max'='10',\n" +
                "   'fields.user_id.length'='10'\n" +
                ")";

        tableEnv.executeSql(sourceDDL);

        Table query = tableEnv.sqlQuery("select * from orders");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);

        TypeInformation<?>[] returnTypes = new TypeInformation[4];
        returnTypes[0] = Types.INT;
        returnTypes[1] = Types.INT; // Anchor-B: change to Types.LONG to reproduce the issue.
        returnTypes[2] = Types.INT;
        returnTypes[3] = Types.INT;

        rowDataStream.keyBy(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        return value.getFieldAs(2);
                    }
                }).process(new KeyedProcessFunction<String, Row, Row>() {

                    private Row firstRow;

                    @Override
                    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                        if (firstRow == null) {
                            firstRow = value;
                        }
                        ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                        Row colRow = new Row(4);
                        colRow.setField(0, 0);
                        colRow.setField(1, 1);
                        colRow.setField(2, 2);
                        colRow.setField(3, 3);

                        out.collect(colRow); // Anchor-C
                    }
                }).name("TargetTestUDF")
                .returns(new RowTypeInfo(returnTypes))
                .print();

        env.execute(OnTimerDemo.class.getSimpleName());
    }

}
 {code}
 * Recurrence steps
 ** Run the job without state.
 ** Record the latest available checkpoint path as 'checkpoint-path-a'.
 ** Stop the job.
 ** Fill the real value of 'checkpoint-path-a' into the 'Anchor-A' line and un-comment it.
 ** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line.
 ** Set a breakpoint at the 'StreamTask#handleAsyncException' method.
 ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception raised at the 'Anchor-C' line is silently dropped in the 'StreamTask#handleAsyncException' method.
 ** As a result, the framework cannot catch this exception in this case.
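The failing cast itself can be shown outside of Flink. The following minimal, self-contained sketch (the class name and values are illustrative, not part of the reproduction code) shows the cast that a LONG-typed field serializer would attempt on the Integer emitted at 'Anchor-C':

```java
// Minimal sketch of the failing cast: the row field holds an Integer, but a
// field declared as Types.LONG makes the framework treat it as a Long.
public class CastDemo {

    // Returns the ClassCastException message, or a marker if no exception occurs.
    static String demo() {
        Object field = Integer.valueOf(1); // value set at 'Anchor-C'
        try {
            Long asLong = (Long) field;    // what a LONG-typed serializer would attempt
            return "no exception: " + asLong;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without restoring from state, this exception fails the job as expected; the issue only appears when the timer fires during the restore phase.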
 * Root cause:
 ** !截屏2022-12-27 18.51.12.png!
 ** When the job is started from state data, 'Task#restoreAndInvoke' is called. The exception 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' is dropped in the 'handleAsyncException' method above instead of being caught by the catch-block of 'Task#restoreAndInvoke'.

                       !截屏2022-12-27 19.20.00.png!

Could this be seen as a missing exception-handling path in the framework?
If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException',
which matches the intention of 'Task#restoreAndInvoke'.
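To make the report concrete, here is a self-contained model of the dropping behavior. This is NOT the real Flink code; the 'isRunning' guard is an assumption drawn from the screenshots above, used only to illustrate how an async exception raised before the task reaches RUNNING can vanish:

```java
// Self-contained model (assumed guard, not the real StreamTask implementation)
// of how an async exception that arrives while the task is still restoring
// is silently dropped instead of failing the task.
public class AsyncExceptionGuardModel {

    boolean isRunning;   // false while still in the restore phase
    Throwable reported;  // non-null once the task would be failed

    void handleAsyncException(String message, Throwable t) {
        if (isRunning) {
            reported = t; // running task: exception fails the task as expected
        }
        // else: dropped -- the behavior this issue reports; re-throwing here
        // would let the catch-block of 'Task#restoreAndInvoke' see it.
    }

    public static void main(String[] args) {
        AsyncExceptionGuardModel restoringTask = new AsyncExceptionGuardModel();
        restoringTask.handleAsyncException("timer error",
                new ClassCastException("java.lang.Integer cannot be cast to java.lang.Long"));
        System.out.println(restoringTask.reported == null ? "dropped" : "failed");
    }
}
```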

Thank you.
 
 
 

 

> Ignore the Exception in user-timer Triggerable when recovering from state
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30511
>                 URL: https://issues.apache.org/jira/browse/FLINK-30511
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.0
>         Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
>            Reporter: RocMarshal
>            Priority: Minor
>         Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
