potiuk commented on PR #27758:
URL: https://github.com/apache/airflow/pull/27758#issuecomment-1369663970

   Just for the record here as well (and maybe to gather comments from others). 
   
   I have a proposal for how we can improve the approach from this PR to make it 
more scalable, in case we find we need it. 
   It might be a premature optimisation to do it now, so [I do not block the 
current approach at 
all](https://github.com/apache/airflow/pull/27758#discussion_r1060424069). 
   
   But if we find that the current proposal has significant limitations, we 
can adjust it even now, I think. It will probably take a bit more 
time to implement, but its scalability is far better IMHO. 
   
   ## Basic assumptions
   
   * all of the deferred tasks in the Triggerer execute as very short tasks 
in the asyncio loop. Usually they are mostly doing nothing (i.e. the loop is 
handling other tasks or idling), but when they do execute in the triggerer, 
they do something for maybe 100 ms at a time at most. 
   * some of those deferred tasks might write several log lines during those 
100 ms or so in quick succession. Sometimes this will be a single log entry 
("waiting"), but sometimes they want to write several messages with more details. 
Both cases are expected.
   * due to the overhead of opening a file/socket, and the fact that we do 
not want to keep those files/sockets open all the time, the "ideal" way of 
handling logs (focusing on files, but sockets are the same) is:
   
   1) open_file()
   2) write all messages from the single "awake"
   3) close_file()
   
   The current proposal works in such a way that it opens the file the first time a 
log line is written, keeps it open through the whole lifecycle of the task and then 
closes the file when the task finishes (possibly - I am not 100% sure all 
the closing scenarios are handled here - but it can certainly be done so that 
all handlers are closed once the task is no longer deferred).
   
   ## Improvement proposal
   
   The proposal is that we:
   
   1) keep the proposal from this PR that logs from the triggerer are sent to an 
in-memory queue (or queues - see below) - this is needed for non-blocking asyncio 
behaviour (sketched right below)
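   
   A minimal sketch of what the triggerer-side wiring for (1) could look like, using 
the stdlib `QueueHandler` purely as an illustration (the PR may wire this differently):
   
   ```
   import logging
   import logging.handlers
   import queue
   
   # Shared in-memory queue. QueueHandler.emit() only does a put() on the queue,
   # so the asyncio event loop is never blocked on file/socket I/O.
   log_queue: queue.SimpleQueue = queue.SimpleQueue()
   
   def attach_queue_handler(logger_name: str = "airflow.task") -> logging.handlers.QueueHandler:
       # Route all records emitted by the given logger into the in-memory queue.
       handler = logging.handlers.QueueHandler(log_queue)
       logging.getLogger(logger_name).addHandler(handler)
       return handler
   ```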
   
   2) make all handlers re-entrant - i.e. make all handlers handle the case 
where set_context(), a loop of emit()s, and close() can be called multiple times 
on the same handler (see the sketch below)
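   
   To make (2) more concrete, here is a rough sketch of what "re-entrant" would 
mean for a handler - the class, the log-path template and the `ti` attributes used 
here are illustrative, not the real FileTaskHandler:
   
   ```
   import logging
   import os
   
   class ReentrantTaskHandler(logging.Handler):
       # Illustrative handler that survives repeated
       # set_context() -> emit()* -> close() cycles on the same instance.
   
       def __init__(self):
           super().__init__()
           self._stream = None
           self._log_path = None
   
       def set_context(self, ti):
           # Where the real handler would resolve the per-task-instance log location.
           self._log_path = f"/tmp/airflow-logs/{ti.dag_id}/{ti.task_id}/{ti.try_number}.log"
   
       def emit(self, record):
           if self._stream is None and self._log_path is not None:
               # Lazily (re)open the destination on the first emit after set_context().
               os.makedirs(os.path.dirname(self._log_path), exist_ok=True)
               self._stream = open(self._log_path, "a")
           if self._stream is not None:
               self._stream.write(self.format(record) + "\n")
   
       def close(self):
           # Unlike a one-shot handler, close() must leave the instance reusable
           # for the next set_context()/emit()/close() cycle.
           if self._stream is not None:
               self._stream.close()
               self._stream = None
           self._log_path = None
   ```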
   
   3) move the complexity of opening/closing at the right time to the 
process that reads the messages from the queue(s). The process that reads from 
the queue will work in a way that accommodates the "burst" pattern. When a 
message is received for a specific task instance, it will wait for a while to 
collect all messages from that task instance and use the existing re-entrant handler 
(a single handler for all task instances) in this way:
   
   * set_context(task_instance)
   * write all messages for that task instance
   * close()
   
   This way the file will be opened only for the time of writing all messages 
from that particular task instance.
   
   This can use today's "airflow.task" logger without any modifications (the 
queue-reading process will just replay all the messages through the standard 
"airflow.task" logger's handler) - as long as the handler is re-entrant. It would 
look something like:
   
   ```
   import logging
   
   # pop_all_recent_messages() is part of this sketch - it returns the burst of
   # LogRecords queued for one task instance (see the drain sketch below)
   records = log_queue.pop_all_recent_messages(task_instance)
   
   logger = logging.getLogger("airflow.task")
   # assumes the single, re-entrant task handler is attached to this logger
   handler = logger.handlers[0]
   handler.set_context(ti=task_instance)
   try:
       for record in records:
           handler.handle(record)
   finally:
       handler.close()
   ```
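   
   The `pop_all_recent_messages()` call above is a placeholder; one way the reader 
process could implement the "wait for a while" burst accumulation might be the 
following (the `task_instance_key` attribute on the record is an assumption - 
something has to tag the records with their TI before they are queued):
   
   ```
   import collections
   import queue
   import time
   
   def drain_bursts(log_queue: queue.SimpleQueue, window: float = 0.1) -> dict:
       # Drain everything that arrives within a short window and group the
       # records by task instance, so each group can be written with a single
       # set_context()/close() cycle of the re-entrant handler.
       by_ti = collections.defaultdict(list)
       deadline = time.monotonic() + window
       while True:
           timeout = deadline - time.monotonic()
           if timeout <= 0:
               break
           try:
               record = log_queue.get(timeout=timeout)
           except queue.Empty:
               break
           by_ti[record.task_instance_key].append(record)
       return by_ti
   ```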
   
   The only difficulty is the requirement that handlers should be 
re-entrant (in the sense of being able to handle repeated set_context()/close() cycles). 
I think most of our handlers are very close to being able to handle it (from 
looking at the code, at least).
   
   
   4) we can also make it more scalable by introducing multiple queues and multiple 
processes handling them - reducing 1000s of deferred tasks to a small N (say 10, for 
example) processes that handle the logs sent by the tasks. It should be rather easy 
- we could calculate a hash of the task instance and direct its messages to 1 of the 
N queues based on that hash - this way all the logs from the same task instance 
would go to the same queue. Then we can very easily keep the number of 
processes writing the logs at a low single-digit number - thus we will 
have at most N open files/sockets at a time (sketched below).
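   
   A sketch of the partitioning in (4) - the number of queues and the fields used to 
build the key are just illustrative:
   
   ```
   import queue
   
   NUM_LOG_QUEUES = 10  # the "small N" from the proposal
   
   # One in-memory queue per log-writing process/thread.
   log_queues = [queue.SimpleQueue() for _ in range(NUM_LOG_QUEUES)]
   
   def queue_for(ti) -> queue.SimpleQueue:
       # Hash a stable task-instance key so that all records of one TI always
       # land in the same queue and are therefore written by the same process.
       key = (ti.dag_id, ti.task_id, ti.run_id, ti.map_index)
       return log_queues[hash(key) % NUM_LOG_QUEUES]
   ```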
   
   
   I think this is not very complex and is doable. I also think it **might** be a 
premature optimisation, and we should run tests to see if the approach proposed 
in this PR is good enough; if we find that the limits are "acceptable", we 
can go with the wrapper first and see if there is a need to implement a more 
complete/scalable solution later. 
   
   I hope I explained it clearly - if there are any comments, I am happy to hear 
them. In any case, I would be really interested in seeing the limits/test results 
and reviewing the wrapper code/docs, to make sure we communicate any problems 
coming from resource limiting to the users in a way that lets them react to those 
problems on their own and configure their system properly if such problems 
occur.
    
   

