Just a quick question: are we talking about the classic or the portable local Flink runner? Does the same behavior occur on both?

 Jan

On 2/4/22 02:24, John Casey wrote:
I was able to reproduce this using a Flink runner as well, with a local Flink cluster. I haven't tried using a timer yet with a non-KafkaIO transform, which is my next step of investigation.

Thanks,
John

On Thu, Feb 3, 2022 at 7:30 PM Kenneth Knowles <[email protected]> wrote:

    I know we chatted about this off-list, but I wanted to just follow
    up and see if you figured it out. Sounds like it could be an
    important bug in the DirectRunner. I don't recall whether it
    reproduces on e.g. local Flink/Spark Runner or the Python local
    portable runner.

    Kenn

    On Tue, Feb 1, 2022 at 1:10 PM John Casey <[email protected]>
    wrote:

        I'm investigating an issue where KafkaIO.read.withDynamicRead
        doesn't appear to be working properly when used with the
        SDF-based reader. Specifically, it doesn't appear that the
        pipeline picks up any new topics or partitions.

        I'm running locally using the DirectRunner, and I've set
        breakpoints at the start of
        WatchKafkaTopicPartitionDoFn::processElement and ~::onTimer.

        It looks like the initial processElement works fine. It is
        called once, and populates the pipeline with the initial state
        of Kafka. However, the onTimer method is never called. I've
        configured the timer to be 1 minute, and I've waited about 20
        minutes, but the method never gets called, which means that no
        new partitions are set up.

        My current (unvalidated) suspicion is that the way we are
        creating splits for Kafka is causing the timer to never hit
        that 1 minute mark, preventing onTimer from being called.

        Is anyone familiar with how timers work in the Java SDK, or
        with what might cause a timer not to fire?

        Thanks,
        John
