aliosia opened a new issue, #27238:
URL: https://github.com/apache/beam/issues/27238

   ### What happened?
   
   I wrote my stream-processing job in Python with Apache Beam. The pipeline consists of ReadFromKafka, WindowInto with fixed windows, and GroupByKey, and I run it on the Dataflow runner. Everything works fine except window triggering, which is delayed. The lag persists even when I add an early AfterProcessingTime trigger:
   
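   ```
   import typing

   from apache_beam import GroupByKey, ParDo, WindowInto
   from apache_beam.io.kafka import ReadFromKafka
   from apache_beam.transforms import trigger, window

   # `pipeline`, consumer_config, topic, Convert and DebugWindow are defined elsewhere.
   (pipeline
    | 'Read from Kafka' >> ReadFromKafka(consumer_config=consumer_config,
                                         topics=[topic])
    | 'Convert' >> ParDo(Convert())
    | 'Fixed window' >> WindowInto(
        window.FixedWindows(1),
        trigger=trigger.AfterWatermark(
            early=trigger.AfterProcessingTime(delay=1)),
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
    | 'Group by key' >> GroupByKey()
    | 'Debug windows' >> ParDo(DebugWindow()).with_output_types(
        typing.Tuple[bytes, bytes]))
   ```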
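The early trigger in the pipeline above can be reasoned about with a simplified model. Assuming the early trigger is re-armed after every firing, and that `AfterProcessingTime(delay=1)` fires one second of processing time after the first element of each pane arrives, the hypothetical, stdlib-only function below lists the processing times at which early panes would fire for a single window. It ignores watermark estimation, bundling, and all other runner behavior, so it is only a sketch of the trigger semantics, not a prediction of when Dataflow actually fires.

```python
from typing import List, Optional, Sequence


def early_firing_times(arrivals: Sequence[float], delay: float,
                       on_time: float) -> List[float]:
    """Idealized early-firing times for one window (all values in seconds).

    Hypothetical model, not Beam code.
    arrivals: sorted processing times at which the window's elements arrive.
    delay: the AfterProcessingTime delay, measured from the first element
        of each pane.
    on_time: processing time at which the watermark passes the window end;
        from then on the on-time pane fires and no further early panes occur.
    """
    firings: List[float] = []
    pane_start: Optional[float] = None  # arrival time of the open pane's first element
    for t in arrivals:
        # Fire the open pane if its processing-time timer expired by time t.
        if pane_start is not None and pane_start + delay <= t:
            if pane_start + delay < on_time:
                firings.append(pane_start + delay)
            pane_start = None
        # An element arriving before the watermark passes opens a new early pane.
        if pane_start is None and t < on_time:
            pane_start = t
    # A pane still open after the last arrival fires early only if its timer
    # expires before the on-time firing.
    if pane_start is not None and pane_start + delay < on_time:
        firings.append(pane_start + delay)
    return firings
```

In this model, if elements arrive with processing time close to their event time, the first early pane of a one-second window cannot fire sooner than one second after the window's first element, that is, at best around the time the window itself ends.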
   
   In DebugWindow, I added a log statement that estimates the delay as follows:
   
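   ```
   import logging
   from datetime import datetime

   # Inside DebugWindow.process, where `window` is the element's window
   # (e.g. obtained via beam.DoFn.WindowParam).
   window_end = window.end.to_utc_datetime()
   current_time = datetime.utcnow()
   delay = (current_time - window_end).total_seconds()
   logging.info(f'window delay: {delay}')
   ```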
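The delay logged above is measured against the end of the element's fixed window. As a rough, stdlib-only sketch of that arithmetic, using plain floats in epoch seconds rather than Beam's `Timestamp` type and timezone-aware datetimes instead of `datetime.utcnow()`, the hypothetical helpers below assign a timestamp to a fixed window with the rule used by `FixedWindows` (start = ts - (ts - offset) mod size) and compute how far a given processing time is past that window's end.

```python
from datetime import datetime, timezone
from typing import Tuple


def fixed_window_bounds(event_ts: float, size: float,
                        offset: float = 0.0) -> Tuple[float, float]:
    """Return (start, end), in epoch seconds, of the fixed window holding event_ts."""
    start = event_ts - (event_ts - offset) % size
    return start, start + size


def window_delay_seconds(event_ts: float, size: float, now: datetime) -> float:
    """Seconds by which processing time `now` (timezone-aware) is past the window end."""
    _, end = fixed_window_bounds(event_ts, size)
    window_end = datetime.fromtimestamp(end, tz=timezone.utc)
    return (now - window_end).total_seconds()
```

For example, an element with event time 100.25 in one-second windows belongs to the window [100.0, 101.0), so a processing time of 102.5 gives a delay of 1.5 seconds.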
   The behavior of this delay is strange: it decreases, then suddenly jumps and increases again. The following image shows this behavior:
   
   
   
![beam](https://github.com/apache/beam/assets/5106765/e623cf1b-a473-4f58-be64-4ef4fcc015d7)
   
   The left column is the Dataflow log time, which I assume is the trigger time, and the right column is the logged delay. As you can see, there is a gap of more than 1 s between trigger times (from 42 s to 44 s), and the processing delay suddenly jumps to 172 s.
   What is the problem? I need a near-real-time pipeline (less than 2 s of delay).
   
   P.S. I have tried increasing the number of workers and the number of Kafka partitions, but neither helped.
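To check the two symptoms described above (gaps of more than one second between firings, and sudden jumps in the logged delay) against exported log lines, a small helper like the following could be used. It is a hypothetical sketch that assumes the log time and the delay have already been parsed into seconds; the function names and thresholds are illustrative and not part of Beam or Dataflow.

```python
from typing import List, Sequence, Tuple


def trigger_gaps(log_times: Sequence[float],
                 threshold: float) -> List[Tuple[float, float, float]]:
    """Return (previous, current, gap) for consecutive firings further apart than threshold."""
    return [(a, b, b - a)
            for a, b in zip(log_times, log_times[1:])
            if b - a > threshold]


def delay_jumps(delays: Sequence[float], min_jump: float) -> List[int]:
    """Return indices i where the logged delay rose by more than min_jump since i - 1."""
    return [i for i in range(1, len(delays))
            if delays[i] - delays[i - 1] > min_jump]
```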
   
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
