philipmargeotab opened a new issue, #23507:
URL: https://github.com/apache/airflow/issues/23507

   ### Apache Airflow version
   
   2.1.4
   
   ### What happened
   
   I used a custom "BigQuery" SQL sensor lifted from 
https://github.com/apache/airflow/issues/13750 and expected the sensor 
task to fail, but instead it is skipped, even when `soft_fail` is set to 
`False`.
   
   **Dag args dict**
   ```
   "dag_id": os.path.splitext(os.path.basename(__file__))[0],
   "schedule_interval": "0 0 * * *",
   "max_active_runs": 1,
   "dagrun_timeout": datetime.timedelta(minutes=1),
     "catchup": False,
   # Operators
   "default_args": {
       "start_date": None,  # time: UTC, uses dag_utils.suggest_start_date
       "depends_on_past": False,  # new instance will not run if past job failed
       "retries": 0,
       "retry_delay": datetime.timedelta(seconds=30),
       "email_on_failure": True,
       "email_on_retry": False,
       # BigQueryOperator
       "write_disposition": "WRITE_TRUNCATE",
       "use_legacy_sql": False,
       "time_partitioning": {"type": "DAY"},
       "priority": "BATCH",
   },
   ```
   
   **Imports:**
   ```
   from airflow.utils.decorators import apply_defaults
   from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
    from airflow.sensors.base import BaseSensorOperator
   ```
   
    **Custom BQ sensor:** (from: https://github.com/apache/airflow/issues/13750)
   ```
   class BigQuerySqlSensor(BaseSensorOperator):
       template_fields = ('sql',)
       template_ext = ('.sql',)
   
       @apply_defaults
       def __init__(
           self,
           bigquery_conn_id='[own bq conn id]',
           delegate_to=None,
           location='US',
           sql=None,
           use_legacy_sql=False,
           **kwargs
       ):
   
           self.bigquery_conn_id = bigquery_conn_id
           self.delegate_to = delegate_to
           self.location = location
           self.sql = sql
           self.use_legacy_sql = use_legacy_sql
           super().__init__(**kwargs)
   
       def poke(self, context):
           hook = BigQueryHook(
               bigquery_conn_id=self.bigquery_conn_id,
               delegate_to=self.delegate_to,
               location=self.location,
               use_legacy_sql=self.use_legacy_sql,
           )
           connection = hook.get_conn()
           cursor = connection.cursor()
           cursor.execute(self.sql)
   
           for row in cursor.fetchall():
               self.log.info("printing rows ...")
               row_value = row[0]
               self.log.info(f"sql output: {row_value}")
               self.log.info("rows printed.")
   
               if row_value:
                   return True
   
           return False
   ```
   
   **Task in dag context manager:**
   ```
   sql_test = '''SELECT False;'''
   
   t_sql = BigQuerySqlSensor(
       task_id="sense_data",
       mode='reschedule',
       # mode='poke',
       soft_fail=False,
       sql=sql_test,
   )
   ```
    When I test this, the task's final status is SKIPPED instead of the expected FAILED.
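
   To make the expectation concrete, here is a simplified sketch of the `soft_fail` semantics I understand a sensor timeout should follow. The exception classes below are local stand-ins mirroring `airflow.exceptions`, and the function is illustrative only, not the actual `BaseSensorOperator` implementation:
   ```
   # Sketch of the expected soft_fail semantics when a sensor times out.
   # Stand-ins for airflow.exceptions; illustrative only.

   class AirflowSensorTimeout(Exception):
       """Stand-in: raising this should end the task as FAILED."""

   class AirflowSkipException(Exception):
       """Stand-in: raising this should end the task as SKIPPED."""

   def on_sensor_timeout(soft_fail: bool) -> None:
       # Expected behaviour: only soft_fail=True should ever skip the task.
       if soft_fail:
           raise AirflowSkipException("Sensor timed out; soft_fail=True -> SKIPPED")
       raise AirflowSensorTimeout("Sensor timed out; soft_fail=False -> FAILED")
   ```
   With `soft_fail=False` I would therefore expect the timeout path to end in FAILED, not SKIPPED.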
   
   
   A colleague hit a similar issue with ExternalTaskSensor: the task was 
skipped instead of failed (`soft_fail=False` is the default):
   ```
   ExternalTaskSensor(
       task_id='[task name]',
       poke_interval=60 * 30,
       external_task_id='[external task]',
    external_dag_id='[external dag id]',
       execution_date_fn=lambda dt: dt + timedelta(hours=7, minutes=30),
       dag=dag,
       mode='reschedule',
       # soft_fail=False,
   )
   ```
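
   As a sanity check that the `execution_date_fn` shift is not the culprit, the lambda simply offsets the execution date by 7.5 hours (the example date below is illustrative, not from our real schedule):
   ```
   from datetime import datetime, timedelta

   # Same offset function as passed to ExternalTaskSensor above.
   execution_date_fn = lambda dt: dt + timedelta(hours=7, minutes=30)

   # Illustrative example date.
   shifted = execution_date_fn(datetime(2022, 5, 1, 0, 0))
   print(shifted)  # 2022-05-01 07:30:00
   ```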
   
   
   ### What you think should happen instead
   
   The sensor task should end with status FAILED instead of SKIPPED when 
`soft_fail=False`.
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   Using Google Cloud Composer
   
   ### Versions of Apache Airflow Providers
   
   Google Cloud Composer
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   
