RenGeng opened a new issue #21190:
URL: https://github.com/apache/airflow/issues/21190
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-google==6.0.0
### Apache Airflow version
2.2.1
### Operating System
Debian GNU/Linux 10 (buster)
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
We deploy airflow on GKE with Kubernetes executor, here's GKE version:
`Server Version: version.Info{Major:"1", Minor:"19+",
GitVersion:"v1.19.15-gke.1801",
GitCommit:"12532d9bc117118634f870e6083a51837b34a3e8", GitTreeState:"clean",
BuildDate:"2021-10-21T21:27:39Z", GoVersion:"go1.15.15b5", Compiler:"gc",
Platform:"linux/amd64"}`
### What happened
I use SFTPToGCSOperator with the parameter `move_objectbool=True`, so the
source file will be deleted in the ftp server, but sometimes I saw two
different jobs launched at the same time in one pod, so one of them will delete
de file and the other will fail because the file doesn't exist.
### What you expected to happen
Only launch one job in the pod.
### How to reproduce
Use SFTPToGCSOperator, but this problem is not recurrent.
### Anything else
```
--------------------------------------------------------------------------------
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1
of 4
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing
<Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started
process 55 to run task
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running:
['airflow', 'tasks', 'run', 'move_dag', 'move_task',
'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735927', '--raw',
'--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp1pjt_ykp',
'--error-file', '/tmp/tmp2lkxj9im']
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735927:
Subtask move_task
[2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling
callbacks: [<function default_action_log at 0x7fde5b957f70>]
[2022-01-27, 03:09:33 UTC] {settings.py:210} DEBUG - Setting up DB
connection pool (PID 55)
[2022-01-27, 03:09:33 UTC] {settings.py:267} DEBUG -
settings.prepare_engine_args(): Using NullPool
[2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running
<TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[running]> on host movedag.39daa8eb5a
[2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]>
[2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
[2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
[2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING -
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941
DeprecationWarning: Accessing 'next_execution_date' from the template is
deprecated and will be removed in a future version. Please use
'data_interval_end' instead.
[2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=move_dag
AIRFLOW_CTX_TASK_ID=move_task
AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
[2022-01-27, 03:09:33 UTC] {__init__.py:146} DEBUG - Preparing lineage
inlets and outlets
[2022-01-27, 03:09:33 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
[2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id:
ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password:
None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check':
True}
[2022-01-27, 03:09:33 UTC] {base.py:70} INFO - Using connection to: id:
ftp_dl_talend. Host: 10.108.1.45, Port: 22, Schema: , Login: talend, Password:
None, extra: {'key_file': '/home/airflow/.ssh/id_rsa', 'no_host_key_check':
True}
[2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING -
/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py:61
UserWarning: Failed to load HostKeys from /home/airflow/.ssh/known_hosts. You
will need to explicitly load HostKeys (cnopts.hostkeys.load(filename)) or
disableHostKey checking (cnopts.hostkeys = None).
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - starting thread
(client mode): 0x3dab54f0
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Local
version/idstring: SSH-2.0-paramiko_2.8.0
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Remote
version/idstring: SSH-2.0-OpenSSH_7.4
[2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Connected (version
2.0, client OpenSSH_7.4)
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex
algos:['curve25519-sha256', '[email protected]',
'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521',
'diffie-hellman-group-exchange-sha256', 'diffie-hellman-group16-sha512',
'diffie-hellman-group18-sha512', 'diffie-hellman-group-exchange-sha1',
'diffie-hellman-group14-sha256', 'diffie-hellman-group14-sha1',
'diffie-hellman-group1-sha1'] server key:['ssh-rsa', 'rsa-sha2-512',
'rsa-sha2-256', 'ecdsa-sha2-nistp256', 'ssh-ed25519'] client
encrypt:['[email protected]', 'aes128-ctr', 'aes192-ctr',
'aes256-ctr', '[email protected]', '[email protected]', 'aes128-cbc',
'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] server
encrypt:['[email protected]', 'aes128-ctr', 'aes192-ctr',
'aes256-ctr', '[email protected]', '[email protected]', 'aes128-cbc',
'aes192-cbc', 'aes256-cbc', 'blowfish-cbc', 'cast128-cbc', '3des-cbc'] client
mac:['umac-64-et
[email protected]', '[email protected]',
'[email protected]', '[email protected]',
'[email protected]', '[email protected]', '[email protected]',
'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] server
mac:['[email protected]', '[email protected]',
'[email protected]', '[email protected]',
'[email protected]', '[email protected]', '[email protected]',
'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] client compress:['none',
'[email protected]'] server compress:['none', '[email protected]'] client
lang:[''] server lang:[''] kex follows?False
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Kex agreed:
[email protected]
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - HostKey agreed:
ssh-ed25519
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Cipher agreed:
aes128-ctr
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - MAC agreed:
hmac-sha2-256
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Compression agreed:
[email protected]
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - kex engine
KexCurve25519 specified hash_algo <built-in function openssl_sha256>
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switch to new keys ...
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Attempting public-key
auth...
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - userauth is OK
[2022-01-27, 03:09:33 UTC] {transport.py:1819} INFO - Authentication
(publickey) successful!
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on outbound
compression ...
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Switching on inbound
compression ...
[2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet in:
32768 bytes
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Received global
request "[email protected]"
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Rejecting
"[email protected]" global request from server.
[2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Max packet
out: 32768 bytes
[2022-01-27, 03:09:33 UTC] {transport.py:1819} DEBUG - Secsh channel 0
opened.
[2022-01-27, 03:09:33 UTC] {channel.py:1212} DEBUG - [chan 0] Sesch channel
0 request ok
[2022-01-27, 03:09:33 UTC] {sftp.py:158} INFO - [chan 0] Opened sftp
connection (server version 3)
[2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0]
listdir(b'input/PROD/polo/BKG_OFFER')
[2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20210630.csv')
[2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20210701.csv')
[2022-01-27, 03:09:33 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20210702.csv')
.....
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20220117.csv')
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20220127.csv')
[2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:175} INFO - Executing copy of
input_20220127.csv to gs://XXXXXX/input_20220127.csv
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
stat(b'input_20220127.csv')
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
open(b'input_20220127.csv 'rb')
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
open(b'input_20220127, 'rb') 000000
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0] close(00000000)
[2022-01-27, 03:09:34 UTC] {credentials_provider.py:295} INFO - Getting
connection using `google.auth.default()` since no key file is defined for hook.
[2022-01-27, 03:09:34 UTC] {_default.py:206} DEBUG - Checking None for
explicit credentials as part of auth process...
[2022-01-27, 03:09:34 UTC] {_default.py:181} DEBUG - Checking Cloud SDK
credentials as part of auth process...
[2022-01-27, 03:09:34 UTC] {_default.py:187} DEBUG - Cloud SDK credentials
not found on disk; not using them
[2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET
http://169.254.169.254
[2022-01-27, 03:09:34 UTC] {_http_client.py:104} DEBUG - Making request: GET
http://metadata.google.internal/computeMetadata/v1/project/project-id
[2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET
http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true
[2022-01-27, 03:09:34 UTC] {requests.py:182} DEBUG - Making request: GET
http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/<redacted>
[2022-01-27, 03:09:34 UTC] {gcs.py:485} INFO - File /tmp/tmpv52p77bg
uploaded to input_20220127.csv in XXXX bucket
[2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of
input_20220127.csv
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
remove(b'input_20220127.csv')
[2022-01-27, 03:09:34 UTC] {sftp.py:158} INFO - [chan 0] sftp session closed.
[2022-01-27, 03:09:34 UTC] {channel.py:1212} DEBUG - [chan 0] EOF sent (0)
[2022-01-27, 03:09:34 UTC] {__init__.py:107} DEBUG - Lineage called with
inlets: [], outlets: []
[2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:34 UTC] {transport.py:1819} DEBUG - EOF in transport
thread
[2022-01-27, 03:09:35 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]>
[2022-01-27, 03:09:35 UTC] {taskinstance.py:1286} DEBUG - Clearing
next_method and next_kwargs.
[2022-01-27, 03:09:35 UTC] {taskinstance.py:1270} INFO - Marking task as
SUCCESS. dag_id=move_dag, task_id=move_task, execution_date=20220126T030000,
start_date=20220127T030932, end_date=20220127T030935
[2022-01-27, 03:09:35 UTC] {taskinstance.py:2224} DEBUG - Task Duration set
to 3.168044
[2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling
callbacks: []
[2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with
return code 0
[2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
[2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for
<DagRun move_dag @ 2022-01-26 03:00:00+00:00:
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
[2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable
tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00:
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet
but will be retried automatically. Current date is
2022-01-27T03:09:36.126713+00:00 and task will be retried at
2022-01-27T03:11:15.404715+00:00.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for
retry yet but will be retried automatically. Current date is
2022-01-27T03:09:36.126713+00:00 and task will be retried at
2022-01-27T03:11:15.404715+00:00.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Trigger Rule' PASSED: True, The task instance did not have any
upstream tasks.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State'
PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period'
PASSED: True, The task instance was not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED:
False, Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State'
PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period'
PASSED: True, The task instance was not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED:
False, Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'pseudo_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'pseudo_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule
'all_success' requires all upstream tasks to have succeeded, but found 1
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped':
0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule
'all_success' requires all upstream tasks to have succeeded, but found 1
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped':
0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.pseudo_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Not In Retry Period' PASSED: True, The task instance was not marked for
retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all
upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.dqm_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dispatcher_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.dispatcher_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.archive_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2022-01-27, 03:09:31 UTC] {base_task_runner.py:63} DEBUG - Planning to run
as the user
[2022-01-27, 03:09:31 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [queued]> from DB
[2022-01-27, 03:09:32 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [queued]>
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Task Instance Not Running' PASSED: True, Task is not in running state.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Not In Retry Period' PASSED: True, The task instance was not marked for
retrying.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Task Instance State' PASSED: True, Task state queued was valid.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[queued]>
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Task Concurrency' PASSED: True, Task concurrency is not set.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to
execute the task', 'default_pool')
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Not In Retry Period' PASSED: True, The task instance was not marked for
retrying.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [queued]> dependency
'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1035} INFO - Dependencies all
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[queued]>
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1242} INFO - Starting attempt 1
of 4
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2022-01-27, 03:09:32 UTC] {taskinstance.py:1262} INFO - Executing
<Task(SFTPToGCSOperator): move_task> on 2022-01-26 03:00:00+00:00
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:52} INFO - Started
process 55 to run task
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:76} INFO - Running:
['airflow', 'tasks', 'run', 'move_dag', 'move_task',
'scheduled__2022-01-26T03:00:00+00:00', '--job-id', '735923', '--raw',
'--subdir', 'DAGS_FOLDER/move_dag.py', '--cfg-path', '/tmp/tmp_kvghyhi',
'--error-file', '/tmp/tmpxlr36n7u']
[2022-01-27, 03:09:32 UTC] {standard_task_runner.py:77} INFO - Job 735923:
Subtask move_task
[2022-01-27, 03:09:32 UTC] {cli_action_loggers.py:66} DEBUG - Calling
callbacks: [<function default_action_log at 0x7fc797444f70>]
[2022-01-27, 03:09:32 UTC] {settings.py:210} DEBUG - Setting up DB
connection pool (PID 55)
[2022-01-27, 03:09:32 UTC] {settings.py:267} DEBUG -
settings.prepare_engine_args(): Using NullPool
[2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} INFO - Running
<TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[running]> on host movedag.68e76697f9
[2022-01-27, 03:09:33 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:33 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]>
[2022-01-27, 03:09:33 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
[2022-01-27, 03:09:33 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
[2022-01-27, 03:09:33 UTC] {logging_mixin.py:109} WARNING -
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py:1941
DeprecationWarning: Accessing 'next_execution_date' from the template is
deprecated and will be removed in a future version. Please use
'data_interval_end' instead.
[2022-01-27, 03:09:33 UTC] {taskinstance.py:1427} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=move_dag
AIRFLOW_CTX_TASK_ID=move_task
AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T03:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-01-26T03:00:00+00:00
.......
SAME AS BEFORE
.......
[2022-01-27, 03:09:34 UTC] {sftp_to_gcs.py:193} INFO - Executing delete of
input_20220127.csv
[2022-01-27, 03:09:34 UTC] {sftp.py:158} DEBUG - [chan 0]
remove(b'input_20220127.csv')
[2022-01-27, 03:09:34 UTC] {taskinstance.py:1703} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1514, in _execute_task
result = execute_callable(context=context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
line 144, in execute
self._copy_single_object(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
line 194, in _copy_single_object
sftp_hook.delete_file(source_path)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py",
line 251, in delete_file
conn.remove(path)
File
"/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line
728, in remove
self._sftp.remove(remotefile)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 398, in remove
self._request(CMD_REMOVE, path)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 822, in _request
return self._read_response(num)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 874, in _read_response
self._convert_status(msg)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 903, in _convert_status
raise IOError(errno.ENOENT, text)
FileNotFoundError: [Errno 2] No such file
[2022-01-27, 03:09:34 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:34 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]>
[2022-01-27, 03:09:34 UTC] {taskinstance.py:2224} DEBUG - Task Duration set
to 1.63822
[2022-01-27, 03:09:34 UTC] {taskinstance.py:1286} DEBUG - Clearing
next_method and next_kwargs.
[2022-01-27, 03:09:34 UTC] {taskinstance.py:1270} INFO - Marking task as
UP_FOR_RETRY. dag_id=move_dag, task_id=move_task,
execution_date=20220126T030000, start_date=20220127T030932,
end_date=20220127T030934
[2022-01-27, 03:09:35 UTC] {cli_action_loggers.py:84} DEBUG - Calling
callbacks: []
[2022-01-27, 03:09:35 UTC] {standard_task_runner.py:88} ERROR - Failed to
execute job 735923 for task move_task
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py",
line 85, in _start_by_fork
args.func(args, dag=self.dag)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line
92, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 180, in _run_raw_task
ti._run_raw_task(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1514, in _execute_task
result = execute_callable(context=context)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
line 144, in execute
self._copy_single_object(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/google/cloud/transfers/sftp_to_gcs.py",
line 194, in _copy_single_object
sftp_hook.delete_file(source_path)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/sftp/hooks/sftp.py",
line 251, in delete_file
conn.remove(path)
File
"/home/airflow/.local/lib/python3.9/site-packages/pysftp/__init__.py", line
728, in remove
self._sftp.remove(remotefile)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 398, in remove
self._request(CMD_REMOVE, path)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 822, in _request
return self._read_response(num)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 874, in _read_response
self._convert_status(msg)
File
"/home/airflow/.local/lib/python3.9/site-packages/paramiko/sftp_client.py",
line 903, in _convert_status
raise IOError(errno.ENOENT, text)
FileNotFoundError: [Errno 2] No such file
[2022-01-27, 03:09:35 UTC] {local_task_job.py:154} INFO - Task exited with
return code 1
[2022-01-27, 03:09:35 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [running]> from DB
[2022-01-27, 03:09:36 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: move_dag.move_task
scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
[2022-01-27, 03:09:36 UTC] {dagrun.py:588} DEBUG - number of tis tasks for
<DagRun move_dag @ 2022-01-26 03:00:00+00:00:
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
[2022-01-27, 03:09:36 UTC] {dagrun.py:603} DEBUG - number of scheduleable
tasks for <DagRun move_dag @ 2022-01-26 03:00:00+00:00:
scheduled__2022-01-26T03:00:00+00:00, externally triggered: False>: 9 task(s)
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Trigger Rule' PASSED: True, The task instance did not have any
upstream tasks.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.move_task scheduled__2022-01-26T03:00:00+00:00 [up_for_retry]>
dependency 'Not In Retry Period' PASSED: False, Task is not ready for retry yet
but will be retried automatically. Current date is
2022-01-27T03:09:36.252994+00:00 and task will be retried at
2022-01-27T03:11:15.404715+00:00.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.move_task scheduled__2022-01-26T03:00:00+00:00
[up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for
retry yet but will be retried automatically. Current date is
2022-01-27T03:09:36.252994+00:00 and task will be retried at
2022-01-27T03:11:15.404715+00:00.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State'
PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED:
False, Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_unified_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period'
PASSED: True, The task instance was not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Previous Dagrun State'
PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED:
False, Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'pseudo_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'pseudo_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_unified_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]> dependency 'Not In Retry Period'
PASSED: True, The task instance was not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule
'all_success' requires all upstream tasks to have succeeded, but found 1
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped':
0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.deduplicate_work_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'load_in_bq_work_delta_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.deduplicate_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule
'all_success' requires all upstream tasks to have succeeded, but found 1
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped':
0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.load_in_bq_work_delta_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.load_in_bq_work_delta_bkg_offer scheduled__2022-01-26T03:00:00+00:00
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.pseudo_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dqm_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.pseudo_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all
upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'dispatcher_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.dqm_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'dispatcher_bkg_offer'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dqm_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]> dependency
'Not In Retry Period' PASSED: True, The task instance was not marked for
retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.dispatcher_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.dispatcher_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success'
requires all upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0,
'upstream_failed': 0, 'done': 0}, upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: move_dag.archive_bkg_offer
scheduled__2022-01-26T03:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'move_task'}
[2022-01-27, 03:09:36 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
move_dag.archive_bkg_offer scheduled__2022-01-26T03:00:00+00:00 [None]>
dependency 'Not In Retry Period' PASSED: True, The task instance was not marked
for retrying.
[2022-01-27, 03:09:36 UTC] {local_task_job.py:264} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]