RenGeng opened a new issue #21190:
URL: https://github.com/apache/airflow/issues/21190


   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==6.0.0
   
   ### Apache Airflow version
   
   2.2.1
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   We deploy airflow on GKE with Kubernetes executor, here's GKE version: 
   `Server Version: version.Info{Major:"1", Minor:"19+", 
GitVersion:"v1.19.15-gke.1801", 
GitCommit:"12532d9bc117118634f870e6083a51837b34a3e8", GitTreeState:"clean", 
BuildDate:"2021-10-21T21:27:39Z", GoVersion:"go1.15.15b5", Compiler:"gc", 
Platform:"linux/amd64"}`
   
   ### What happened
   
   I use SFTPToGCSOperator with the parameter `move_objectbool=True`, so the 
source file will be deleted in the ftp server, but sometimes I saw two 
different jobs launched at the same time in one pod, so one of them will delete 
de file and the other will fail because the file doesn't exist.
   
   ### What you expected to happen
   
   Only launch one job in the pod.
   
   ### How to reproduce
   
   Use SFTPToGCSOperator, but this problem is not recurrent.
   
   ### Anything else
   
   
   ```
   
--------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 
of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   
--------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing 
<Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started 
process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: 
['airflow', 'tasks', 'run', 'move_dag', 'move_task', 
'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735927', '--raw', 
'--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp1pjt_ykp', 
'--error-file', '/tmp/tmp2lkxj9im']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735927: 
Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling 
callbacks: [<function default_action_log at 0x7fde5b957f70>]
   [2022-01-27, 03:09:33 UTC] {settings.py:210} DEBUG - Setting up DB 
connection pool (PID 55)
   [2022-01-27, 03:09:33 UTC] {settings.py:267} DEBUG - 
settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running 
<TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[running]> on host movedag.39daa8eb5a
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - 
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941
 DeprecationWarning: Accessing 'next_execution_date' from the template is 
deprecated and will be removed in a future version. Please use 
'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   [2022-01-27, 03:09:33 UTC] {__init__.py:146} DEBUG - Preparing lineage 
inlets and outlets
   [2022-01-27, 03:09:33 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: 
ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: 
None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': 
True}
   [2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id: 
ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password: 
None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check': 
True}
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - 
/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py:61 
UserWarning: Failed to load HostKeys from /home/airflow/.ssh/known_hosts.  You 
will need to explicitly load HostKeys (cnopts.hostkeys.load(filename)) or 
disableHostKey checking (cnopts.hostkeys = None).
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - starting thread 
(client mode): 0x3dab54f0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Local 
version/idstring: SSH-2.0-paramiko_2.8.0
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Remote 
version/idstring: SSH-2.0-OpenSSH_7.4
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Connected (version 
2.0, client OpenSSH_7.4)
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex 
algos:['curve25519-sha256', '[email protected]', 
'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521', 
'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512', 
'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha1', 
'diffie-hellman-group14-sha256', 'diffie-hellman-group14-sha1', 
'diffie-hellman-group1-sha1'] server key:['ssh-rsa', 'rsa-sha2-512', 
'rsa-sha2-256', 'ecdsa-sha2-nistp256', 'ssh-ed25519'] client 
encrypt:['[email protected]', 'aes128-ctr', 'aes192-ctr', 
'aes256-ctr', '[email protected]', '[email protected]', 'aes128-cbc', 
'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] server 
encrypt:['[email protected]', 'aes128-ctr', 'aes192-ctr', 
'aes256-ctr', '[email protected]', '[email protected]', 'aes128-cbc', 
'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] client 
mac:['umac-64-et
 [email protected]', '[email protected]', 
'[email protected]', '[email protected]', 
'[email protected]', '[email protected]', '[email protected]', 
'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] server 
mac:['[email protected]', '[email protected]', 
'[email protected]', '[email protected]', 
'[email protected]', '[email protected]', '[email protected]', 
'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] client compress:['none', 
'[email protected]'] server compress:['none', '[email protected]'] client 
lang:[''] server lang:[''] kex follows?False
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Kex agreed: 
[email protected]
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - HostKey agreed: 
ssh-ed25519
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Cipher agreed: 
aes128-ctr
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - MAC agreed: 
hmac-sha2-256
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Compression agreed: 
[email protected]
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex engine 
KexCurve25519 specified hash_algo <built-in function openssl_sha256>
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switch to new keys ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Attempting public-key 
auth...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - userauth is OK
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Authentication 
(publickey) successful!
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on outbound 
compression ...
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on inbound 
compression ...
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet in: 
32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Received global 
request "[email protected]"
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Rejecting 
"[email protected]" global request from server.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet 
out: 32768 bytes
   [2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Secsh channel 0 
opened.
   [2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Sesch channel 
0 request ok
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp 
connection (server version 3)
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] 
listdir(b'input/PROD/polo/BKG_OFFER')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20210630.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20210701.csv')
   [2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20210702.csv')
   .....
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20220117.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:175} INFO - Executing copy of 
input_20220127.csv to gs://XXXXXX/input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
stat(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
open(b'input_20220127.csv 'rb')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
open(b'input_20220127, 'rb') 000000
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] close(00000000)
   [2022-01-27, 03:09:34 UTC] {credentials_provider.py:295} INFO - Getting 
connection using `google.auth.default()` since no key file is defined for hook.
   [2022-01-27, 03:09:34 UTC] {_default.py:206} DEBUG - Checking None for 
explicit credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:181} DEBUG - Checking Cloud SDK 
credentials as part of auth process...
   [2022-01-27, 03:09:34 UTC] {_default.py:187} DEBUG - Cloud SDK credentials 
not found on disk; not using them
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET 
http://169.254.169.254
   [2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET 
http://metadata.google.internal/computeMetadata/v1/project/project-id
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET 
http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
   [2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET 
http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/<redacted>
   [2022-01-27, 03:09:34 UTC] {gcs.py:485} INFO - File /tmp/tmpv52p77bg 
uploaded to input_20220127.csv in XXXX bucket
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of 
input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} INFO - [chan 0] sftp session closed.
   [2022-01-27, 03:09:34 UTC] {channel.py:1212} DEBUG - [chan 0] EOF sent (0)
   [2022-01-27, 03:09:34 UTC] {__init__.py:107} DEBUG - Lineage called with 
inlets: [], outlets: []
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {transport.py:1819} DEBUG - EOF in transport 
thread
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1286} DEBUG - Clearing 
next_method and next_kwargs.
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:1270} INFO - Marking task as 
SUCCESS. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000, 
start_date=20220127T030932, end_date=20220127T030935
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:2224} DEBUG - Task Duration set 
to 3.168044
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling 
callbacks: []
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with 
return code 0
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for 
<DagRun move_dag @ 2022-01-26 03:00:00+00:00: 
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable 
tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: 
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet 
but will be retried automatically. Current date is 
2022-01-27T03:09:36.126713+00:00 and task will be retried at 
2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for 
retry yet but will be retried automatically. Current date is 
2022-01-27T03:09:36.126713+00:00 and task will be retried at 
2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Trigger Rule' PASSED: True, The task instance did not have any 
upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' 
PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' 
PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: 
False, Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' 
PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' 
PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: 
False, Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 
0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 
0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.pseudo_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Not In Retry Period' PASSED: True, The task instance was not marked for 
retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all 
upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.dqm_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.dispatcher_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.archive_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   
   [2022-01-27, 03:09:31 UTC] {base_task_runner.py:63} DEBUG - Planning to run 
as the  user
   [2022-01-27, 03:09:31 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [queued]> from DB
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Task Instance Not Running' PASSED: True, Task is not in running state.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Not In Retry Period' PASSED: True, The task instance was not marked for 
retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Task Instance State' PASSED: True, Task state queued was valid.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Task Concurrency' PASSED: True, Task concurrency is not set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to 
execute the task', 'default_pool')
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Not In Retry Period' PASSED: True, The task instance was not marked for 
retrying.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency 
'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[queued]>
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1241} INFO - 
   
--------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 
of 4
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO - 
   
--------------------------------------------------------------------------------
   [2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing 
<Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started 
process 55 to run task
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running: 
['airflow', 'tasks', 'run', 'move_dag', 'move_task', 
'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735923', '--raw', 
'--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp_kvghyhi', 
'--error-file', '/tmp/tmpxlr36n7u']
   [2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735923: 
Subtask move_task
   [2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling 
callbacks: [<function default_action_log at 0x7fc797444f70>]
   [2022-01-27, 03:09:32 UTC] {settings.py:210} DEBUG - Setting up DB 
connection pool (PID 55)
   [2022-01-27, 03:09:32 UTC] {settings.py:267} DEBUG - 
settings.prepare_engine_args(): Using NullPool
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running 
<TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[running]> on host movedag.68e76697f9
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING - 
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941
 DeprecationWarning: Accessing 'next_execution_date' from the template is 
deprecated and will be removed in a future version. Please use 
'data_interval_end' instead.
   [2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=move_dag
   AIRFLOW_CTX_TASK_ID=move_task
   AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
   .......
   SAME AS BEFORE
   .......
   [2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of 
input_20220127.csv
   [2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] 
remove(b'input_20220127.csv')
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1703} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1514, in _execute_task
       result = execute_callable(context=context)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
 line 144, in execute
       self._copy_single_object(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
 line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py",
 line 251, in delete_file
       conn.remove(path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 
728, in remove
       self._sftp.remove(remotefile)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 398, in remove
       self._request(CMD_REMOVE, path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 822, in _request
       return self._read_response(num)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 874, in _read_response
       self._convert_status(msg)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]>
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:2224} DEBUG - Task Duration set 
to 1.63822
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1286} DEBUG - Clearing 
next_method and next_kwargs.
   [2022-01-27, 03:09:34 UTC] {taskinstance.py:1270} INFO - Marking task as 
UP_FOR_RETRY. dag_id=move_dag, task_id=move_task, 
execution_date=20220126T030000, start_date=20220127T030932, 
end_date=20220127T030934
   [2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling 
callbacks: []
   [2022-01-27, 03:09:35 UTC] {standard_task_runner.py:88} ERROR - Failed to 
execute job 735923 for task move_task
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 85, in _start_by_fork
       args.func(args, dag=self.dag)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 
92, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 107, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 180, in _run_raw_task
       ti._run_raw_task(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1514, in _execute_task
       result = execute_callable(context=context)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
 line 144, in execute
       self._copy_single_object(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
 line 194, in _copy_single_object
       sftp_hook.delete_file(source_path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py",
 line 251, in delete_file
       conn.remove(path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line 
728, in remove
       self._sftp.remove(remotefile)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 398, in remove
       self._request(CMD_REMOVE, path)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 822, in _request
       return self._read_response(num)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 874, in _read_response
       self._convert_status(msg)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py", 
line 903, in _convert_status
       raise IOError(errno.ENOENT, text)
   FileNotFoundError: [Errno 2] No such file
   [2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with 
return code 1
   [2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: move_dag.move_task 
scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
   [2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for 
<DagRun move_dag @ 2022-01-26 03:00:00+00:00: 
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable 
tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00: 
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Trigger Rule' PASSED: True, The task instance did not have any 
upstream tasks.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]> 
dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet 
but will be retried automatically. Current date is 
2022-01-27T03:09:36.252994+00:00 and task will be retried at 
2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 
[up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for 
retry yet but will be retried automatically. Current date is 
2022-01-27T03:09:36.252994+00:00 and task will be retried at 
2022-01-27T03:11:15.404715+00:00.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' 
PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: 
False, Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' 
PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State' 
PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: 
False, Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'pseudo_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_unified_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period' 
PASSED: True, The task instance was not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 
0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 
0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00 
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.pseudo_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dqm_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all 
upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.dqm_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'dispatcher_bkg_offer'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 
'Not In Retry Period' PASSED: True, The task instance was not marked for 
retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.dispatcher_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' 
requires all upstream tasks to have succeeded, but found 1 non-success(es). 
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: move_dag.archive_bkg_offer 
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'move_task'}
   [2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> 
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked 
for retrying.
   [2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to