This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 97912c84ddef640bac2061e3838ff1dde3e66a28 Author: Srinivasa Surabathini <[email protected]> AuthorDate: Fri Jan 20 18:15:31 2023 +0000 logging poke info when external dag is not none and task_id and task_ids are none (#28097) (cherry picked from commit 760c52949ac41ffa7a2357aa1af0cdca163ddac8) --- airflow/sensors/external_task.py | 28 +++++++++++++---- tests/sensors/test_external_task_sensor.py | 48 ++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 967bb5a276..d1c5443b92 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -143,18 +143,27 @@ class ExternalTaskSensor(BaseSensorOperator): if external_task_id is not None and external_task_ids is not None: raise ValueError( "Only one of `external_task_id` or `external_task_ids` may " - "be provided to ExternalTaskSensor; not both." + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." ) - if external_task_id is not None: - external_task_ids = [external_task_id] + if external_task_group_id is not None and external_task_id is not None: + raise ValueError( + "Only one of `external_task_group_id` or `external_task_id` may " + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." + ) - if external_task_group_id and external_task_ids: + if external_task_group_id is not None and external_task_ids is not None: raise ValueError( - "Values for `external_task_group_id` and `external_task_id` or `external_task_ids` " - "can't be set at the same time" + "Only one of `external_task_group_id` or `external_task_ids` may " + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." ) + if external_task_id is not None: + external_task_ids = [external_task_id] + if external_task_ids or external_task_group_id: if not total_states <= set(State.task_states): raise ValueError( @@ -217,6 +226,13 @@ class ExternalTaskSensor(BaseSensorOperator): serialized_dttm_filter, ) + if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids: + self.log.info( + "Poking for DAG '%s' on %s ... ", + self.external_dag_id, + serialized_dttm_filter, + ) + # In poke mode this will check dag existence only once if self.check_existence and not self._has_checked_existence: self._check_for_existence(session=session) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index b594210b13..1b5a5032cf 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -145,18 +145,49 @@ class TestExternalTaskSensor(unittest.TestCase): ) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_raise_with_external_task_sensor_task_id_and_task_ids(self): + with pytest.raises(ValueError) as ctx: + ExternalTaskSensor( + task_id="test_external_task_sensor_task_id_with_task_ids_failed_status", + external_dag_id=TEST_DAG_ID, + external_task_id=TEST_TASK_ID, + external_task_ids=TEST_TASK_ID, + dag=self.dag, + ) + assert ( + str(ctx.value) == "Only one of `external_task_id` or `external_task_ids` may " + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." + ) + def test_raise_with_external_task_sensor_task_group_and_task_id(self): with pytest.raises(ValueError) as ctx: ExternalTaskSensor( task_id="test_external_task_sensor_task_group_with_task_id_failed_status", external_dag_id=TEST_DAG_ID, + external_task_id=TEST_TASK_ID, + external_task_group_id=TEST_TASK_GROUP_ID, + dag=self.dag, + ) + assert ( + str(ctx.value) == "Only one of `external_task_group_id` or `external_task_id` may " + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." + ) + + def test_raise_with_external_task_sensor_task_group_and_task_ids(self): + with pytest.raises(ValueError) as ctx: + ExternalTaskSensor( + task_id="test_external_task_sensor_task_group_with_task_ids_failed_status", + external_dag_id=TEST_DAG_ID, external_task_ids=TEST_TASK_ID, external_task_group_id=TEST_TASK_GROUP_ID, dag=self.dag, ) assert ( - str(ctx.value) == "Values for `external_task_group_id` and `external_task_id` or " - "`external_task_ids` can't be set at the same time" + str(ctx.value) == "Only one of `external_task_group_id` or `external_task_ids` may " + "be provided to ExternalTaskSensor; " + "use external_task_id or external_task_ids or external_task_group_id." ) # by default i.e. check_existence=False, if task_group doesn't exist, the sensor will run till timeout, @@ -354,6 +385,19 @@ class TestExternalTaskSensor(unittest.TestCase): ) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_external_dag_sensor_log(self, caplog): + other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") + other_dag.create_dagrun( + run_id="test", start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS + ) + op = ExternalTaskSensor( + task_id="test_external_dag_sensor_check", + external_dag_id="other_dag", + dag=self.dag, + ) + op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... ") in caplog.messages + def test_external_dag_sensor_soft_fail_as_skipped(self): other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") other_dag.create_dagrun(
