[ 
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692057#comment-17692057
 ] 

RocMarshal commented on FLINK-29816:
------------------------------------

As described in the issue description and the earlier comments:

Exceptions thrown in `StreamTask` during `restore()` are ignored instead of 
being passed to `asyncExceptionHandler`.
On the `Execution` side, it is possible to enter the `FAILED` state from 
any other state defined in the `ExecutionState` class. However, there is no 
`isInitializing` flag or `Initializing` state in `StreamTask`.
We can address the issue by following the state rules of `ExecutionState`.

- Introduce an `isInitializing` flag for `StreamTask` to help 
`asyncExceptionHandler` decide which branch to take. It is worth noting that 
such an approach leaves a window between two adjacent states in which it is 
unsafe to change the value of the flags, so we can only rely on overlapping 
boundary conditions to ensure that exceptions are handled.

!image-2023-02-22-17-26-06-200.png!


 * Or we can introduce a state enum for `StreamTask`, similar to 
`ExecutionState`. If so, we should ensure that the introduced states are simple, 
that they cover the current `StreamTask` state transitions as a baseline, and 
that state transitions are thread-safe.
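
To make the second option more concrete, here is a minimal sketch of what such 
a state enum could look like. It is not Flink code: `TaskStateSketch`, 
`TaskState` and `SketchTask` are hypothetical names, the set of states is only 
assumed to resemble `ExecutionState`, and a plain list stands in for the real 
`asyncExceptionHandler`. The idea is that transitions go through a single 
compare-and-set, so there is no window in which two flags disagree, and async 
exceptions are forwarded in both the initializing and the running state.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of an enum-based task lifecycle; not Flink's StreamTask. */
final class TaskStateSketch {

    /** Assumed states, loosely modeled on ExecutionState. */
    public enum TaskState {
        CREATED, INITIALIZING, RUNNING, FINISHED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == FAILED;
        }
    }

    public static final class SketchTask {
        private final AtomicReference<TaskState> state =
                new AtomicReference<>(TaskState.CREATED);
        // Stand-in for the real asyncExceptionHandler (assumption).
        private final List<Throwable> reported = new CopyOnWriteArrayList<>();

        /** Atomically moves from expected to next; returns false if the state changed. */
        public boolean transition(TaskState expected, TaskState next) {
            return state.compareAndSet(expected, next);
        }

        /** FAILED is reachable from any non-terminal state. */
        public boolean fail() {
            while (true) {
                TaskState current = state.get();
                if (current.isTerminal()) {
                    return false;
                }
                if (state.compareAndSet(current, TaskState.FAILED)) {
                    return true;
                }
            }
        }

        /** Forwards the exception while initializing or running, otherwise drops it. */
        public void handleAsyncException(String message, Throwable cause) {
            TaskState current = state.get();
            boolean active =
                    current == TaskState.INITIALIZING || current == TaskState.RUNNING;
            if (active && fail()) {
                reported.add(new RuntimeException(message, cause));
            }
        }

        public TaskState state() {
            return state.get();
        }

        public List<Throwable> reported() {
            return reported;
        }
    }

    public static void main(String[] args) {
        SketchTask task = new SketchTask();
        task.transition(TaskState.CREATED, TaskState.INITIALIZING);
        // A timer callback fails while the task is still restoring.
        task.handleAsyncException("restore failed", new NumberFormatException("abc"));
        System.out.println(task.state() + ", reported=" + task.reported().size());
    }
}
```

With a single atomic state, an exception raised during restore fails the task 
in the same way as one raised after the task has started running, and a task 
that is already finished or failed ignores late exceptions.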


Please let me know what you think. Thanks so much~

CC [~xieyi] [~Weijie Guo] [~kevin.cyj] 

> Userfunction exception in ProcessWindowFunction was called before invoke 
> during restore state(subtask was in INITIALIZING state), but SteamTask skip 
> handle Exception
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29816
>                 URL: https://issues.apache.org/jira/browse/FLINK-29816
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.14.0, 1.16.0, 1.15.3
>            Reporter: Xie Yi
>            Assignee: RocMarshal
>            Priority: Major
>         Attachments: image-2022-10-31-19-49-52-432.png, 
> image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png, 
> image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png, 
> image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
>
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction and throw an exception in process().
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.enableCheckpointing(60 * 1000);
>         
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setCheckpointTimeout(60000);
>         KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>                 .setBootstrapServers("****")
>                 .setTopics("****")
>                 .setGroupId("****")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .build();
>         DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
>         SingleOutputStreamOperator<String> mapSourse = kafkaSource.keyBy(s -> 
> s).window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>                 .process(new ProcessWindowFunction<String, String, String, 
> TimeWindow>() {
>                     @Override
>                     public void process(String s, 
> ProcessWindowFunction<String, String, String, TimeWindow>.Context context, 
> Iterable<String> iterable, Collector<String> collector) throws Exception {
>                         // Processing the event "abc" throws 
>                         // java.lang.NumberFormatException.
>                         Integer intS = Integer.valueOf(s);
>                         collector.collect(s);
>                     }
>                 })
>                 .name("name-process").uid("uid-process");
>         mapSourse.print();
>         env.execute();
>     }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Fault phenomena
> When the job processes the event "abc", it throws 
> java.lang.NumberFormatException and fails over. It should then keep retrying 
> and failing over continuously.
> However, it only fails over twice (attempt 0 and attempt 1); on the third 
> attempt it runs normally, with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
> checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 
> 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
> checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, 
> checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>  
> Attempt 2 was restored from the checkpoint:
> {code:java}
> 2022-10-31 17:00:30,033 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 
> 7bca78a75b089d447bb4c99efcfd6527 located at 
> hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>  
>  
> h4. 3. Possible reasons
> During attempt 2, the task restores from the checkpoint. The user function in 
> ProcessWindowFunction is called in StreamTask.restore and throws 
> "java.lang.NumberFormatException". However, StreamTask catches the exception 
> and does not handle it because the subtask is not in the RUNNING state.
> *The stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is 
> INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> java.lang.Thread.run(Thread.java:745)
> {code}
> Stack trace (which causes the failover) in attempt 0 and attempt 1
> The user function was called in StreamTask.invoke:
> {code:java}
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> java.lang.Thread.run(Thread.java:745)
> {code}
> In org.apache.flink.streaming.runtime.tasks.StreamTask#handleAsyncException, 
> StreamTask only handles the async exception when isRunning == true:
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1540]
> {code:java}
>     @Override
>     public void handleAsyncException(String message, Throwable exception) {
>         if (isRunning) {
>             // only fail if the task is still running
>             asyncExceptionHandler.handleAsyncException(message, exception);
>         }
>     }
> {code}
> But during restore, isRunning == false:
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L673]
>  
> So during StreamTask.restore, StreamTask skips the exception thrown by the 
> user function of ProcessWindowFunction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
