RocMarshal created FLINK-30511:
----------------------------------

             Summary: Exception thrown in a user-timer Triggerable is ignored when 
recovering from state.
                 Key: FLINK-30511
                 URL: https://issues.apache.org/jira/browse/FLINK-30511
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.16.0
         Environment: Flink 1.16.0

java8

Deployment mode: MiniCluster in IDC; standalone, yarn-application.
            Reporter: RocMarshal
         Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png

* Code segment:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OnTimerDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");
        conf.setString("state.checkpoint-storage", "filesystem");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        conf.setString("execution.checkpointing.interval", "30s");

        //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.setParallelism(1);

        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);

        String sourceDDL = "CREATE TABLE orders (\n" +
                "  id           INT,\n" +
                "  app          INT,\n" +
                "  user_id      STRING\n" +
                ") WITH (\n" +
                "   'connector' = 'datagen',\n" +
                "   'rows-per-second'='1',\n" +
                "   'fields.app.min'='1',\n" +
                "   'fields.app.max'='10',\n" +
                "   'fields.user_id.length'='10'\n" +
                ")";

        tableEnv.executeSql(sourceDDL);

        Table query = tableEnv.sqlQuery("select * from orders");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);

        TypeInformation<?>[] returnTypes = new TypeInformation[4];
        returnTypes[0] = Types.INT;

        returnTypes[1] = Types.INT; // Anchor-B

        returnTypes[2] = Types.INT;
        returnTypes[3] = Types.INT;


        rowDataStream.keyBy(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        return value.getFieldAs(2);
                    }
                }).process(new KeyedProcessFunction<String, Row, Row>() {

                    private Row firstRow;

                    @Override
                    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                        if (firstRow == null) {
                            firstRow = value;
                        }
                        ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                        Row colRow = new Row(4);
                        colRow.setField(0, 0);
                        colRow.setField(1, 1);
                        colRow.setField(2, 2);
                        colRow.setField(3, 3);

                        out.collect(colRow); // Anchor-C
                    }
                }).name("TargetTestUDF")
                .returns(new RowTypeInfo(returnTypes))
                .print();

        env.execute(OnTimerDemo.class.getSimpleName());
    }

}
{code}
 * Recurrence steps
 ** Run the job without state.
 ** Record the latest available checkpoint path as 'checkpoint-path-a'.
 ** Stop the job.
 ** Fill the real value of 'checkpoint-path-a' into the 'Anchor-A' line and un-comment the line.
 ** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line.
 ** Add a break-point in the 'StreamTask#handleAsyncException' method.
 ** Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception raised at the 'Anchor-C' line is silently swallowed in the 'StreamTask#handleAsyncException' method.
 ** As a result, the framework cannot surface this exception in the described case.
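The mismatch behind the reported ClassCastException can be shown outside of Flink: 'onTimer' puts boxed Integer values into the Row, while the modified RowTypeInfo declares field 1 as LONG, so the framework's Long-typed serialization path effectively performs an Integer-to-Long cast. A minimal stand-alone sketch (the class name is invented for illustration):

```java
// Hypothetical stand-alone demo of the cast that fails at Anchor-C:
// the Row field holds an Integer, but the declared type says LONG,
// so a LONG-typed serializer effectively performs this cast.
public class CastMismatchDemo {
    public static void main(String[] args) {
        Object fieldValue = Integer.valueOf(1); // what onTimer() actually puts into the Row

        boolean failed = false;
        try {
            Long asLong = (Long) fieldValue; // what a LONG-typed field expects
            System.out.println(asLong);
        } catch (ClassCastException e) {
            failed = true; // java.lang.Integer cannot be cast to java.lang.Long
        }
        System.out.println("cast failed: " + failed);
    }
}
```

Without state, the same cast fails inside the normal invoke path and the job fails visibly; the issue is only about where the exception ends up when the job is restored from a checkpoint.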
 * Root cause:
 ** !截屏2022-12-27 18.51.12.png!
 ** When the job is started from state data, 'Task#restoreAndInvoke' is called. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' exception is swallowed in the 'handleAsyncException' method above instead of being caught by the catch-block of 'Task#restoreAndInvoke'.

                      !截屏2022-12-27 19.20.00.png!
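A minimal stand-alone model of the swallowing behavior described above (all class, field, and method names here are invented for illustration; this is not the actual StreamTask code):

```java
// Toy model of an async exception handler that drops exceptions reported
// before the task has reached the RUNNING state, i.e. while it is still
// restoring from a checkpoint. Names are illustrative only.
public class AsyncSwallowDemo {

    static class MiniTask {
        private volatile boolean running = false; // still restoring => false
        private Throwable reportedFailure;        // what the failure path would see

        void handleAsyncException(String message, Throwable t) {
            if (running) {
                reportedFailure = t; // would normally fail the task
            }
            // else: silently dropped, mirroring the reported behavior
        }

        Throwable getReportedFailure() {
            return reportedFailure;
        }
    }

    public static void main(String[] args) {
        MiniTask task = new MiniTask();
        // A processing-time timer fires during restore and throws:
        task.handleAsyncException("timer", new ClassCastException("Integer -> Long"));
        // The failure never reaches the task's failure path:
        System.out.println("swallowed: " + (task.getReportedFailure() == null));
    }
}
```

In this model, re-throwing (or recording) the exception unconditionally in 'handleAsyncException' would let the surrounding restore/invoke logic observe and report it, which is the behavior the proposal below asks for.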

Could this be considered a missing exception-handling path in the framework?
If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException',
which matches the intention of 'Task#restoreAndInvoke'.

Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
