zoucao created FLINK-34964:
------------------------------
Summary: ScheduledTask leak in registering the processing timer
Key: FLINK-34964
URL: https://issues.apache.org/jira/browse/FLINK-34964
Project: Flink
Issue Type: Bug
Affects Versions: 1.20.0
Reporter: zoucao
Attachments: image-2024-03-29-16-40-11-928.png
I have come across a 'ScheduledTask' leak when registering processing-time
timers. After further investigation, I have identified two causes of the leak.
*1. A registered 'ScheduledTask' is not canceled when its timer is deleted*
See
`org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`:
when a registered timer is deleted, Flink only removes it from the
'processingTimeTimersQueue'. However, this timer may be the earliest one that
will fire in the future, in which case it has already been scheduled as a task
in the ScheduledThreadPoolExecutor.
When deleting a registered timer, Flink should check whether it is the next
timer to fire and, if so, cancel the current 'ScheduledTask'.
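To illustrate the proposed fix, here is a minimal sketch built on a plain ScheduledThreadPoolExecutor. It is a hypothetical toy timer service (CancelOnDeleteSketch, register, delete and rescheduleHead are made-up names), not Flink's InternalTimerServiceImpl. It only assumes, as described above, that the earliest timer owns the single scheduled task, so deleting that timer cancels the task and schedules one for the next timer.
{code:java}
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical toy timer service (not Flink's InternalTimerServiceImpl): only the earliest
 * timer owns a task in the executor, so deleting that timer must also cancel its task.
 */
class CancelOnDeleteSketch {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private ScheduledFuture<?> nextTimer; // task scheduled for timers.peek(), or null

    CancelOnDeleteSketch() {
        // Remove cancelled tasks from the executor's work queue right away.
        executor.setRemoveOnCancelPolicy(true);
    }

    synchronized void register(long timestamp) {
        Long head = timers.peek();
        timers.add(timestamp);
        if (head == null || timestamp < head) {
            rescheduleHead(); // new earliest timer: replace the scheduled task
        }
    }

    synchronized void delete(long timestamp) {
        Long head = timers.peek();
        if (!timers.remove(timestamp)) {
            return; // unknown timer, nothing to do
        }
        if (head == timestamp) {
            rescheduleHead(); // the deleted timer owned the scheduled task: cancel it
        }
    }

    private void rescheduleHead() {
        if (nextTimer != null) {
            nextTimer.cancel(false);
            nextTimer = null;
        }
        Long head = timers.peek();
        if (head != null) {
            long delay = Math.max(0L, head - System.currentTimeMillis());
            nextTimer = executor.schedule(
                    () -> System.out.println("fire timer " + head), delay, TimeUnit.MILLISECONDS);
        }
    }

    /** Number of tasks still waiting in the executor. */
    int pendingTasks() {
        return executor.getQueue().size();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        CancelOnDeleteSketch service = new CancelOnDeleteSketch();
        long now = System.currentTimeMillis();
        service.register(now + 60_000);
        service.register(now + 70_000);
        service.delete(now + 60_000); // cancels the head's task, schedules one for now + 70_000
        System.out.println("pending tasks: " + service.pendingTasks()); // pending tasks: 1
        service.shutdown();
    }
}
{code}
Without the cancel in rescheduleHead(), deleting the head timer would leave its task in the executor, where it would later fire for a timer that no longer exists.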
*2. Re-submitting a timer earlier than System.currentTimeMillis()*
Consider the following case: the current time in milliseconds is 100, the
processing-time queue contains timers 100, 101 and 102, and timer-100 has been
submitted to the ScheduledThreadPool. At this moment, the user registers
timer-99. Since 99 is smaller than 100 (the head of the queue), Flink cancels
timer-100's task and registers a new task for timer-99. However, before
timer-100 was canceled, the thread pool had already submitted it to the mailbox.
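Canceling a task that has already run cannot pull its callback back out of the mailbox. A small JDK-only demonstration, where a LinkedBlockingQueue stands in for Flink's mailbox (a simplification for illustration; CancelAfterFireDemo is a hypothetical name):
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: canceling a timer task after it has run does not undo its mail. */
class CancelAfterFireDemo {
    /** Schedules "timer-100", waits until it has run, then tries to cancel it. */
    static boolean cancelAfterFire(BlockingQueue<String> mailbox) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            // The task body stands in for "submit the timer callback to the mailbox".
            ScheduledFuture<?> timer100 =
                    pool.schedule(() -> { mailbox.add("trigger timer-100"); }, 0, TimeUnit.MILLISECONDS);
            timer100.get(); // the pool has already handed the callback over
            return timer100.cancel(false); // too late: returns false, the mail stays queued
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        boolean cancelled = cancelAfterFire(mailbox);
        System.out.println(cancelled + " " + mailbox); // false [trigger timer-100]
    }
}
{code}
ScheduledFuture.cancel(false) returns false for a task that has already completed, so the queued 'trigger timer-100' mail still runs.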
With timer-100 already in the mailbox, the mails are queued as follows:
{code:java}
-> * register timer-99
-> trigger timer-100
-> trigger timer-99
{code}
- When executing 'trigger timer-100', Flink fires timer-99 and timer-100
(flushing their records), then submits timer-101 to the scheduled thread pool.
- When executing 'trigger timer-99', there are no records left to flush, yet it
also submits timer-101 to the scheduled thread pool, because timer-101 is the
next timer to fire.
As a result, two tasks for the same timer are registered in Flink's scheduled
thread pool.
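The interleaving above can be replayed deterministically with a single-threaded toy model. It is hypothetical and simplified: the "scheduler" only records tasks, firing a task turns it into a mail, and onProcessingTime assumes the behavior described above (clear the reference to the scheduled task, fire all due timers, schedule a task for the new head). The guarded variant, which cancels a still-pending task before dropping the reference, is one possible mitigation offered as an assumption, not the fix adopted by Flink.
{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

/**
 * Hypothetical single-threaded model of case 2 (not Flink code). The "scheduler" only records
 * tasks, firing a task turns it into a mail, and mails are processed in FIFO order.
 */
class DuplicateScheduleModel {
    /** A scheduled timer task; once fired it can no longer be cancelled. */
    static final class Task {
        final long timestamp;
        boolean fired;
        boolean cancelled;

        Task(long timestamp) {
            this.timestamp = timestamp;
        }

        void cancel() {
            if (!fired) {
                cancelled = true;
            }
        }
    }

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Task> tasks = new ArrayList<>();
    private final Queue<Long> mailbox = new ArrayDeque<>();
    private final boolean guarded;
    private Task nextTimer;

    DuplicateScheduleModel(boolean guarded) {
        this.guarded = guarded;
    }

    private void schedule(long timestamp) {
        nextTimer = new Task(timestamp);
        tasks.add(nextTimer);
    }

    void register(long timestamp) {
        Long head = timers.peek();
        timers.add(timestamp);
        if (head == null || timestamp < head) {
            if (nextTimer != null) {
                nextTimer.cancel(); // no-op if the task has already fired
            }
            schedule(timestamp);
        }
    }

    /** The timer thread runs the current task: its callback becomes a mail. */
    void fireNextTimer() {
        nextTimer.fired = true;
        mailbox.add(nextTimer.timestamp);
    }

    /** Mail body: fire all due timers, then schedule a task for the new head. */
    void onProcessingTime(long time) {
        if (guarded && nextTimer != null) {
            nextTimer.cancel(); // guarded variant: cancel before dropping the reference
        }
        nextTimer = null; // without the guard, a still-pending task is orphaned here
        while (!timers.isEmpty() && timers.peek() <= time) {
            timers.poll();
        }
        if (!timers.isEmpty()) {
            schedule(timers.peek());
        }
    }

    long pendingTasks() {
        return tasks.stream().filter(t -> !t.fired && !t.cancelled).count();
    }

    /** Replays the interleaving from the description; returns the number of pending tasks. */
    static long run(boolean guarded) {
        DuplicateScheduleModel m = new DuplicateScheduleModel(guarded);
        m.register(100);
        m.register(101);
        m.register(102); // task for timer-100 is scheduled
        m.fireNextTimer(); // timer-100 fires: mail "trigger 100"
        m.register(99); // canceling timer-100 is a no-op; task for timer-99 is scheduled
        m.fireNextTimer(); // 99 is already due: mail "trigger 99"
        while (!m.mailbox.isEmpty()) {
            m.onProcessingTime(m.mailbox.poll());
        }
        return m.pendingTasks();
    }

    public static void main(String[] args) {
        System.out.println("pending tasks, as described: " + run(false)); // 2, both for timer-101
        System.out.println("pending tasks, guarded: " + run(true)); // 1
    }
}
{code}
In this model each of the two timer-101 tasks would later schedule timer-102 again, so the duplicate carries over to subsequent timers instead of disappearing.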
In our production job, the number of these leaked ScheduledTasks can reach the
thousands, as shown in the following figure.
The following test reproduces case 2.
{code:java}
// handleException, deferCallbackToMailbox, createAndStartInternalTimerService,
// createQueueFactory, testKeyGroupRange and TestKeyContext are assumed to be
// helpers of the enclosing test class.
@Test
public void testTimerTaskLeak() {
    TaskMailboxImpl mailbox = new TaskMailboxImpl();
    MailboxExecutor mailboxExecutor =
            new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
    SystemProcessingTimeService processingTimeService =
            new SystemProcessingTimeService(ex -> handleException(ex));
    // Timer callbacks are not run on the timer thread but deferred to the mailbox as mails.
    ProcessingTimeServiceImpl timeService =
            new ProcessingTimeServiceImpl(
                    processingTimeService,
                    callback -> deferCallbackToMailbox(mailboxExecutor, callback));
    TestKeyContext keyContext = new TestKeyContext();
    // Records the order in which mails reach the head of the mailbox.
    Queue<String> mailQueue = new LinkedBlockingDeque<>();
    long curr = System.currentTimeMillis();
    InternalTimerServiceImpl<Integer, String> timerService =
            createAndStartInternalTimerService(
                    mock(Triggerable.class),
                    keyContext,
                    timeService,
                    testKeyGroupRange,
                    createQueueFactory());

    // A separate thread submits the "register" mails; the test thread drains the mailbox.
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.execute(
            () -> {
                try {
                    keyContext.setCurrentKey(1);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L),
                            "6");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L),
                            "7");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L),
                            "8");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> {
                                timerService.registerProcessingTimeTimer("void", curr + 1);
                            },
                            "1");
                    mailboxExecutor.execute(
                            () -> {
                                Thread.sleep(3); // wait until timer +1 has been submitted to the mailbox
                                timerService.registerProcessingTimeTimer("void", curr - 5);
                            },
                            "-5");
                    Thread.sleep(5L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 4), "4");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });

    while (mailQueue.size() < 14) {
        if (mailbox.mailQueue().size() > 0) {
            // Timer mails have long descriptions ending in "@ <timestamp>";
            // register mails are described only by their short offset label.
            String mail = mailbox.mailQueue().peek().toString();
            if (mail.length() > 5) {
                mailQueue.add("trigger " + (Long.parseLong(mail.split("@ ")[1]) - curr));
            } else {
                mailQueue.add("register " + mail);
            }
        }
        mailboxExecutor.tryYield();
    }
    System.out.println(mailQueue);
    executorService.shutdownNow();
}
{code}
!image-2024-03-29-16-40-11-928.png!