Would it be possible to rerun the experiments so that the variables are isolated? Right now, each of the three cases changes both the Beam version and the Flink version.
On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <k...@apache.org> wrote:

> Bumping this and adding +John Casey <johnjca...@google.com>, who knows
> about KafkaIO and unbounded sources, though probably less about the
> FlinkRunner. It seems you have isolated it to the Flink translation logic.
> I'm not sure who would be the best expert to evaluate whether that logic is
> still OK.
>
> Kenn
>
> On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi,
>>
>> We have a stateless application which:
>>
>> 1. Reads from Kafka.
>> 2. Does some stateless transformations, reading from in-memory
>>    databases and updating the records.
>> 3. Writes back to Kafka.
>>
>> *With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at most
>> 1 min).*
>>
>> *With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up to
>> 6-7 min).*
>>
>> *With Beam 2.38 and Flink 1.14, checkpoints time out after 10 minutes.*
>>
>> After checking the Beam code, adding some logging, and analyzing it, I
>> found that the problem is at
>> https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307
>>
>> We are still using the old API to read from Kafka and are not yet using
>> the SplittableDoFn-based KafkaIO.
>>
>> There are two threads:
>>
>> 1. The legacy source thread, which reads from Kafka and does the entire
>>    processing.
>> 2. The thread that emits watermarks on a timer:
>>    https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474
>>
>> Both of these code blocks are synchronized on the same checkpoint lock.
>> Under heavy load, the thread reading from Kafka keeps running in the while
>> loop, and the thread that emits watermarks waits indefinitely for the lock,
>> so no watermarks are emitted and the checkpoint times out.
>>
>> Is this a known issue, and is there a solution? For now, we call
>> Thread.sleep(1) once every 10 seconds after the synchronized block so that
>> the thread emitting watermarks can acquire the lock and run.
>>
>> One of my colleagues tried to follow up on this (attaching the previous
>> email here), but we didn’t get any reply. Any help would be appreciated.
>>
>> Thanks,
>>
>> Sandeep
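To make the lock contention described in this thread concrete, here is a minimal, self-contained Java sketch. It is not the UnboundedSourceWrapper code: the class `CheckpointLockSketch`, its `run(runMillis, pauseEveryMillis)` method, and the 5 ms watermark timer period are hypothetical stand-ins. A source loop repeatedly enters `synchronized (checkpointLock)` to emit a record, while a scheduled task needs the same lock to emit a watermark. Java's intrinsic locks make no fairness guarantee, so the waiting task can be starved when the loop re-acquires the lock immediately. With `pauseEveryMillis > 0`, the loop applies the `Thread.sleep(1)` workaround outside the lock.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, simplified model of the two threads described in the thread. It is NOT the Beam
 * UnboundedSourceWrapper code; it only reproduces the locking pattern: a source loop and a
 * periodic watermark task that both synchronize on one shared checkpoint lock.
 */
public class CheckpointLockSketch {

  /** Counters observed after one run. */
  public static final class Result {
    public final long records;
    public final long watermarks;

    Result(long records, long watermarks) {
      this.records = records;
      this.watermarks = watermarks;
    }
  }

  /**
   * Runs the source loop for runMillis. If pauseEveryMillis > 0, the loop sleeps 1 ms outside the
   * lock at most once per pauseEveryMillis (the Thread.sleep(1) workaround from the thread).
   */
  public static Result run(long runMillis, long pauseEveryMillis) throws InterruptedException {
    final Object checkpointLock = new Object();
    final AtomicLong records = new AtomicLong();
    final AtomicLong watermarks = new AtomicLong();

    // Stand-in for the timer-driven watermark emission: it needs the same lock every time it fires.
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    timer.scheduleAtFixedRate(
        () -> {
          synchronized (checkpointLock) {
            watermarks.incrementAndGet();
          }
        },
        0, 5, TimeUnit.MILLISECONDS);

    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runMillis);
    long lastPause = System.nanoTime();
    // Stand-in for the legacy source thread: it emits each record while holding the lock, then
    // loops back and re-acquires it right away. synchronized is not a fair lock, so the waiting
    // timer thread is not guaranteed to get the lock between iterations.
    while (System.nanoTime() < deadline) {
      synchronized (checkpointLock) {
        records.incrementAndGet(); // "read from Kafka, transform, emit"
      }
      if (pauseEveryMillis > 0
          && System.nanoTime() - lastPause >= TimeUnit.MILLISECONDS.toNanos(pauseEveryMillis)) {
        Thread.sleep(1); // the lock is free here, so the watermark task can acquire it
        lastPause = System.nanoTime();
      }
    }

    timer.shutdownNow();
    timer.awaitTermination(1, TimeUnit.SECONDS);
    return new Result(records.get(), watermarks.get());
  }

  public static void main(String[] args) throws InterruptedException {
    // Mirrors the workaround's 10 s interval; runs for 30 s.
    Result r = run(30_000, 10_000);
    System.out.println("records=" + r.records + " watermarks=" + r.watermarks);
  }
}
```

Calling `run(300, 0)` runs the loop without the workaround. How many watermarks it still emits depends on the JVM and the load, so the sketch illustrates the pattern rather than reliably reproducing the timeout. The sleep is placed after the synchronized block on purpose: sleeping inside it would keep the lock held and help no one.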