[
https://issues.apache.org/jira/browse/FLINK-29816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692250#comment-17692250
]
Rui Fan commented on FLINK-29816:
---------------------------------
Hi [~Weijie Guo], thanks for the reminder. I have asked them, and I can
go ahead once they agree.
Anyway, I don't have a strong opinion about which PR is merged first.
> Userfunction exception in ProcessWindowFunction was called before invoke
> during restore state(subtask was in INITIALIZING state), but SteamTask skip
> handle Exception
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-29816
> URL: https://issues.apache.org/jira/browse/FLINK-29816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0, 1.16.0, 1.15.3
> Reporter: Xie Yi
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-10-31-19-49-52-432.png,
> image-2022-10-31-19-54-12-546.png, image-2022-11-02-10-42-21-099.png,
> image-2022-11-02-10-57-08-064.png, image-2022-11-02-11-06-37-925.png,
> image-2022-11-02-11-10-25-508.png, image-2023-02-22-17-26-06-200.png
>
>
> h4. 1. How to reproduce
> Use a ProcessWindowFunction whose process() throws an exception.
> Test code:
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.enableCheckpointing(60 * 1000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setCheckpointTimeout(60000);
>
>     KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
>             .setBootstrapServers("****")
>             .setTopics("****")
>             .setGroupId("****")
>             .setValueOnlyDeserializer(new SimpleStringSchema())
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .build();
>     DataStreamSource<String> kafkaSource =
>             env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
>
>     SingleOutputStreamOperator<String> mapSourse = kafkaSource
>             .keyBy(s -> s)
>             .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
>             .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>                 @Override
>                 public void process(
>                         String s,
>                         ProcessWindowFunction<String, String, String, TimeWindow>.Context context,
>                         Iterable<String> iterable,
>                         Collector<String> collector) throws Exception {
>                     // Processing the event "abc" throws java.lang.NumberFormatException.
>                     Integer intS = Integer.valueOf(s);
>                     collector.collect(s);
>                 }
>             })
>             .name("name-process")
>             .uid("uid-process");
>     mapSourse.print();
>     env.execute();
> }
> {code}
> Kafka input events:
> {code:java}
> >1
> >1
> >2
> >2
> >3
> >3
> >abc
> >abc
> >
> {code}
> h4. 2. Observed behavior
> When the job processes the event "abc", it throws
> java.lang.NumberFormatException and fails over. It is then expected to
> restart and fail over continuously.
> However, it only fails over twice (attempt 0 and attempt 1). On the third
> attempt (attempt 2) it runs normally, with no exception.
> !image-2022-10-31-19-54-12-546.png!
> Checkpoint 1 completed in attempt 1, before failover exception 1:
> {code:java}
> 2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
> 2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).
> {code}
>
> Attempt 2 was restored from checkpoint 1:
> {code:java}
> 2022-10-31 17:00:30,033 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring
> job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for
> 7bca78a75b089d447bb4c99efcfd6527 located at
> hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
> {code}
>
>
> h4. 3. Possible cause
> During attempt 2, the task restores from the checkpoint. The user function in
> ProcessWindowFunction is called inside StreamTask.restore and throws
> java.lang.NumberFormatException. However, StreamTask catches the exception and
> does not handle it, because the subtask is not yet in the RUNNING state.
> *Stack trace in attempt 2*
> The user function was called in StreamTask.restore (subtask state is INITIALIZING):
> {code:java}
> java.lang.Thread.getStackTrace(Thread.java:1552)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> java.lang.Thread.run(Thread.java:745)
> {code}
> Stack trace (which caused the failover) in attempt 0 and attempt 1
> The user function was called in StreamTask.invoke:
> {code:java}
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
> com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> java.lang.Thread.run(Thread.java:745)
> {code}
> In org.apache.flink.streaming.runtime.tasks.StreamTask#handleAsyncException,
> StreamTask only handles the async exception when isRunning == true:
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1540]
> {code:java}
> @Override
> public void handleAsyncException(String message, Throwable exception) {
>     if (isRunning) {
>         // only fail if the task is still running
>         asyncExceptionHandler.handleAsyncException(message, exception);
>     }
> }
> {code}
> But during restore, isRunning == false:
> [https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L673]
>
> So during StreamTask.restore, StreamTask skips the exception thrown by the
> user function of ProcessWindowFunction.
>
>
>
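As a rough illustration of the cause described in section 3 of the issue, the guard can be modelled in a few lines of plain Java. This is a toy sketch, not Flink code: `RestoreGuardSketch`, `ToyTask`, `fireTimer`, `reported` and the message string are hypothetical names made up for the example. Only the `if (isRunning)` check mirrors the `handleAsyncException` snippet quoted in the issue. It assumes, as the issue reports, that a timer callback can run the user function while the task is still restoring.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of the isRunning guard described in the issue. */
public class RestoreGuardSketch {

    /** Hypothetical stand-in for a task; only the guard mirrors the quoted snippet. */
    static class ToyTask {
        // false while the task is restoring (INITIALIZING), true once it is running
        volatile boolean isRunning = false;
        // exceptions that were actually reported and would trigger a failover
        final List<Throwable> reported = new ArrayList<>();

        // Same shape as the handleAsyncException snippet quoted in the issue.
        void handleAsyncException(String message, Throwable exception) {
            if (isRunning) {
                reported.add(exception);
            }
            // When isRunning is false, the exception is silently dropped.
        }

        // Stand-in for a processing-time timer that fires the user function.
        void fireTimer(String event) {
            try {
                Integer.valueOf(event); // user function body from the test code
            } catch (Throwable t) {
                handleAsyncException("timer callback failed (illustrative message)", t);
            }
        }
    }

    public static void main(String[] args) {
        ToyTask task = new ToyTask();

        // Timer fires during restore: the NumberFormatException is dropped.
        task.fireTimer("abc");
        System.out.println("reported during restore: " + task.reported.size());

        // Timer fires after the task is running: the exception is reported.
        task.isRunning = true;
        task.fireTimer("abc");
        System.out.println("reported while running: " + task.reported.size());
    }
}
```

In this model the first call reports nothing and the second reports one exception, which matches the observation that attempts 0 and 1 fail over while attempt 2 continues without an error.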
--
This message was sent by Atlassian Jira
(v8.20.10#820010)