n-oden opened a new issue #14319:
URL: https://github.com/apache/airflow/issues/14319


   **Apache Airflow version**: 1.10.12
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): v1.16.15-gke.6000
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Google Kubernetes Engine
   - **OS** (e.g. from /etc/os-release): "Container-Optimized OS"
   ```
    BUILD_ID=12371.1088.0
    NAME="Container-Optimized OS"
    KERNEL_COMMIT_ID=52bdab9330bdd9e50dc967f8aa850829921ca217
    GOOGLE_CRASH_ID=Lakitu
    VERSION_ID=77
    BUG_REPORT_URL="https://cloud.google.com/container-optimized-os/docs/resources/support-policy#contact_us"
    PRETTY_NAME="Container-Optimized OS from Google"
    VERSION=77
    GOOGLE_METRICS_PRODUCT_ID=26
    HOME_URL="https://cloud.google.com/container-optimized-os/docs"
    ID=cos
   ```
   - **Kernel** (e.g. `uname -a`): `Linux gke-services-1-default-pool-3ef08c09-v95l 4.19.112+ #1 SMP Sat Oct 10 13:45:37 PDT 2020 x86_64 Intel(R) Xeon(R) CPU @ 2.30GHz GenuineIntel GNU/Linux`
   - **Install tools**: helm, chart `airflow-7.16.0`
   - **Others**:
   
   **What happened**:
   
   Even after https://github.com/apache/airflow/pull/7464, we are finding that tasks with the `none_failed` trigger rule are still being skipped when their direct upstream task is skipped.
   
   In a simple three-task DAG, the first step is a GoogleCloudStoragePrefixSensor, followed by a processing task and a final heartbeat-check operator:
   
   ```
   check_for_late_data >> run_statekeeper >> passive_check
   ```
   The `passive_check` task is configured with `TriggerRule.NONE_FAILED`:
   ```
   passive_check = PassiveCheckOperator(
       task_id="passive_check",
       dag=dag,
       trigger_rule=TriggerRule.NONE_FAILED)
   ```
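   For clarity, here is how I read the trigger-rule semantics, written as a plain predicate over the states of a task's direct upstream tasks (an illustration of my understanding, not Airflow's actual implementation):
   ```
   def none_failed(success, skipped, failed, upstream_failed):
       # none_failed should run as long as no direct upstream is in
       # "failed" or "upstream_failed"; "skipped" upstreams are acceptable
       return failed == 0 and upstream_failed == 0

   # check_for_late_data skips, so run_statekeeper (default all_success rule)
   # skips in turn; passive_check then sees one skipped upstream, no failures:
   assert none_failed(success=0, skipped=1, failed=0, upstream_failed=0)
   ```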
   The GCS sensor operator exits as follows when it finds no keys:
   ```
    [2021-02-12 00:32:18,130] {taskinstance.py:1025} INFO - Marking task as SKIPPED.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=
    [2021-02-12 00:32:18,130] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=20210212T003218
   ```
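   That skip is produced by the sensor's `soft_fail=True` setting (shown in the reproduction below). My understanding of the mechanism, as a simplified sketch rather than the actual `BaseSensorOperator` source:
   ```
   import time

   from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException

   def run_sensor(poke, timeout, poke_interval, soft_fail):
       # simplified poke loop: on timeout, soft_fail turns the failure into a
       # skip, which is what produces the "Marking task as SKIPPED." line above
       started_at = time.time()
       while not poke():
           if time.time() - started_at > timeout:
               if soft_fail:
                   raise AirflowSkipException("Snap. Time is OUT.")
               raise AirflowSensorTimeout("Snap. Time is OUT.")
           time.sleep(poke_interval)
   ```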
   
   The intermediate step is also skipped, as intended (it uses the default trigger rule), but the final step is skipped as well, which should not happen:
   
   
![image](https://user-images.githubusercontent.com/70606471/108516838-af7f1680-7294-11eb-82f3-828d2d39dcf2.png)
   
   The same thing happens if we put a dummy shared start task upstream:
   
![image](https://user-images.githubusercontent.com/70606471/108516875-bb6ad880-7294-11eb-90d3-6625cc7d82ea.png)
   
   
   **What you expected to happen**:
   
   The "passive_check" task should have run, because its trigger is 
`none_failed` and no tasks upstream of it have failed, they have only been 
skipped.
   
   As this was supposed to be fixed by https://github.com/apache/airflow/pull/7464, I suspect that either something has regressed since 1.10.10 or there is a corner case not yet covered.
   
   **How to reproduce it**:
   
   The following DAG reproduces the issue (assuming you have a working Google Cloud Platform connection and a GCS bucket that the DAG can read):
   
   ```
   import datetime
   from airflow.models import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
   
    SOURCE_FILE_BUCKET = "GCS_BUCKET_NAME"  # replace this with a GCS bucket under your control
    SOURCE_FILE_PREFIX = "nofileshere/"  # make sure there are zero keys under this prefix
   
   with DAG(
       dag_id="simple_skip",
       start_date=datetime.datetime(2021, 2, 19),
       schedule_interval=None,
   ) as dag:
       find_no_data = GoogleCloudStoragePrefixSensor(
           dag=dag,
           task_id="find_no_data",
            soft_fail=True,  # skip instead of fail when the sensor times out
           timeout=60 * 2,
           bucket=SOURCE_FILE_BUCKET,
           prefix=SOURCE_FILE_PREFIX,
       )
       step_1 = DummyOperator(task_id="step_1", dag=dag)
        step_2 = DummyOperator(task_id="step_2", dag=dag, trigger_rule="none_failed")
       find_no_data >> step_1 >> step_2
   ```
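
   If you do not have a GCS bucket handy, I believe (untested) that this variant exercises the same skip propagation by raising `AirflowSkipException` directly instead of relying on the sensor:
   ```
   import datetime

   from airflow.exceptions import AirflowSkipException
   from airflow.models import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import PythonOperator

   def skip_me():
       # stand-in for the soft-failed sensor: mark this task SKIPPED
       raise AirflowSkipException("simulating the soft-failed sensor")

   with DAG(
       dag_id="simple_skip_no_gcs",
       start_date=datetime.datetime(2021, 2, 19),
       schedule_interval=None,
   ) as dag:
       find_no_data = PythonOperator(task_id="find_no_data", python_callable=skip_me, dag=dag)
       step_1 = DummyOperator(task_id="step_1", dag=dag)
       step_2 = DummyOperator(task_id="step_2", dag=dag, trigger_rule="none_failed")
       find_no_data >> step_1 >> step_2
   ```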

