Hi Beam community,

We have a simple pipeline that consumes data from a Kafka source topic, runs some transformations, and writes to a Kafka output topic. Under high traffic we observe that checkpoints sometimes fail due to timeout (>10 min). We are using the Flink runner (Flink 1.12) with 60 Task Managers, and have tested with maxParallelism=360 and maxParallelism=-1. The input topic has 180 partitions.
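For context, here is a minimal sketch of the kind of runner configuration involved (the option names come from FlinkPipelineOptions; the checkpointing interval and the exact wiring shown here are illustrative rather than our exact setup):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PipelineSetup {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    options.setMaxParallelism(360);                // also tested with -1
    options.setCheckpointingInterval(60_000L);     // illustrative value
    options.setCheckpointTimeoutMillis(600_000L);  // 10 min, where we hit the timeouts

    Pipeline pipeline = Pipeline.create(options);
    // ... Kafka read -> transformations -> Kafka write ...
    pipeline.run();
  }
}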
We did some debugging and experiments, and below are our findings. Please let us know if this makes sense or if there is anything we did wrong.

After some debugging, our guess is that the checkpoint timeouts are caused by lock contention between the application thread, the "Legacy Source Thread" (https://github.com/apache/beam/blob/master/runners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2Ftranslation%2Fwrappers%2Fstreaming%2Fio%2FUnboundedSourceWrapper.java#L290), and the checkpoint thread. Taking several thread dumps and inspecting the ThreadInfo shows that the Legacy Source Thread is holding the lock while the other threads are blocked on the checkpoint lock. For example, here is a sample of the ThreadInfo output:

threadName: Legacy Source Thread - Source: EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) -> Flat Map -> EventBusIO.Read/Log offset/Map/ParMultiDo(Anonymous) -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Json to Event/FlatMap/ParMultiDo(Anonymous) -> Seperate empty and non empty ivids -> (ToKV/AddKeys/Map/ParMultiDo(Anonymous), Unenriched Event to Json/ParMultiDo(UnenrichedEventToString) -> Split Unenriched Events based on 7216 compliance -> (Unenriched 7216 Producer Record/ParMultiDo(ProducerService) -> EventBusIO.WriteRecord2/formatter/ParMultiDo(EbFormatterDoFnPrdRec) -> EventBusIO.WriteRecord2/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter), Unenriched Non 7216 Producer Record/ParMultiDo(ProducerService) -> EventBusIO.WriteRecord/formatter/ParMultiDo(EbFormatterDoFnPrdRec) -> EventBusIO.WriteRecord/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))) (6/15)#0,
threadPriority: -1, blockCount: 2332, blockTime: 23813, waitCount: 780, waitTime: 44639, lockOwner: null, threadState: RUNNABLE

threadName: Source: EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) -> Flat Map -> EventBusIO.Read/Log offset/Map/ParMultiDo(Anonymous) -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Json to Event/FlatMap/ParMultiDo(Anonymous) -> Seperate empty and non empty ivids -> (ToKV/AddKeys/Map/ParMultiDo(Anonymous), Unenriched Event to Json/ParMultiDo(UnenrichedEventToString) -> Split Unenriched Events based on 7216 compliance -> (Unenriched 7216 Producer Record/ParMultiDo(ProducerService) -> EventBusIO.WriteRecord2/formatter/ParMultiDo(EbFormatterDoFnPrdRec) -> EventBusIO.WriteRecord2/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter), Unenriched Non 7216 Producer Record/ParMultiDo(ProducerService) -> EventBusIO.WriteRecord/formatter/ParMultiDo(EbFormatterDoFnPrdRec) -> EventBusIO.WriteRecord/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))) (6/15)#0,
threadPriority: -1, blockCount: 2964, blockTime: 1061663, waitCount: 1780, waitTime: 70783, lockOwner: Legacy Source Thread - Source: EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) ...

Since the Legacy Source Thread runs a while loop and acquires the checkpoint lock inside that loop, we are guessing that some JVM optimization leads to non-fair thread scheduling or non-fair acquisition of the checkpoint lock. Our first hack is to add a Thread.yield() here: https://github.com/apache/beam/blob/master/runners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2Ftranslation%2Fwrappers%2Fstreaming%2Fio%2FUnboundedSourceWrapper.java#L305 This yields the thread so that other threads can compete for the checkpoint lock again.
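For anyone who wants to reproduce the measurement: the ThreadInfo fields shown above (blockCount, blockTime, lockOwner, ...) can be collected with the standard ThreadMXBean API. Below is a minimal one-shot sketch; the way we actually capture the dumps in our deployment may differ in detail:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadInfoDump {
  public static void main(String[] args) {
    ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
    // blockTime/waitTime are only reported when contention monitoring is enabled.
    if (mxBean.isThreadContentionMonitoringSupported()) {
      mxBean.setThreadContentionMonitoringEnabled(true);
    }
    // lockedMonitors=true, lockedSynchronizers=true so lock ownership is visible.
    for (ThreadInfo info : mxBean.dumpAllThreads(true, true)) {
      System.out.println(
          "threadName: " + info.getThreadName()
              + ", blockCount: " + info.getBlockedCount()
              + ", blockTime: " + info.getBlockedTime()
              + ", waitCount: " + info.getWaitedCount()
              + ", waitTime: " + info.getWaitedTime()
              + ", lockOwner: " + info.getLockOwnerName()
              + ", threadState: " + info.getThreadState());
    }
  }
}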
With this hack (the added Thread.yield()), our checkpoints finish within 1 minute.

We then extracted the code pattern of UnboundedSourceWrapper into a small standalone Java program (attached below) to simulate the behavior. In this program, one thread runs Runner1: it loops, and within the loop it synchronizes on an object and sleeps for 10 ms. This simulates the application thread in Beam. There is also a Timer that fires every 10 ms and runs the Runner2 code, which synchronizes on the same object and only increments a counter. This simulates the checkpointing thread in Beam. Running this code for about 10 seconds gives:

No of times Runner1 got called: 877
No of times Runner2 got called: 10
Percentage Runner1 acquire the lock: 98
Percentage Runner2 acquire the lock: 2

As shown, the simulated application thread (Runner1) acquires the lock 98% of the time, while the simulated checkpoint thread (the timer running Runner2) acquires it only 2% of the time. This is consistent with what we observe in our pipeline. If I uncomment the Thread.yield() statement and rerun, the output is:

No of times Runner1 got called: 858
No of times Runner2 got called: 1001
Percentage Runner1 acquire the lock: 46
Percentage Runner2 acquire the lock: 54

This shows that Runner1 and Runner2 share the lock much more evenly, which is also consistent with our workaround modification of the Beam code.

Any suggestions on how to approach the checkpoint timeout issue are appreciated. Thanks.

Antonio.

===============Testing Code===================

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class TestRunner {

  public static final Object lockObject = new Object();
  public static final AtomicInteger counter1 = new AtomicInteger();
  public static final AtomicInteger counter2 = new AtomicInteger();

  // Simulates the Beam application thread ("Legacy Source Thread"):
  // repeatedly grabs the lock and holds it while doing work (sleep).
  public static class Runner1 implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
      while (running) {
        synchronized (lockObject) {
          counter1.getAndIncrement();
          try {
            Thread.sleep(10);
          } catch (Exception e) {
          }
        }
        // Thread.yield();
      }
    }

    public void stop() {
      running = false;
    }
  }

  // Simulates the checkpoint thread: periodically tries to grab the same lock.
  public static class Runner2 extends TimerTask {
    @Override
    public void run() {
      synchronized (lockObject) {
        counter2.getAndIncrement();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Runner1 runner1 = new Runner1();
    Runner2 runner2 = new Runner2();
    Thread thread1 = new Thread(runner1);
    Timer timer = new Timer();
    timer.scheduleAtFixedRate(runner2, 0, 10);
    thread1.start();

    Thread.sleep(10000);

    timer.cancel();
    runner1.stop();

    int runner1Percent = (counter1.get() * 100 / (counter1.get() + counter2.get()));
    System.out.println("No of times Runner1 got called: " + counter1.get());
    System.out.println("No of times Runner2 got called: " + counter2.get());
    System.out.println("Percentage Runner1 acquire the lock: " + runner1Percent);
    System.out.println("Percentage Runner2 acquire the lock: " + (100 - runner1Percent));
  }
}
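The test program has no dependencies beyond the JDK; it can be compiled and run with "javac TestRunner.java" followed by "java TestRunner". The exact counts will of course vary with the JVM version and the OS scheduler, but the point is the large skew between the two runners when the Thread.yield() line stays commented out.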
