ashb commented on code in PR #51699:
URL: https://github.com/apache/airflow/pull/51699#discussion_r2150424252


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -691,14 +690,58 @@ class TriggerDetails(TypedDict):
     events: int
 
 
+@attrs.define(kw_only=True)
+class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
+    _async_writer: asyncio.StreamWriter = attrs.field(alias="async_writer")
+    _async_reader: asyncio.StreamReader = attrs.field(alias="async_reader")
+
+    body_decoder: TypeAdapter[ToTriggerRunner] = attrs.field(
+        factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
+    )
+
+    _lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+
+    def _read_frame(self):
+        from asgiref.sync import async_to_sync
+
+        return async_to_sync(self._aread_frame)()
+
+    def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
+        from asgiref.sync import async_to_sync
+
+        return async_to_sync(self.asend)(msg)
+
+    async def _aread_frame(self):
+        len_bytes = await self._async_reader.readexactly(4)
+        len = int.from_bytes(len_bytes, byteorder="big")
+
+        buffer = await self._async_reader.readexactly(len)
+        return self.resp_decoder.decode(buffer)

Review Comment:
   Should `decode` be awaited? No — or at least, it isn't doing any I/O.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to