SuccessMoses commented on issue #43744:
URL: https://github.com/apache/airflow/issues/43744#issuecomment-2496132209

   @adamszustak I created a simple DAG and `CloudBatchSubmitJobOperator` always 
succeed regardless of whether `deferable` is True or False. It only fails if 
timeout occurs while `wait_for_job`. 
   
   Although I used `unittest.mock.patch` to mock the client not actual google 
cloud connection.
   
   ```
   from unittest.mock import patch
   from airflow.providers.google.cloud.operators.cloud_batch import 
CloudBatchSubmitJobOperator
   from airflow import DAG
   from google.cloud.batch_v1 import JobStatus, Job
   
   # Define your DAG
   default_args = {
       'owner': 'airflow',
   }
   
   dag = DAG(
       'cloud_batch_submit_example',
       default_args=default_args,
   )
   
   # Job definition
   CLOUD_BATCH_HOOK_PATH = 
"airflow.providers.google.cloud.operators.cloud_batch.CloudBatchHook"
   TASK_ID = "test"
   PROJECT_ID = "testproject"
   REGION = "us-central1"
   JOB_NAME = "test"
   JOB = Job()
   JOB.name = JOB_NAME
   
   
   submit_job_task = CloudBatchSubmitJobOperator(
       task_id=TASK_ID, project_id=PROJECT_ID, region=REGION, 
job_name=JOB_NAME, job=JOB,
       dag=dag, deferrable=False, timeout_seconds=0
   )
   
   submit_job_task
   
   if __name__ == "__main__":
       with (
           
patch('airflow.providers.google.cloud.hooks.cloud_batch.CloudBatchHook.get_conn')
 as mock,
           patch('google.cloud.batch_v1.Job.to_dict') as mock_to_dict
       ):
           mock.return_value.get_job.return_value.status.state = 
JobStatus.State.FAILED #make job fail
           dag.test()
   ```
   log:
   
   ```
   [2024-11-24T17:40:38.338+0000] {dag.py:2474} INFO - dagrun id: 
cloud_batch_submit_example
   [2024-11-24T17:40:38.357+0000] {dag.py:2493} INFO - created dagrun <DagRun 
cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: 
manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. 
externally triggered: False>
   [2024-11-24T17:40:38.385+0000] {dag.py:2433} INFO - [DAG TEST] starting 
task_id=test map_index=-1
   [2024-11-24T17:40:38.385+0000] {dag.py:2436} INFO - [DAG TEST] running task 
<TaskInstance: cloud_batch_submit_example.test 
manual__2024-11-24T17:40:37.987067+00:00 [scheduled]>
   [2024-11-24 17:40:39,926] {taskinstance.py:2923} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' 
AIRFLOW_CTX_TASK_ID='test' 
AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
   [2024-11-24T17:40:39.926+0000] {taskinstance.py:2923} INFO - Exporting env 
vars: AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' AIRFLOW_CTX_TASK_ID='test' 
AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
   Task instance is in running state
    Previous state of the Task instance: queued
   Current task name:test state:scheduled start_date:None
   Dag name:cloud_batch_submit_example and current dag run status:running
   [2024-11-24T17:40:39.928+0000] {taskinstance.py:723} INFO - ::endgroup::
   [2024-11-24T17:40:39.949+0000] {connection.py:250} WARNING - Connection 
schemes (type: google_cloud_platform) shall not contain '_' according to 
RFC3986.
   [2024-11-24T17:40:39.949+0000] {base.py:66} INFO - Retrieving connection 
'google_cloud_default'
   [2024-11-24T17:40:39.966+0000] {taskinstance.py:346} INFO - ::group::Post 
task execution logs
   [2024-11-24T17:40:39.966+0000] {taskinstance.py:358} INFO - Marking task as 
SUCCESS. dag_id=cloud_batch_submit_example, task_id=test, 
run_id=manual__2024-11-24T17:40:37.987067+00:00, logical_date=20241124T174037, 
start_date=, end_date=20241124T174039
   Task instance in success state
    Previous state of the Task instance: running
   Dag name:cloud_batch_submit_example queued_at:None
   Task hostname:51c25e8d352b operator:CloudBatchSubmitJobOperator
   [2024-11-24T17:40:39.976+0000] {dag.py:2447} INFO - [DAG TEST] end task 
task_id=test map_index=-1
   [2024-11-24T17:40:39.981+0000] {dagrun.py:935} INFO - Marking run <DagRun 
cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: 
manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. 
externally triggered: False> successful
   Dag run in success state
   Dag run start:2024-11-24 17:40:37.987067+00:00 end:2024-11-24 
17:40:39.981712+00:00
   [2024-11-24T17:40:39.983+0000] {dagrun.py:987} INFO - DagRun Finished: 
dag_id=cloud_batch_submit_example, logical_date=2024-11-24 
17:40:37.987067+00:00, run_id=manual__2024-11-24T17:40:37.987067+00:00, 
run_start_date=2024-11-24 17:40:37.987067+00:00, run_end_date=2024-11-24 
17:40:39.981712+00:00, run_duration=1.994645, state=success, 
external_trigger=False, run_type=manual, data_interval_start=2024-11-24 
17:40:37.987067+00:00, data_interval_end=2024-11-24 17:40:37.987067+00:00, 
dag_version_name=None
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to