This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 784e1a9344c Construct task instances with `ExecuteTask.make` for celery executor integration tests (#61311)
784e1a9344c is described below

commit 784e1a9344c0f27f59d4130d8f38f5fc1692f581
Author: Pratiksha <[email protected]>
AuthorDate: Mon Feb 2 21:57:54 2026 +0530

    Construct task instances with `ExecuteTask.make` for celery executor integration tests (#61311)
---
 .../integration/celery/test_celery_executor.py     | 25 ++++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)

diff --git a/providers/celery/tests/integration/celery/test_celery_executor.py b/providers/celery/tests/integration/celery/test_celery_executor.py
index 0b92eec6ce9..3b055dd0b4d 100644
--- a/providers/celery/tests/integration/celery/test_celery_executor.py
+++ b/providers/celery/tests/integration/celery/test_celery_executor.py
@@ -23,7 +23,7 @@ import logging
 import os
 import sys
 from ast import literal_eval
-from datetime import datetime
+from datetime import datetime, timedelta
 from time import sleep
 from unittest import mock
 
@@ -39,12 +39,14 @@ from kombu.asynchronous import set_event_loop
 from kubernetes.client import models as k8s
 from uuid6 import uuid7
 
+from airflow._shared.timezones import timezone
 from airflow.configuration import conf
 from airflow.executors import workloads
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, TaskInstanceKey
 from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import BaseOperator
 from airflow.utils.state import State
 
 from tests_common.test_utils import db
@@ -126,6 +128,17 @@ class TestCeleryExecutor:
         db.clear_db_runs()
         db.clear_db_jobs()
 
+
+def setup_dagrun_with_success_and_fail_tasks(dag_maker):
+    date = timezone.utcnow()
+    start_date = date - timedelta(days=2)
+
+    with dag_maker("test_celery_integration"):
+        BaseOperator(task_id="success", start_date=start_date)
+        BaseOperator(task_id="fail", start_date=start_date)
+
+    return dag_maker.create_dagrun(logical_date=date)
+
     @pytest.mark.flaky(reruns=5, reruns_delay=3)
     @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
     @pytest.mark.parametrize(
@@ -184,7 +197,7 @@ class TestCeleryExecutor:
             executor.start()
 
             with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
-                ti = workloads.TaskInstance.model_construct(
+                ti_success = workloads.TaskInstance.model_construct(
                     id=uuid7(),
                     task_id="success",
                     dag_id="id",
@@ -198,9 +211,13 @@ class TestCeleryExecutor:
                     TaskInstanceKey("id", "success", "abc", 0, -1),
                     TaskInstanceKey("id", "fail", "abc", 0, -1),
                 ]
+                dagrun = setup_dagrun_with_success_and_fail_tasks(dag_maker)
+                ti_success, ti_fail = dagrun.task_instances
                 for w in (
-                    workloads.ExecuteTask.model_construct(ti=ti),
-                    workloads.ExecuteTask.model_construct(ti=ti.model_copy(update={"task_id": "fail"})),
+                    workloads.ExecuteTask.make(
+                        ti=ti_success,
+                    ),
+                    workloads.ExecuteTask.make(ti=ti_fail),
                 ):
                     executor.queue_workload(w, session=None)
 

Reply via email to