o-nikolas commented on code in PR #46265:
URL: https://github.com/apache/airflow/pull/46265#discussion_r1936038993


##########
tests/integration/executors/test_celery_executor.py:
##########
@@ -181,54 +178,47 @@ def fake_execute_command(command):
                         num_tasks,
                     )
                     sleep(0.4)
-                assert list(executor.tasks.keys()) == [
-                    ("success", "fake_simple_ti", execute_date, 0),
-                    ("fail", "fake_simple_ti", execute_date, 0),
-                ]
-                assert (
-                    executor.event_buffer[("success", "fake_simple_ti", 
execute_date, 0)][0] == State.QUEUED
-                )
-                assert executor.event_buffer[("fail", "fake_simple_ti", 
execute_date, 0)][0] == State.QUEUED
+                assert list(executor.tasks.keys()) == keys
+                assert executor.event_buffer[keys[0]][0] == State.QUEUED
+                assert executor.event_buffer[keys[1]][0] == State.QUEUED
 
                 executor.end(synchronous=True)
 
-        assert executor.event_buffer[("success", "fake_simple_ti", 
execute_date, 0)][0] == State.SUCCESS
-        assert executor.event_buffer[("fail", "fake_simple_ti", execute_date, 
0)][0] == State.FAILED
+        assert executor.event_buffer[keys[0]][0] == State.SUCCESS
+        assert executor.event_buffer[keys[1]][0] == State.FAILED
 
-        assert "success" not in executor.tasks
-        assert "fail" not in executor.tasks
+        assert keys[0] not in executor.tasks
+        assert keys[1] not in executor.tasks
 
         assert executor.queued_tasks == {}
 
     def test_error_sending_task(self):
         from airflow.providers.celery.executors import celery_executor
 
-        def fake_execute_command():
+        def fake_task():
             pass
 
-        with _prepare_app(execute=fake_execute_command):
-            # fake_execute_command takes no arguments while execute_command 
takes 1,
+        with _prepare_app(execute=fake_task):
+            # fake_execute_command takes no arguments while execute_workload 
takes 1,

Review Comment:
   ```suggestion
               # fake_task takes no arguments while execute_workload takes 1,
   ```



##########
airflow/executors/base_executor.py:
##########
@@ -332,12 +351,12 @@ def trigger_tasks(self, open_slots: int) -> None:
 
         :param open_slots: Number of open slots
         """
-        span = Trace.get_current_span()

Review Comment:
   I'm also curious about the state of this one.



##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -228,6 +231,11 @@ class CeleryExecutor(BaseExecutor):
     supports_ad_hoc_ti_run: bool = True
     supports_sentry: bool = True
 
+    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
+        # In the v3 path, we store workloads, not commands as strings.
+        # TODO: TaskSDK: move this type change into BaseExecutor

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to