ashb commented on a change in pull request #15037:
URL: https://github.com/apache/airflow/pull/15037#discussion_r602552115
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1245,6 +1252,9 @@ def _process_executor_events(self, session: Session = None) -> int:
self.processor_agent.send_callback_to_execute(request)
+ if i % CALLBACK_SEND_BATCH_SIZE == 0:
Review comment:
Another option:
```python console
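>>> import fcntl, multiprocessing, os
>>> p1, p2 = multiprocessing.Pipe()
>>> # Add O_NONBLOCK to the existing file status flags of p1
>>> flags = fcntl.fcntl(p1.fileno(), fcntl.F_GETFL)
>>> fcntl.fcntl(p1.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)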
>>> p1.send('a' * 1024000) # Simulate something that would block
BlockingIOError: [Errno 11] Resource temporarily unavailable
```
We could set the socket in DagFileManager (the receiving side) to
non-blocking. That way, if a send would block, we could catch the
BlockingIOError, queue the message there, and then go back and poll again?
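
To make the catch-and-queue idea concrete, here is a rough sketch. It is not
the code in this PR: `DeferringSender`, `pending` and `flush` are hypothetical
names, and it uses `os.set_blocking` in place of the `fcntl` call above. It
also assumes messages are small. A large payload can be partially written
before `BlockingIOError` is raised, which would leave a truncated message in
the pipe, and the sketch does not handle that case.

```python
import collections
import multiprocessing
import os


class DeferringSender:
    """Hypothetical helper: defers sends that would block on a pipe end."""

    def __init__(self, conn):
        self.conn = conn
        self.pending = collections.deque()
        # Put this end of the pipe into non-blocking mode (sets O_NONBLOCK).
        os.set_blocking(conn.fileno(), False)

    def send(self, message):
        """Try to send now; return False if the message was queued instead."""
        if self.pending:
            # Keep ordering: never overtake messages that are already waiting.
            self.pending.append(message)
            return False
        try:
            self.conn.send(message)
        except BlockingIOError:
            # The pipe is full; remember the message and retry later.
            self.pending.append(message)
            return False
        return True

    def flush(self):
        """Retry queued messages in order; return how many were sent."""
        flushed = 0
        while self.pending:
            try:
                self.conn.send(self.pending[0])
            except BlockingIOError:
                break  # Still full; try again after the next poll.
            self.pending.popleft()
            flushed += 1
        return flushed
```

The loop that owns the connection would call `flush()` each time round, after
it has polled and read from the other direction, so neither side sits in a
blocking `send` while the other is also waiting to write. Note that reads on
the same connection become non-blocking too, so it should call `poll()`
before `recv()`.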