philipmargeotab opened a new issue, #23507: URL: https://github.com/apache/airflow/issues/23507
### Apache Airflow version

2.1.4

### What happened

I used a custom BigQuery SQL sensor lifted from https://github.com/apache/airflow/issues/13750 and expected the sensor task to fail, but instead it is skipped, even when `soft_fail` is set to `False`.

**DAG args dict:**

```python
"dag_id": os.path.splitext(os.path.basename(__file__))[0],
"schedule_interval": "0 0 * * *",
"max_active_runs": 1,
"dagrun_timeout": datetime.timedelta(minutes=1),
"catchup": False,
# Operators
"default_args": {
    "start_date": None,  # time: UTC, uses dag_utils.suggest_start_date
    "depends_on_past": False,  # a new instance will not run if the past job failed
    "retries": 0,
    "retry_delay": datetime.timedelta(seconds=30),
    "email_on_failure": True,
    "email_on_retry": False,
    # BigQueryOperator
    "write_disposition": "WRITE_TRUNCATE",
    "use_legacy_sql": False,
    "time_partitioning": {"type": "DAY"},
    "priority": "BATCH",
},
```

**Imports:**

```python
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.base import BaseSensorOperator
```

**Custom BQ sensor** (from https://github.com/apache/airflow/issues/13750):

```python
class BigQuerySqlSensor(BaseSensorOperator):
    template_fields = ('sql',)
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
        self,
        bigquery_conn_id='[own bq conn id]',
        delegate_to=None,
        location='US',
        sql=None,
        use_legacy_sql=False,
        **kwargs
    ):
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to
        self.location = location
        self.sql = sql
        self.use_legacy_sql = use_legacy_sql
        super().__init__(**kwargs)

    def poke(self, context):
        hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            use_legacy_sql=self.use_legacy_sql,
        )
        connection = hook.get_conn()
        cursor = connection.cursor()
        cursor.execute(self.sql)
        for row in cursor.fetchall():
            self.log.info("printing rows ...")
            row_value = row[0]
            self.log.info(f"sql output: {row_value}")
        self.log.info("rows printed.")
        if row_value:
            return True
        return False
```

**Task in DAG context manager:**

```python
sql_test = '''SELECT False;'''

t_sql = BigQuerySqlSensor(
    task_id="sense_data",
    mode='reschedule',
    # mode='poke',
    soft_fail=False,
    sql=sql_test,
)
```

When testing, the task status is SKIPPED instead of FAILED as expected.

A colleague experienced a similar issue with `ExternalTaskSensor`: the task was skipped instead of failed, even though `soft_fail=False` is the default:

```python
ExternalTaskSensor(
    task_id='[task name]',
    poke_interval=60 * 30,
    external_task_id='[external task]',
    external_dag_id='[external task id]',
    execution_date_fn=lambda dt: dt + timedelta(hours=7, minutes=30),
    dag=dag,
    mode='reschedule',
    # soft_fail=False,
)
```

### What you think should happen instead

The sensor task should end with status FAILED instead of SKIPPED.

### How to reproduce

_No response_

### Operating System

Google Cloud Composer

### Versions of Apache Airflow Providers

Google Cloud Composer

### Deployment

Composer

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
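To illustrate why `poke()` keeps returning `False` for `SELECT False;`, here is a minimal stdlib-only sketch of the sensor's poke logic. The `FakeCursor` class and `poke_like` function are hypothetical stand-ins, not the real `BigQueryHook` cursor API; the point is only that the single returned cell is falsy, so the truthiness check never passes and the sensor can only time out.

```python
class FakeCursor:
    """Hypothetical stand-in for the BigQuery cursor used in the sensor."""

    def __init__(self, rows):
        self._rows = rows

    def execute(self, sql):
        # The real cursor would run the query; here the rows are pre-baked.
        self._sql = sql

    def fetchall(self):
        return self._rows


def poke_like(cursor, sql):
    """Mirror of the poke() logic in the issue, minus logging."""
    cursor.execute(sql)
    row_value = False  # guard: the issue's code leaves this undefined for empty results
    for row in cursor.fetchall():
        row_value = row[0]
    return bool(row_value)


print(poke_like(FakeCursor([(False,)]), "SELECT False;"))  # False -> sensor keeps rescheduling
print(poke_like(FakeCursor([(True,)]), "SELECT True;"))    # True  -> sensor succeeds
```

With `SELECT False;`, every poke returns `False`, so the outcome is decided entirely by the sensor's timeout handling rather than by `poke()` itself.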
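For reference, here is a simplified, stdlib-only model of the contract the report expects from `soft_fail`. This is my sketch of the documented behavior, not Airflow's actual implementation: when `poke()` never succeeds before the timeout, the sensor should raise a timeout error (task FAILED) unless `soft_fail=True`, in which case it raises a skip (task SKIPPED). The exception names and the `run_sensor` helper are hypothetical.

```python
import time


class SensorTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowSensorTimeout -> task FAILED."""


class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException -> task SKIPPED."""


def run_sensor(poke, timeout=60.0, poke_interval=5.0, soft_fail=False,
               clock=time.monotonic, sleep=time.sleep):
    """Poke until success or timeout; soft_fail selects SKIPPED over FAILED."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            if soft_fail:
                raise SkipTask("timed out; soft_fail=True -> SKIPPED")
            raise SensorTimeout("timed out; soft_fail=False -> FAILED")
        sleep(poke_interval)
    return True
```

Under this model, `soft_fail=False` can only ever end in success or FAILED, which is why the SKIPPED state reported above looks like a bug.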
