SuccessMoses commented on issue #43744:
URL: https://github.com/apache/airflow/issues/43744#issuecomment-2496132209
@adamszustak I created a simple DAG, and `CloudBatchSubmitJobOperator` always
succeeds regardless of whether `deferrable` is True or False. It only fails if
a timeout occurs during `wait_for_job`.
Note that I used `unittest.mock.patch` to mock the client rather than an
actual Google Cloud connection.
```
from unittest.mock import patch

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_batch import CloudBatchSubmitJobOperator
from google.cloud.batch_v1 import Job, JobStatus

# Define your DAG
default_args = {
    'owner': 'airflow',
}
dag = DAG(
    'cloud_batch_submit_example',
    default_args=default_args,
)

# Job definition
CLOUD_BATCH_HOOK_PATH = "airflow.providers.google.cloud.operators.cloud_batch.CloudBatchHook"
TASK_ID = "test"
PROJECT_ID = "testproject"
REGION = "us-central1"
JOB_NAME = "test"
JOB = Job()
JOB.name = JOB_NAME

submit_job_task = CloudBatchSubmitJobOperator(
    task_id=TASK_ID, project_id=PROJECT_ID, region=REGION,
    job_name=JOB_NAME, job=JOB,
    dag=dag, deferrable=False, timeout_seconds=0
)
submit_job_task

if __name__ == "__main__":
    with (
        patch('airflow.providers.google.cloud.hooks.cloud_batch.CloudBatchHook.get_conn') as mock,
        patch('google.cloud.batch_v1.Job.to_dict') as mock_to_dict
    ):
        # Make the mocked client report the job as FAILED
        mock.return_value.get_job.return_value.status.state = JobStatus.State.FAILED
        dag.test()
```
log:
```
[2024-11-24T17:40:38.338+0000] {dag.py:2474} INFO - dagrun id: cloud_batch_submit_example
[2024-11-24T17:40:38.357+0000] {dag.py:2493} INFO - created dagrun <DagRun cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. externally triggered: False>
[2024-11-24T17:40:38.385+0000] {dag.py:2433} INFO - [DAG TEST] starting task_id=test map_index=-1
[2024-11-24T17:40:38.385+0000] {dag.py:2436} INFO - [DAG TEST] running task <TaskInstance: cloud_batch_submit_example.test manual__2024-11-24T17:40:37.987067+00:00 [scheduled]>
[2024-11-24 17:40:39,926] {taskinstance.py:2923} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
[2024-11-24T17:40:39.926+0000] {taskinstance.py:2923} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
Task instance is in running state
Previous state of the Task instance: queued
Current task name:test state:scheduled start_date:None
Dag name:cloud_batch_submit_example and current dag run status:running
[2024-11-24T17:40:39.928+0000] {taskinstance.py:723} INFO - ::endgroup::
[2024-11-24T17:40:39.949+0000] {connection.py:250} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-11-24T17:40:39.949+0000] {base.py:66} INFO - Retrieving connection 'google_cloud_default'
[2024-11-24T17:40:39.966+0000] {taskinstance.py:346} INFO - ::group::Post task execution logs
[2024-11-24T17:40:39.966+0000] {taskinstance.py:358} INFO - Marking task as SUCCESS. dag_id=cloud_batch_submit_example, task_id=test, run_id=manual__2024-11-24T17:40:37.987067+00:00, logical_date=20241124T174037, start_date=, end_date=20241124T174039
Task instance in success state
Previous state of the Task instance: running
Dag name:cloud_batch_submit_example queued_at:None
Task hostname:51c25e8d352b operator:CloudBatchSubmitJobOperator
[2024-11-24T17:40:39.976+0000] {dag.py:2447} INFO - [DAG TEST] end task task_id=test map_index=-1
[2024-11-24T17:40:39.981+0000] {dagrun.py:935} INFO - Marking run <DagRun cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. externally triggered: False> successful
Dag run in success state
Dag run start:2024-11-24 17:40:37.987067+00:00 end:2024-11-24 17:40:39.981712+00:00
[2024-11-24T17:40:39.983+0000] {dagrun.py:987} INFO - DagRun Finished: dag_id=cloud_batch_submit_example, logical_date=2024-11-24 17:40:37.987067+00:00, run_id=manual__2024-11-24T17:40:37.987067+00:00, run_start_date=2024-11-24 17:40:37.987067+00:00, run_end_date=2024-11-24 17:40:39.981712+00:00, run_duration=1.994645, state=success, external_trigger=False, run_type=manual, data_interval_start=2024-11-24 17:40:37.987067+00:00, data_interval_end=2024-11-24 17:40:37.987067+00:00, dag_version_name=None
```
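For anyone unfamiliar with the mocking in the script above, here is a minimal stdlib-only sketch of how the patched `get_conn` chain resolves. `FakeHook`, `State`, and `patched_job_state` are hypothetical stand-ins I made up for illustration; they are not Airflow or google-cloud-batch classes and say nothing about how the real hook handles a failed job.

```python
from enum import Enum
from unittest.mock import patch


class State(Enum):
    # Hypothetical stand-in for google.cloud.batch_v1.JobStatus.State
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


class FakeHook:
    # Hypothetical stand-in for CloudBatchHook; not the real implementation.
    def get_conn(self):
        raise RuntimeError("would open a real connection")

    def job_state(self, job_name):
        # Reads the job state through whatever client get_conn() returns.
        return self.get_conn().get_job(name=job_name).status.state


def patched_job_state(job_name):
    # Patch get_conn on the class so every instance receives the mock client.
    with patch.object(FakeHook, "get_conn") as mock_get_conn:
        # get_conn() -> client, client.get_job(...) -> job, job.status.state -> FAILED
        mock_get_conn.return_value.get_job.return_value.status.state = State.FAILED
        state = FakeHook().job_state(job_name)
        # The mock also records how the client was called.
        mock_get_conn.return_value.get_job.assert_called_once_with(name=job_name)
        return state
```

Calling `patched_job_state("test")` returns `State.FAILED` without touching the real `get_conn`, which is the same setup the reproduction script relies on.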