Would it be possible to rerun the experiments so as to isolate variables?
Right now, each of the three cases changes both the Beam and the Flink
version.

On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <k...@apache.org> wrote:

> Bumping this and adding +John Casey <johnjca...@google.com>, who knows
> about KafkaIO and unbounded sources, though probably less about the
> FlinkRunner. It seems you have isolated it to the Flink translation logic.
> I'm not sure who would be the best expert to evaluate whether that logic is
> still OK.
>
> Kenn
>
> On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi,
>>
>>    We have a stateless application that:
>>
>>    1. Reads from Kafka.
>>    2. Applies some stateless transformations, reading from in-memory
>>    databases and updating the records.
>>    3. Writes back to Kafka.
>>
>> *With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at most
>> 1 min).*
>>
>> *With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up to
>> 6-7 min).*
>>
>> *With Beam 2.38 and Flink 1.14, checkpoints time out after 10 minutes.*
>>
>> I checked the Beam code and, after some logging and analysis, found that
>> the problem is at
>> https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307
>>
>> We are still using the old API to read from Kafka, not yet the KafkaIO
>> based on SplittableDoFn.
>>
>> There are two threads:
>>
>>    1. The legacy source thread, which reads from Kafka and does the entire
>>    processing.
>>    2. A thread that emits watermarks on a timer:
>>    https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474
>>
>> Both of these code blocks are inside synchronized blocks on the same
>> checkpoint lock. Under heavy load, the thread reading from Kafka keeps
>> running in the while loop indefinitely, while the thread that emits
>> watermarks waits indefinitely for the lock. As a result, no watermarks are
>> emitted and the checkpoint times out.
>>
>> Is this a known issue, and is there a solution? For now, we call
>> Thread.sleep(1) once every 10 seconds after the synchronized block so that
>> the thread emitting the watermarks can be unblocked and run.
>>
>> One of my colleagues tried to follow up on this earlier (the previous email
>> is attached), but we didn't get a reply. Any help would be appreciated.
>>
>> Thanks,
>>
>> Sandeep
>>
>
