dabla commented on PR #55068:
URL: https://github.com/apache/airflow/pull/55068#issuecomment-3527722124
> Thanks for the PR! I just tested it with the following command and Dag to verify the fix:
>
> ```shell
> breeze start-airflow --backend postgres --db-reset --use-airflow-version 55068
> ```
>
> Example Dags
>
> ```python
> from typing import Any
>
> from datetime import timedelta
> from airflow.providers.common.compat.sdk import BaseSensorOperator
> from airflow.sdk import DAG, task, Context
> from airflow.triggers.base import StartTriggerArgs
>
>
> class WaitHoursSensor(BaseSensorOperator):
> start_trigger_args = StartTriggerArgs(
> trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
> trigger_kwargs={"moment": timedelta(hours=1)},
> next_method="execute_complete",
> next_kwargs=None,
> timeout=None,
> )
> start_from_trigger = True
>
> def __init__(
> self,
> *args: list[Any],
> trigger_kwargs: dict[str, Any] | None,
> start_from_trigger: bool,
> **kwargs: dict[str, Any],
> ) -> None:
> # This whole method will be skipped during dynamic task mapping.
>
> super().__init__(*args, **kwargs)
> self.start_trigger_args.trigger_kwargs = {
> "moment": timedelta(hours=trigger_kwargs["hours"])
> }
> self.start_from_trigger = start_from_trigger
>
> def poke(self, context) -> bool:
> # need to override poke from parent class, return False to mock not ready
> return False
>
> def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
> # We have no more work to do here. Mark as complete.
> return
>
> with DAG(dag_id="start_from_triggerer", schedule=None):
>
> @task
> def items_to_process():
> return [{"hours": i+1} for i in range(10)]
>
> WaitHoursSensor.partial(task_id="wait_ready", start_from_trigger=True).expand(
> trigger_kwargs=items_to_process(),
> )
> ```
>
> However, it resulted in the following error:
>
> ```
> ::group::Log message source details sources=["/root/airflow/logs/dag_id=large_mapped_deferrable/run_id=manual__2025-11-13T10:32:49+00:00/task_id=wait_ready/map_index=1/attempt=1.log"]
> ::endgroup::
> [2025-11-13T10:32:54.420552Z] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:209
> [2025-11-13T10:32:54.421017Z] INFO - Filling up the DagBag from /files/dags/thousand_trigger.py source=airflow.dag_processing.dagbag.DagBag loc=dagbag.py:629
> [2025-11-13T10:32:54.494551Z] ERROR - Task failed with exception source=task loc=task_runner.py:1034 TypeError: missing keyword argument 'start_from_trigger'
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/execution_time/task_runner.py, line 944 in run
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/execution_time/task_runner.py, line 820 in _prepare
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/execution_time/task_runner.py, line 293 in render_templates
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/definitions/mappedoperator.py, line 832 in render_template_fields
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/definitions/mappedoperator.py, line 775 in unmap
> File /usr/python/lib/python3.10/site-packages/airflow/sdk/bases/operator.py, line 504 in apply_defaults
> ```
Thanks @jason810496 for testing. I have fixed the issue; it should work now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]