ashb opened a new pull request, #51699:
URL: https://github.com/apache/airflow/pull/51699

   Closes #46426
   
   The existing JSON Lines based approach had three major drawbacks:
   
   1. In the case of really large lines (in the region of 10 or 20MB) Python's
      line buffering could _sometimes_ result in a partial read
   2. The JSON based approach didn't have the ability to add any metadata (such
      as errors).
   3. Not every message type/call-site waited for a response, which meant those
      client functions could never be told about an error
   
   One of the ways this line-based approach fell down was if you suddenly tried
   to run 100s of triggers at the same time you would get an error like this:
   
   ```
   Traceback (most recent call last):
     File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
       line = await self.readuntil(sep)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
       raise exceptions.LimitOverrunError(
   asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
   ```
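
The failure above can be reproduced in isolation. This is a minimal sketch, not code from the PR: it feeds a single line that is longer than the reader's `limit` into an `asyncio.StreamReader` and calls `readuntil()`, the frame where the traceback shows the exception being raised. The function names and the small `limit` value are chosen only for illustration.

```python
import asyncio


async def read_oversized_line(limit: int, payload_size: int) -> str:
    """Feed one line of `payload_size` bytes and report what readuntil() does."""
    # Created inside a coroutine so the reader binds to the running event loop.
    reader = asyncio.StreamReader(limit=limit)
    reader.feed_data(b"x" * payload_size + b"\n")
    reader.feed_eof()
    try:
        await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as exc:
        return f"LimitOverrunError: {exc}"
    return "ok"


def demo(limit: int = 16, payload_size: int = 64) -> str:
    """Run the coroutine above on a fresh event loop."""
    return asyncio.run(read_oversized_line(limit, payload_size))
```

With the defaults, the line is longer than the limit and the error is reported. Raising `limit` above the line length lets the same read succeed, which is why the problem only shows up once messages get large.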
   
   The other way this caused problems was when parsing a large dag (one with
   20k tasks or more): the DagFileProcessor could end up getting a partial
   read, which would be invalid JSON.
   
   This changes the communications protocol in a couple of ways.
   
   First off, at the Python level the separate send and receive methods on the
   client/task side have been removed and replaced with a single `send()` that
   sends the request, reads the response, and raises an error if one is
   returned. (Note that right now almost nothing on the supervisor side sets
   the error; that will come in a future PR.)
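
The shape of such a combined call can be sketched as follows. Everything here is hypothetical and is not the PR's actual API: the `SupervisorError` type, the `exchange` callable standing in for the socket round trip, the `"error"`/`"body"` response keys, and the `"Echo"` request type are all invented for illustration.

```python
import json
from typing import Any, Callable


class SupervisorError(Exception):
    """Hypothetical error raised when a response carries an error field."""


def send(request: dict, exchange: Callable[[bytes], bytes]) -> Any:
    """Send one request, read its response, and raise if it reports an error."""
    raw_response = exchange(json.dumps(request).encode("utf-8"))
    response = json.loads(raw_response)
    if response.get("error") is not None:
        raise SupervisorError(response["error"])
    return response.get("body")


def fake_supervisor(raw_request: bytes) -> bytes:
    """Stand-in for the supervisor side: echoes, or fails on unknown types."""
    request = json.loads(raw_request)
    if request.get("type") != "Echo":
        message = f"unknown request type {request.get('type')!r}"
        return json.dumps({"error": message}).encode("utf-8")
    return json.dumps({"body": request["payload"]}).encode("utf-8")
```

Because every call goes through one function that always reads a response, a caller such as `send({"type": "Echo", "payload": "hi"}, fake_supervisor)` either gets the body back or sees an exception; there is no call site that fires a request and never learns about a failure.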
   
   Secondly, the JSON Lines approach has been changed from a line-based
   protocol to a binary "frame" one. The protocol (which is the same for
   whichever side is sending) is length-prefixed, i.e. we first send the length
   of the data as a 4-byte big-endian integer, followed by the data itself.
   This should remove the possibility of JSON parse errors due to reading
   incomplete lines.
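
A length-prefixed frame of this kind can be sketched in a few lines. This is an illustration under stated assumptions, not the PR's implementation: the helper names are made up, the length is treated as unsigned (`">I"`), and the payload is plain JSON bytes.

```python
import json
import socket
import struct
import threading

# Assumption: the 4-byte big-endian length is unsigned.
HEADER = struct.Struct(">I")


def send_frame(sock: socket.socket, payload: bytes) -> None:
    """Write the payload length, then the payload itself."""
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_exactly(sock: socket.socket, size: int) -> bytes:
    """Keep reading until exactly `size` bytes have arrived."""
    buf = bytearray()
    while len(buf) < size:
        chunk = sock.recv(size - len(buf))
        if not chunk:
            raise EOFError("socket closed in the middle of a frame")
        buf += chunk
    return bytes(buf)


def recv_frame(sock: socket.socket) -> bytes:
    """Read one frame: the 4-byte header first, then the announced body."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return recv_exactly(sock, length)


def roundtrip(obj):
    """Send `obj` as one JSON frame over a socketpair and decode it again."""
    left, right = socket.socketpair()
    with left, right:
        payload = json.dumps(obj).encode("utf-8")
        # Write from a thread so a large payload cannot block on a full buffer.
        writer = threading.Thread(target=send_frame, args=(left, payload))
        writer.start()
        data = recv_frame(right)
        writer.join()
    return json.loads(data)
```

The reader never has to guess where a message ends: it knows the body length before reading it, so a large payload is simply read in several `recv()` calls rather than being cut off at a buffer limit.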
   
   Finally, the last change made in this PR is to remove the "extra" requests
   socket/channel. Upon closer examination of this comms path I realised that
   this socket is unnecessary: since we are in 100% control of the client side
   we can make use of the bi-directional nature of `socketpair` and save file
   handles. This also happens to help the `run_as_user` feature, which is
   currently broken: without extra config in the `sudoers` file, `sudo` will
   close all file handles other than stdin, stdout, and stderr, so this change
   makes it easier to re-add run_as_user support.
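
As a small illustration of why one channel is enough, the sketch below sends a request and a response over the same `socket.socketpair()`. The function name and the message contents are placeholders, not anything from the PR.

```python
import socket


def ping_pong() -> tuple:
    """Exchange a request and a response over one bi-directional socketpair."""
    parent, child = socket.socketpair()
    with parent, child:
        # Child to parent, on the only socket the child holds.
        child.sendall(b"request")
        seen_by_parent = parent.recv(1024)
        # Parent to child, back over the very same pair.
        parent.sendall(b"response")
        seen_by_child = child.recv(1024)
    return seen_by_parent, seen_by_child
```

Each side needs only a single file descriptor for both directions, which is what makes the separate requests socket redundant.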
   
   In order to support this in the DagFileProcessor (where the proc manager
   uses a single selector for multiple processes), I have moved the `on_close`
   callback to be part of the object we store in the `selector` object in the
   supervisors. Previously we stored only the "on_read" callback; now we store
   a tuple of `(on_read, on_close)`, and on_close is called once universally.
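
The idea of storing both callbacks with the selector registration can be sketched like this. It is a simplified stand-in, not the supervisor code: the process names, the callback bodies, and the convention that `on_read` returns `False` at EOF are assumptions made for the example.

```python
import selectors
import socket


def run_selector_demo() -> list:
    """Watch two sockets with one selector; each carries (on_read, on_close)."""
    events = []
    sel = selectors.DefaultSelector()
    writers = []

    for name in ("proc-a", "proc-b"):
        reader, writer = socket.socketpair()
        writers.append(writer)

        def on_read(sock, name=name) -> bool:
            data = sock.recv(4096)
            if not data:
                return False  # EOF: the peer closed its end
            events.append(f"{name} read {data.decode()}")
            return True

        def on_close(sock, name=name) -> None:
            events.append(f"{name} closed")

        # Both callbacks travel together as the registration's data.
        sel.register(reader, selectors.EVENT_READ, data=(on_read, on_close))

    for index, writer in enumerate(writers):
        writer.sendall(f"msg{index}".encode())
        writer.close()

    while sel.get_map():
        for key, _mask in sel.select(timeout=1):
            read_cb, close_cb = key.data
            if not read_cb(key.fileobj):
                # The one place a socket is torn down, so on_close runs once.
                sel.unregister(key.fileobj)
                close_cb(key.fileobj)
                key.fileobj.close()
    sel.close()
    return sorted(events)
```

Because the loop gets the close handler from the same place as the read handler, it does not need to know which process a socket belongs to in order to clean up after it.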
   
   This also changes the way comms are handled from the (async) TriggerRunner
   process. Previously we had a sync+async lock, but that made it possible to
   end up deadlocking things. The change now is to have `send` on
   `TriggerCommsDecoder` "go back" to the async event loop via `async_to_sync`,
   so that only async code deals with the socket, and we can use an async lock
   (rather than the hybrid sync and async lock we tried before). This seems to
   help with the deadlock issue. I'm not 100% sure it will remove it entirely,
   but it makes it much, much harder to hit: I've not been able to reproduce it
   with this change.
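
The pattern of routing synchronous callers back onto the event loop, so that a single `asyncio.Lock` guards the channel, can be sketched with the standard library alone. Note the substitution: this sketch uses `asyncio.run_coroutine_threadsafe` as a stand-in for `async_to_sync`, and the `Comms` class with its `asend`/`send` methods is hypothetical rather than the real `TriggerCommsDecoder`.

```python
import asyncio


class Comms:
    """Hypothetical channel where only async code touches the 'socket'."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop
        self.lock = asyncio.Lock()  # the only lock; there is no thread lock

    async def asend(self, msg: str) -> str:
        async with self.lock:
            await asyncio.sleep(0)  # stand-in for the socket write and read
            return f"reply:{msg}"

    def send(self, msg: str) -> str:
        # Must be called from a thread other than the event loop's thread.
        future = asyncio.run_coroutine_threadsafe(self.asend(msg), self.loop)
        return future.result(timeout=5)


async def main() -> tuple:
    comms = Comms(asyncio.get_running_loop())
    # Sync callers run in worker threads and hop back onto the loop.
    from_threads = asyncio.gather(
        *(asyncio.to_thread(comms.send, f"s{i}") for i in range(3))
    )
    # Async callers use the same coroutine, and so the same lock, directly.
    from_loop = asyncio.gather(*(comms.asend(f"a{i}") for i in range(3)))
    return await from_threads, await from_loop


def run_demo() -> tuple:
    return asyncio.run(main())
```

Both kinds of caller end up awaiting the same coroutine on the same loop, so there is no second lock type whose acquisition order could interleave badly with the async one.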
   
   Fixes #50185, #51213, closes #51279
   

