o-nikolas commented on code in PR #46265:
URL: https://github.com/apache/airflow/pull/46265#discussion_r1936038993
##########
tests/integration/executors/test_celery_executor.py:
##########
@@ -181,54 +178,47 @@ def fake_execute_command(command):
num_tasks,
)
sleep(0.4)
- assert list(executor.tasks.keys()) == [
- ("success", "fake_simple_ti", execute_date, 0),
- ("fail", "fake_simple_ti", execute_date, 0),
- ]
- assert (
- executor.event_buffer[("success", "fake_simple_ti",
execute_date, 0)][0] == State.QUEUED
- )
- assert executor.event_buffer[("fail", "fake_simple_ti",
execute_date, 0)][0] == State.QUEUED
+ assert list(executor.tasks.keys()) == keys
+ assert executor.event_buffer[keys[0]][0] == State.QUEUED
+ assert executor.event_buffer[keys[1]][0] == State.QUEUED
executor.end(synchronous=True)
- assert executor.event_buffer[("success", "fake_simple_ti",
execute_date, 0)][0] == State.SUCCESS
- assert executor.event_buffer[("fail", "fake_simple_ti", execute_date,
0)][0] == State.FAILED
+ assert executor.event_buffer[keys[0]][0] == State.SUCCESS
+ assert executor.event_buffer[keys[1]][0] == State.FAILED
- assert "success" not in executor.tasks
- assert "fail" not in executor.tasks
+ assert keys[0] not in executor.tasks
+ assert keys[1] not in executor.tasks
assert executor.queued_tasks == {}
def test_error_sending_task(self):
from airflow.providers.celery.executors import celery_executor
- def fake_execute_command():
+ def fake_task():
pass
- with _prepare_app(execute=fake_execute_command):
- # fake_execute_command takes no arguments while execute_command
takes 1,
+ with _prepare_app(execute=fake_task):
+ # fake_execute_command takes no arguments while execute_workload
takes 1,
Review Comment:
```suggestion
# fake_task takes no arguments while execute_workload takes 1,
```
##########
airflow/executors/base_executor.py:
##########
@@ -332,12 +351,12 @@ def trigger_tasks(self, open_slots: int) -> None:
:param open_slots: Number of open slots
"""
- span = Trace.get_current_span()
Review Comment:
I'm also curious about the state of this one.
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -228,6 +231,11 @@ class CeleryExecutor(BaseExecutor):
supports_ad_hoc_ti_run: bool = True
supports_sentry: bool = True
+ if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
+ # In the v3 path, we store workloads, not commands as strings.
+ # TODO: TaskSDK: move this type change into BaseExecutor
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]