dheerajturaga commented on code in PR #60675:
URL: https://github.com/apache/airflow/pull/60675#discussion_r2701247925


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -73,7 +73,7 @@
     # the type as the union of both kinds
     CommandType = Sequence[str]
 
-    TaskInstanceInCelery: TypeAlias = tuple[TaskInstanceKey, workloads.All | 
CommandType, str | None, Task]
+    TaskInstanceInCelery: TypeAlias = tuple[TaskInstanceKey, workloads.All | 
CommandType, str | None, str]

Review Comment:
   The fourth element is typed as str (non-optional), but self.team_name in the 
BaseExecutor class is typed as str | None. When the code creates tuples with 
self.team_name, it's passing a potentially None value where a non-optional str 
is expected.
   ```suggestion
       TaskInstanceInCelery: TypeAlias = tuple[
           TaskInstanceKey, workloads.All | CommandType, str | None, str | None
       ]
   ```



##########
providers/celery/tests/integration/celery/test_celery_executor.py:
##########
@@ -152,11 +162,24 @@ def teardown_method(self) -> None:
     def test_celery_integration(self, broker_url, executor_config):
         from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
 
-        def fake_execute_workload(command):
-            if "fail" in command:
-                raise AirflowException("fail")
-
-        with _prepare_app(broker_url, execute=fake_execute_workload) as app:
+        if AIRFLOW_V_3_0_PLUS:
+            # Airflow 3: execute_workload receives JSON string
+            def fake_execute(input: str):
+                """Fake execute_workload that parses JSON and fails for tasks 
with 'fail' in task_id."""
+                import json
+
+                workload_dict = json.loads(input)
+                # Check if this is a task that should fail (task_id contains 
"fail")
+                if "ti" in workload_dict and "task_id" in workload_dict["ti"]:
+                    if "fail" in workload_dict["ti"]["task_id"]:
+                        raise AirflowException("fail")
+        else:
+            # Airflow 2: execute_command receives command list
+            def fake_execute(command):

Review Comment:
   Mypy requires all conditional function variants to have identical signatures
   ```suggestion
               def fake_execute(command: str):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to