ashb commented on code in PR #51699: URL: https://github.com/apache/airflow/pull/51699#discussion_r2150424252
########## airflow-core/src/airflow/jobs/triggerer_job_runner.py: ########## @@ -691,14 +690,58 @@ class TriggerDetails(TypedDict): events: int +@attrs.define(kw_only=True) +class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]): + _async_writer: asyncio.StreamWriter = attrs.field(alias="async_writer") + _async_reader: asyncio.StreamReader = attrs.field(alias="async_reader") + + body_decoder: TypeAdapter[ToTriggerRunner] = attrs.field( + factory=lambda: TypeAdapter(ToTriggerRunner), repr=False + ) + + _lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False) + + def _read_frame(self): + from asgiref.sync import async_to_sync + + return async_to_sync(self._aread_frame)() + + def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None: + from asgiref.sync import async_to_sync + + return async_to_sync(self.asend)(msg) + + async def _aread_frame(self): + len_bytes = await self._async_reader.readexactly(4) + len = int.from_bytes(len_bytes, byteorder="big") + + buffer = await self._async_reader.readexactly(len) + return self.resp_decoder.decode(buffer) Review Comment: decode? No, or at least it's not doing any IO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org