Hi Beam community,

We have a simple pipeline which consumes data from a Kafka source topic,
runs some transformations, and outputs to a Kafka output topic. Under
high traffic, we observe that the checkpoint sometimes fails due to a
timeout (>10 min). We are using the Flink runner with Flink 1.12. We have
60 Task Managers, and we have tested with maxParallelism=360 and
maxParallelism=-1. The input topic has 180 partitions.
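
For reference, the pipeline shape is roughly as follows. This is a minimal
sketch with placeholder broker and topic names and a trivial stand-in
transform; our actual pipeline reads and writes through an EventBusIO
wrapper around KafkaIO (as visible in the thread dumps below), and is run
with --runner=FlinkRunner:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaPassThroughSketch {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")   // placeholder
                .withTopic("input-topic")              // placeholder; ours has 180 partitions
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())                    // PCollection<KV<String, String>>
         .apply(MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(kv -> KV.of(kv.getKey(), kv.getValue()))) // stand-in for our transforms
         .apply(KafkaIO.<String, String>write()
                .withBootstrapServers("broker:9092")   // placeholder
                .withTopic("output-topic")             // placeholder
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));
        p.run();
    }
}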

We did some debugging and experiments, and our findings are below. Please
let us know if this makes sense or if there is anything we did wrong.

After some debugging, we suspect the checkpoint timeout is probably caused
by lock contention between the application thread, the "Legacy Source
Thread" (
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L290),
and the checkpoint thread. Taking several thread dumps and inspecting the
ThreadInfo shows that the Legacy Source Thread is holding the lock, while
the other threads are blocked on the checkpoint lock. For example, below
is a sample ThreadInfo output:

threadName: Legacy Source Thread - Source: EventBusIO.Read/Read Bytes From
Kafka/Read(KafkaUnboundedSource) -> Flat Map -> EventBusIO.Read/Log
offset/Map/ParMultiDo(Anonymous) -> Filter Unreadeable
Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Json to
Event/FlatMap/ParMultiDo(Anonymous) -> Seperate empty and non empty ivids
-> (ToKV/AddKeys/Map/ParMultiDo(Anonymous), Unenriched Event to
Json/ParMultiDo(UnenrichedEventToString) -> Split Unenriched Events based
on 7216 compliance -> (Unenriched 7216 Producer
Record/ParMultiDo(ProducerService) ->
EventBusIO.WriteRecord2/formatter/ParMultiDo(EbFormatterDoFnPrdRec) ->
EventBusIO.WriteRecord2/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter),
Unenriched Non 7216 Producer Record/ParMultiDo(ProducerService) ->
EventBusIO.WriteRecord/formatter/ParMultiDo(EbFormatterDoFnPrdRec) ->
EventBusIO.WriteRecord/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)))
(6/15)#0, threadPriority: -1, blockCount: 2332, blockTime: 23813,
waitCount: 780, waitTime: 44639, lockOwner: null, threadState: RUNNABLE

threadName: Source: EventBusIO.Read/Read Bytes From
Kafka/Read(KafkaUnboundedSource) -> Flat Map -> EventBusIO.Read/Log
offset/Map/ParMultiDo(Anonymous) -> Filter Unreadeable
Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Json to
Event/FlatMap/ParMultiDo(Anonymous) -> Seperate empty and non empty ivids
-> (ToKV/AddKeys/Map/ParMultiDo(Anonymous), Unenriched Event to
Json/ParMultiDo(UnenrichedEventToString) -> Split Unenriched Events based
on 7216 compliance -> (Unenriched 7216 Producer
Record/ParMultiDo(ProducerService) ->
EventBusIO.WriteRecord2/formatter/ParMultiDo(EbFormatterDoFnPrdRec) ->
EventBusIO.WriteRecord2/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter),
Unenriched Non 7216 Producer Record/ParMultiDo(ProducerService) ->
EventBusIO.WriteRecord/formatter/ParMultiDo(EbFormatterDoFnPrdRec) ->
EventBusIO.WriteRecord/actWriter/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)))
(6/15)#0, threadPriority: -1, blockCount: 2964, blockTime: 1061663,
waitCount: 1780, waitTime: 70783, lockOwner: Legacy Source Thread - Source:
EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) ...
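
For reference, the fields above come from java.lang.management.ThreadInfo.
Below is a minimal sketch of how such a dump can be collected in-process;
note that blockTime and waitTime are only populated when the JVM supports
and enables thread contention monitoring:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DumpThreads {
    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // blockTime/waitTime stay at -1 unless contention monitoring is on.
        if (bean.isThreadContentionMonitoringSupported()) {
            bean.setThreadContentionMonitoringEnabled(true);
        }
        // Dump all threads, including locked monitors and synchronizers.
        for (ThreadInfo info : bean.dumpAllThreads(true, true)) {
            System.out.println("threadName: " + info.getThreadName()
                + ", blockCount: " + info.getBlockedCount()
                + ", blockTime: " + info.getBlockedTime()
                + ", waitCount: " + info.getWaitedCount()
                + ", waitTime: " + info.getWaitedTime()
                + ", lockOwner: " + info.getLockOwnerName()
                + ", threadState: " + info.getThreadState());
        }
    }
}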

We are guessing that, since the Legacy Source Thread re-acquires the
checkpoint lock inside a tight while loop, the JVM's non-fair handling of
intrinsic monitors (a thread releasing a monitor may immediately barge
back in ahead of queued waiters) is starving the checkpoint thread.

Our first hack is to add a Thread.yield() here:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L305
This yields the thread after the synchronized block so that other threads
can compete for the checkpoint lock again, as sketched below.
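
For context, here is a heavily simplified sketch of the loop pattern in
question; checkpointLock, isRunning, and emitElement() are stand-ins, not
Beam's actual names:

public class EmitLoopSketch {
    private final Object checkpointLock = new Object();
    private volatile boolean isRunning = true;

    void emitLoop() {
        while (isRunning) {
            synchronized (checkpointLock) {
                emitElement(); // advance the reader and emit, holding the lock
            }
            // The hack: yield after releasing the lock so a waiting checkpoint
            // thread gets a chance to acquire it before we re-enter the block.
            Thread.yield();
        }
    }

    void emitElement() {
        // read one record from the source and emit it downstream
    }
}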

With this hack, our checkpoint can be finished within 1 min.

We therefore extracted the code pattern of UnboundedSourceWrapper into a
simple Java test program (attached below) to simulate the behavior. In
this code, one thread runs Runner1: in a loop, it synchronizes on an
object and sleeps for 10 ms while holding the lock. This simulates Beam's
application thread. There is also a Timer that fires every 10 ms and, when
triggered, runs the Runner2 code, which synchronizes on the same object
and just increments a counter. This timer simulates Beam's checkpointing
thread.

We ran this code for about 10 seconds, and here is the output:

No of times Runner1 got called: 877
No of times Runner2 got called: 10
Percentage Runner1 acquire the lock: 98
Percentage Runner2 acquire the lock: 2

As shown here, the simulated application thread (Runner1) acquired the
lock 98% of the time, while the simulated checkpoint thread (the Timer
running Runner2) acquired it only 2% of the time. This behavior is
consistent with what we observe in our pipeline.

If we uncomment the Thread.yield() statement and rerun it, the output
shows:

No of times Runner1 got called: 858
No of times Runner2 got called: 1001
Percentage Runner1 acquire the lock: 46
Percentage Runner2 acquire the lock: 54

This shows that Runner1 and Runner2 share the lock much more evenly, which
is also consistent with the effect of our workaround in the Beam code. A
variant using a fair lock instead of Thread.yield() is sketched below.
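
As a further check of the fairness hypothesis (we have not tried this in
the actual pipeline), the intrinsic monitor in the test program could be
replaced with a fair ReentrantLock. In fair mode the lock is granted in
roughly FIFO order, so a releasing thread cannot immediately barge back in
ahead of a queued waiter:

import java.util.concurrent.locks.ReentrantLock;

// Variant of the test program's critical sections using a fair lock instead
// of synchronized. With fairness enabled, no Thread.yield() is needed.
public class FairLockSketch {
    static final ReentrantLock lock = new ReentrantLock(true); // true = fair

    static void runner1Iteration() throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(10); // hold the lock, as Runner1 does
        } finally {
            lock.unlock();
        }
        // The next lock() call queues behind any waiter that arrived while
        // we held the lock, so the timer thread is not starved.
    }

    static void runner2Iteration() {
        lock.lock();
        try {
            // simulated checkpoint work would go here
        } finally {
            lock.unlock();
        }
    }
}

The usual trade-off is that fair locks have lower throughput than non-fair
ones, which is presumably why an intrinsic monitor is used here.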

Any suggestions on how to approach this checkpoint timeout issue are
appreciated.

Thanks.

Antonio.

===============Testing Code===================

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class TestRunner {
    public static final Object lockObject = new Object();

    public static final AtomicInteger counter1 = new AtomicInteger();
    public static final AtomicInteger counter2 = new AtomicInteger();

    // Simulates Beam's application thread: holds the lock for 10 ms per
    // iteration, then immediately tries to re-acquire it.
    public static class Runner1 implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                synchronized (lockObject) {
                    counter1.getAndIncrement();
                    try {
                        Thread.sleep(10); // hold the lock, like the emit loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // Uncomment to yield after releasing the lock, giving the
                // timer thread a chance to acquire it.
                // Thread.yield();
            }
        }

        public void stop() {
            running = false;
        }
    }

    // Simulates Beam's checkpointing thread: periodically competes for the
    // same lock and does minimal work under it.
    public static class Runner2 extends TimerTask {
        @Override
        public void run() {
            synchronized (lockObject) {
                counter2.getAndIncrement();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Runner1 runner1 = new Runner1();
        Runner2 runner2 = new Runner2();

        Thread thread1 = new Thread(runner1);
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(runner2, 0, 10); // fire every 10 ms

        thread1.start();

        Thread.sleep(10000); // let both threads compete for ~10 seconds
        timer.cancel();
        runner1.stop();
        thread1.join(); // ensure Runner1 has finished before reading counters

        int runner1Percent =
                counter1.get() * 100 / (counter1.get() + counter2.get());
        System.out.println("No of times Runner1 got called: " + counter1.get());
        System.out.println("No of times Runner2 got called: " + counter2.get());
        System.out.println("Percentage Runner1 acquire the lock: " + runner1Percent);
        System.out.println("Percentage Runner2 acquire the lock: " + (100 - runner1Percent));
    }
}
