ashb commented on code in PR #48239:
URL: https://github.com/apache/airflow/pull/48239#discussion_r2012203947


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -112,6 +112,7 @@ def _convert_variable_result_to_variable(var_result: VariableResult, deserialize
 
 def _get_connection(conn_id: str) -> Connection:
     from airflow.sdk.execution_time.supervisor import SECRETS_BACKEND
+

Review Comment:
   Nit: otherwise unchanged file
   ```suggestion
   ```



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -244,6 +271,14 @@ def upload_to_remote(self):
         upload_to_remote(self.bound_logger)
 
 
[email protected]
+def in_process_api_server() -> InProcessExecutionAPI:

Review Comment:
   Do we need to cache both this and the `client` property of the Supervisor? Wouldn't it be enough to cache only the resulting client object?
   
   (Yes, you are probably copying my own PR, so in effect I'm critiquing my own code here.)
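
A minimal sketch of the "cache only the client" idea, for illustration. `InProcessAPI`, `Client`, and this `Supervisor` are hypothetical stand-ins, not the real Airflow classes; the only point is that a cached client property already pins a single server instance per supervisor, so the factory itself does not need its own cache.

```python
import functools


class InProcessAPI:
    """Hypothetical stand-in for the in-process API server."""

    created = 0  # counts how many servers were built

    def __init__(self) -> None:
        InProcessAPI.created += 1


class Client:
    """Hypothetical stand-in for the API client."""

    def __init__(self, api: InProcessAPI) -> None:
        self.api = api


def in_process_api_server() -> InProcessAPI:
    # Deliberately NOT cached: the caller below caches the client instead.
    return InProcessAPI()


class Supervisor:
    @functools.cached_property
    def client(self) -> Client:
        # Evaluated once per Supervisor instance, then stored on the instance.
        return Client(in_process_api_server())


supervisor = Supervisor()
first = supervisor.client
second = supervisor.client
```

Under these assumptions both accesses return the same client and only one server is built. If several supervisors had to share one server, caching the factory would still be needed.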



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -443,7 +443,12 @@ def get_message(self) -> ReceiveMsgType:
 
         This will block until the message has been received.
         """
-        line = self.input.readline()
+        line = None
+
+        # We need to investigate why some lines are empty ("")
+        while not line:
+            line = self.input.readline()
+

Review Comment:
   That is slightly worrying: it means something is sending an empty line to the process's stdin, which "shouldn't" happen.
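
One detail that may help the investigation: on a text-mode stream, `readline()` returns `""` only at end of file, while a blank line arrives as `"\n"`, which is truthy. A `while not line` loop would therefore spin forever on a closed stdin rather than skip blank lines. The sketch below separates the two cases; `read_nonempty_line` is a hypothetical helper, and it assumes a text-mode stream.

```python
import io
from typing import TextIO


def read_nonempty_line(stream: TextIO) -> str:
    """Return the next non-blank line, raising EOFError if the stream has ended."""
    while True:
        line = stream.readline()
        if line == "":
            # readline() yields "" only at EOF, so fail loudly instead of looping.
            raise EOFError("input stream closed before a message arrived")
        if line.strip():
            return line
        # Otherwise the line was blank ("\n" or whitespace only): skip it.


# Two blank lines, then one message, then EOF.
stream = io.StringIO('\n\n{"type": "StartupDetails"}\n')
message = read_nonempty_line(stream)

try:
    read_nonempty_line(stream)
    hit_eof = False
except EOFError:
    hit_eof = True
```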



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -192,9 +202,22 @@ class TriggerStateChanges(BaseModel):
         finished: list[int] | None = None
 
 
+def get_discriminator_value(v: Any) -> Any:
+    if isinstance(v, dict):
+        return v.get("kind", v.get("type"))
+    return getattr(v, "kind", getattr(v, "type", None))
+
+
 ToTriggerRunner = Annotated[
-    Union[workloads.RunTrigger, messages.CancelTriggers, messages.StartTriggerer],
-    Field(discriminator="kind"),
+    Union[
+        Annotated[workloads.RunTrigger, Tag("RunTrigger")],
+        Annotated[messages.CancelTriggers, Tag("CancelTriggersMessage")],
+        Annotated[messages.StartTriggerer, Tag("StartTriggerer")],
+        Annotated[ConnectionResult, Tag("ConnectionResult")],
+        Annotated[VariableResult, Tag("VariableResult")],
+        Annotated[ErrorResponse, Tag("ErrorResponse")],
+    ],
+    Field(discriminator=Discriminator(get_discriminator_value)),

Review Comment:
   The Dag processor uses this pattern:
   
   ```
       Union["DagFileParsingResult", GetConnection, GetVariable],
       Field(discriminator="type"),
   ]
   ```
   Which is slightly simpler -- can we use that here too?
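
For comparison, a self-contained sketch of the string-discriminator form in Pydantic v2. The two models are simplified, hypothetical stand-ins for the real message classes. This form only works when every member of the union declares the same field name as a `Literal`; the callable discriminator in the diff presumably exists because some members use `kind` and others `type`.

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class GetConnection(BaseModel):
    """Hypothetical, simplified message model."""

    conn_id: str
    type: Literal["GetConnection"] = "GetConnection"


class GetVariable(BaseModel):
    """Hypothetical, simplified message model."""

    key: str
    type: Literal["GetVariable"] = "GetVariable"


# Every member exposes a Literal "type" field, so a plain string is enough.
ToSupervisor = Annotated[
    Union[GetConnection, GetVariable],
    Field(discriminator="type"),
]

adapter = TypeAdapter(ToSupervisor)
msg = adapter.validate_python({"type": "GetVariable", "key": "my_var"})
```

Here `msg` is validated directly as a `GetVariable`, selected by its `type` value without trying the other member.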


