xBis7 commented on code in PR #63180:
URL: https://github.com/apache/airflow/pull/63180#discussion_r2994897453


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -59,6 +62,19 @@
 }
 
 
+def _batched_ndjson_stream(

Review Comment:
   @dsuhinin I would suggest renaming the method to something like `_buffered_ndjson_stream`, because "batch" suggests that a timer is involved. Also, could you add some simple unit tests to avoid confusion about how it works?



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py:
##########
@@ -59,6 +62,19 @@
 }
 
 
+def _batched_ndjson_stream(

Review Comment:
   @jason810496 I tested it and also went over it with the debugger. There isn't an issue; let me explain why.
   
   In lines 164-171 below, we have these changes:
   
   ```diff
   - log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
   + raw_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
   + log_stream = _batched_ndjson_stream(raw_stream)
   ...
   ...
   return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)
   ```
   
   `read_log_stream` returns a generator that yields the log lines.
   
   `StreamingResponse` pulls chunks from the generator `log_stream` until it is exhausted.
   
   `_batched_ndjson_stream` iterates over the stream lines, appends them to `buf`, and then yields the contents of `buf` as a single string.
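For reference, the buffering described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the name `buffered_ndjson_stream`, the `batch_size` parameter, and its default of 500 (the threshold used in the examples in this comment) are assumptions, as is the premise that each item from the raw stream is already a newline-terminated NDJSON string.

```python
from collections.abc import Iterable, Iterator


def buffered_ndjson_stream(lines: Iterable[str], batch_size: int = 500) -> Iterator[str]:
    """Group newline-terminated NDJSON lines into larger chunks (hypothetical sketch)."""
    buf: list[str] = []
    for line in lines:
        buf.append(line)
        if len(buf) >= batch_size:
            # Hand one chunk to the consumer; the generator pauses here
            # until the consumer asks for the next chunk.
            yield "".join(buf)
            # Resumed: the joined string is a separate object, so clearing is safe.
            buf.clear()
    # Source exhausted: flush the remainder, even if it is shorter than batch_size.
    if buf:
        yield "".join(buf)
```

Fed 700 lines, this sketch yields two chunks of 500 and 200 lines, and concatenating the chunks reproduces the original input exactly.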
   
   * Example: 700 log lines
       * `_batched_ndjson_stream` starts iterating over the lines (line 69)
       * when `buf` reaches 500 lines, it yields them as one chunk
       * the generator pauses at the `yield`
       * the yielded chunk is passed to `StreamingResponse`, which is iterating over `log_stream`
       * once `StreamingResponse` has consumed the 500-line chunk, it asks for the next one, which returns execution to `_batched_ndjson_stream` because we haven't reached the end of the logs
       * the next line clears `buf` and the for loop continues
       * the for loop exits before `buf` reaches 500 again, so `buf` holds the remaining 200 lines and yields them
       * `StreamingResponse` continues pulling until it has received those 200 lines, and then the generator raises `StopIteration`
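The hand-off in the 700-line example can be traced with a small instrumented sketch. Everything here is hypothetical: `buffered_ndjson_stream` is an illustrative stand-in for `_batched_ndjson_stream` (threshold 500, newline-terminated lines assumed), `raw_stream` stands in for `read_log_stream`, and a plain `for` loop stands in for `StreamingResponse`. The `events` list records whether the producer or the consumer is running at each step.

```python
from collections.abc import Iterable, Iterator

# Records the order in which the producer and the consumer run.
events: list[str] = []


def buffered_ndjson_stream(lines: Iterable[str], batch_size: int = 500) -> Iterator[str]:
    """Hypothetical sketch of the buffering generator, instrumented with events."""
    buf: list[str] = []
    for line in lines:
        buf.append(line)
        if len(buf) >= batch_size:
            events.append(f"yield {len(buf)}")
            yield "".join(buf)  # pause here until the consumer pulls again
            events.append("resumed, clearing buf")
            buf.clear()
    if buf:
        events.append(f"yield remaining {len(buf)}")
        yield "".join(buf)
    # The generator returns here; the consumer's next call then gets StopIteration.
    events.append("exhausted")


def raw_stream(n: int) -> Iterator[str]:
    """Stand-in for read_log_stream: n newline-terminated JSON lines."""
    for i in range(n):
        yield f'{{"line": {i}}}\n'


# Stand-in for StreamingResponse: pull chunks until the generator is exhausted.
for chunk in buffered_ndjson_stream(raw_stream(700)):
    n_lines = chunk.count("\n")
    events.append(f"consumer got {n_lines} lines")

for event in events:
    print(event)
# yield 500
# consumer got 500 lines
# resumed, clearing buf
# yield remaining 200
# consumer got 200 lines
# exhausted
```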
   
   
   * Example: 1 log record every 5 seconds, with a total task execution time of 15 minutes
       * 180 log lines in total (900 s / 5 s)
       * `read_log_stream` returns 1 log line
       * `_batched_ndjson_stream` iterates over that 1 log line, exits the loop, and yields it (line 75)
       * `StreamingResponse` pulls that 1 log line, and then the generator raises `StopIteration`
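The slow-log case comes down to the final flush after the loop. A minimal sketch of that behavior, assuming the same hypothetical `buffered_ndjson_stream` stand-in (500-line threshold, newline-terminated lines) and a source that produces a single line and then ends:

```python
from collections.abc import Iterable, Iterator


def buffered_ndjson_stream(lines: Iterable[str], batch_size: int = 500) -> Iterator[str]:
    """Hypothetical sketch: chunk NDJSON lines, flushing any remainder at the end."""
    buf: list[str] = []
    for line in lines:
        buf.append(line)
        if len(buf) >= batch_size:
            yield "".join(buf)
            buf.clear()
    # Reached as soon as the source iterator ends, however few lines it produced.
    if buf:
        yield "".join(buf)


# The source produces a single line and then ends.
single = iter(['{"event": "task running"}\n'])
chunks = list(buffered_ndjson_stream(single))
print(len(chunks), repr(chunks[0]))  # prints: 1 '{"event": "task running"}\n'
```

In this sketch the partial buffer is flushed only when the source iterator ends, so how soon a short chunk reaches the client depends on when `read_log_stream` stops yielding.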
   
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
