[ 
https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654895#comment-17654895
 ] 

RocMarshal commented on FLINK-30511:
------------------------------------

Thank you [~Weijie Guo] [~kevin.cyj]. Looking forward to the fix.

> Ignore the Exception in user-timer Triggerable when recovering from state.
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30511
>                 URL: https://issues.apache.org/jira/browse/FLINK-30511
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.0
>         Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
>            Reporter: RocMarshal
>            Priority: Minor
>         Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
>
>
> * Code segment:
> {code:java}
> public class OnTimerDemo {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setString("taskmanager.numberOfTaskSlots", "4");
>         conf.setString("state.checkpoint-storage", "filesystem");
>         conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
>         conf.setString("execution.checkpointing.interval", "30s");
>         //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>         env.setParallelism(1);
>         EnvironmentSettings envSetting = EnvironmentSettings
>                 .newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
>         String sourceDDL = "CREATE TABLE orders (\n" +
>                 "  id           INT,\n" +
>                 "  app          INT,\n" +
>                 "  user_id      STRING" +
>                 ") WITH (\n" +
>                 "   'connector' = 'datagen',\n" +
>                 "   'rows-per-second'='1',\n" +
>                 "   'fields.app.min'='1',\n" +
>                 "   'fields.app.max'='10',\n" +
>                 "   'fields.user_id.length'='10'\n" +
>                 ")";
>         tableEnv.executeSql(sourceDDL);
>         Table query = tableEnv.sqlQuery("select * from orders");
>         DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);
>         TypeInformation<?>[] returnTypes = new TypeInformation[4];
>         returnTypes[0] = Types.INT;
>         returnTypes[1] = Types.INT; // Anchor-B:
>         returnTypes[2] = Types.INT;
>         returnTypes[3] = Types.INT;
>         rowDataStream.keyBy(new KeySelector<Row, String>() {
>                     @Override
>                     public String getKey(Row value) throws Exception {
>                         return value.getFieldAs(2);
>                     }
>                 }).process(new KeyedProcessFunction<String, Row, Row>() {
>                     private Row firstRow;
>                     @Override
>                     public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
>                         if (firstRow == null) {
>                             firstRow = value;
>                         }
>                         ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
>                     }
>                     @Override
>                     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
>                         Row colRow = new Row(4);
>                         colRow.setField(0, 0);
>                         colRow.setField(1, 1);
>                         colRow.setField(2, 2);
>                         colRow.setField(3, 3);
>                         out.collect(colRow); // Anchor-C
>                     }
>                 }).name("TargetTestUDF")
>                 .returns(new RowTypeInfo(returnTypes))
>                 .print();
>         env.execute(OnTimerDemo.class.getSimpleName());
>     }
> }
>  {code}
>  * Steps to reproduce:
>  ** Run the job without any restored state.
>  ** Note the path of the latest available checkpoint as 'checkpoint-path-a'.
>  ** Stop the job.
>  ** Fill in the real value of 'checkpoint-path-a' on the 'Anchor-A' line and 
> uncomment that line.
>  ** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' on 
> the 'Anchor-B' line.
>  ** Set a breakpoint in the 'StreamTask#handleAsyncException' method.
>  ** Run the job again. The 'java.lang.ClassCastException: java.lang.Integer 
> cannot be cast to java.lang.Long' raised at the 'Anchor-C' line is silently 
> ignored in the 'StreamTask#handleAsyncException' method.
>  ** As a result, the framework never catches the exception in this case.
>  * Root cause:
>  ** !截屏2022-12-27 18.51.12.png!
>  ** When the job is started from state data, 'Task#restoreAndInvoke' is 
> called. The exception 'java.lang.ClassCastException: java.lang.Integer cannot 
> be cast to java.lang.Long' is ignored in the 'handleAsyncException' method 
> shown above instead of being caught by the catch block of 
> 'Task#restoreAndInvoke'.
>                        !截屏2022-12-27 19.20.00.png!
> Could this be considered a gap in the framework's exception handling? 
> If so, I would prefer to re-throw the exception in 
> 'StreamTask#handleAsyncException', which matches the intent of 
> 'Task#restoreAndInvoke'.
> Thank you.
>  
>  
>  
>  
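
The root cause described above can be illustrated with a small, framework-free sketch. It is not Flink code: 'SketchTask', its 'running' flag and the 'rethrowWhenNotRunning' switch are hypothetical names, and the assumption that the handler only forwards failures while a running flag is set is one reading of the report, not something the text confirms. The sketch only shows how an exception raised on a timer thread can vanish when the handler drops it, and how it surfaces if the handler always records it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Framework-free illustration; every name here is hypothetical, not a Flink API.
class AsyncExceptionSketch {

    /** Stand-in for a task that receives failures reported from a timer thread. */
    static final class SketchTask {
        // Assumption: false while the task is still restoring from state.
        private volatile boolean running = false;
        private final boolean rethrowWhenNotRunning;
        private final AtomicReference<Throwable> failure = new AtomicReference<>();

        SketchTask(boolean rethrowWhenNotRunning) {
            this.rethrowWhenNotRunning = rethrowWhenNotRunning;
        }

        /** Records the failure only if running, or if the proposed behaviour is enabled. */
        void handleAsyncException(String message, Throwable cause) {
            if (running || rethrowWhenNotRunning) {
                failure.compareAndSet(null, new RuntimeException(message, cause));
            }
            // Otherwise the exception is dropped without any trace.
        }

        Throwable getFailure() {
            return failure.get();
        }
    }

    /** Fires one failing timer callback on its own thread; returns true if the task saw the failure. */
    public static boolean failureSurfaces(boolean rethrowWhenNotRunning) {
        SketchTask task = new SketchTask(rethrowWhenNotRunning);
        Thread timerThread = new Thread(() -> {
            try {
                Object field = Integer.valueOf(1);
                Long asLong = (Long) field; // throws ClassCastException, as at Anchor-C
                System.out.println(asLong);
            } catch (Throwable t) {
                task.handleAsyncException("Caught exception while processing timer.", t);
            }
        });
        timerThread.start();
        try {
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.getFailure() != null;
    }

    public static void main(String[] args) {
        System.out.println("ignore when not running: " + failureSurfaces(false));
        System.out.println("always record failure:   " + failureSurfaces(true));
    }
}
```

With the flag set to false the failure is lost, which corresponds to the behaviour reported here; with it set to true the failure is kept so that the caller can re-throw it.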



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
