set92 commented on issue #31147:
URL: https://github.com/apache/airflow/issues/31147#issuecomment-1539787104

   
   **Second time**
   We have 4 almost identical DAGs: they share the same code and differ only in the data they ingest. The odd thing is that in the last complete run of each DAG, the error only happened twice, both times in the same DAG. We can't rule out that it also happened in other, partial runs, since we don't have a check for it. We only noticed it in the complete run because those were runs from scratch, so the tables didn't exist yet, and when the step after the failing one tried to read the data it complained that the table was empty.
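   
   For reference, the downstream check that surfaced the problem does roughly this (a simplified sketch, not our actual `bq_utils` code; the real function also compares recency against the output table, and the table IDs below are placeholders):
   ```python
   from google.cloud import bigquery
   
   
   class EmptyBqTable(Exception):
       """Raised when none of the input tables contain any rows."""
   
   
   def check_input_exist_and_more_recent(input_tables, client=None):
       """Simplified stand-in for bq_utils.check_input_exist_and_more_recent."""
       client = client or bigquery.Client()
       non_empty = []
       for table_id in input_tables:  # e.g. "project.dataset.table"
           table = client.get_table(table_id)  # raises NotFound if the table is missing
           if table.num_rows:  # row count from the table metadata
               non_empty.append(table_id)
       if not non_empty:
           raise EmptyBqTable(f"All tables {input_tables} are empty")
       return non_empty
   ```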
   
   In any case, the second occurrence happened in a similar situation: calculate some areas, aggregate them against different sources, and finally get the latest of them. My hypothesis was that it could be related to the run time of the task, but this time the error happened at around 30 and 40 minutes of run time, so that is probably not it.
   
   So, the bug happened while aggregating, same as before. The log for that task is:
   ```
   *** Reading remote log from 
s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_2_name.insert_into_prod.insert_data_and_update_valid_dates/attempt=1.log.
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 
of 1
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1383} INFO - Executing 
<Task(BigQueryInsertParametrizedJobOperator): 
task_2_name.insert_into_prod.insert_data_and_update_valid_dates> on 2023-05-03 
14:36:11+00:00
   [2023-05-06, 06:09:16 UTC] {standard_task_runner.py:55} INFO - Started 
process 23 to run task
   [2023-05-06, 06:09:16 UTC] {standard_task_runner.py:82} INFO - Running: 
['***', 'tasks', 'run', 'dag_name', 
'task_2_name.insert_into_prod.insert_data_and_update_valid_dates', 
'manual__2023-05-03T14:36:11+00:00', '--job-id', '87667', '--raw', '--subdir', 
'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmpauj2z6j9']
   [2023-05-06, 06:09:16 UTC] {standard_task_runner.py:83} INFO - Job 87667: 
Subtask task_2_name.insert_into_prod.insert_data_and_update_valid_dates
   [2023-05-06, 06:09:16 UTC] {task_command.py:376} INFO - Running 
<TaskInstance: 
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [running]> on host 
pod-179162d8c29a4248afac2116a5114436
   [2023-05-06, 06:09:16 UTC] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_ID=dag_name
   
AIRFLOW_CTX_TASK_ID=task_2_name.insert_into_prod.insert_data_and_update_valid_dates
   AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
   [2023-05-06, 06:09:16 UTC] {base.py:71} INFO - Using connection ID 
'default_conn' for task execution.
   [2023-05-06, 06:09:17 UTC] {bigquery.py:1554} INFO - Inserting job 
***_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23
   [2023-05-06, 06:33:11 UTC] {taskinstance.py:1401} INFO - Marking task as 
SUCCESS. dag_id=dag_name, 
task_id=task_2_name.insert_into_prod.insert_data_and_update_valid_dates, 
execution_date=20230503T143611, start_date=20230506T060916, 
end_date=20230506T063311
   [2023-05-06, 06:33:13 UTC] {local_task_job.py:164} INFO - Task exited with 
return code 0
   ```
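   
   (`BigQueryInsertParametrizedJobOperator` is our own operator; purely for illustration, the same kind of step written with the stock `BigQueryInsertJobOperator` would look roughly like this, with the SQL and IDs as placeholders:)
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   with DAG(dag_id="dag_name", start_date=datetime(2023, 5, 1)):
       # Same settings as the job above: EU location, interactive priority, standard SQL.
       insert_data_and_update_valid_dates = BigQueryInsertJobOperator(
           task_id="insert_data_and_update_valid_dates",
           gcp_conn_id="default_conn",
           location="EU",
           configuration={
               "query": {
                   "query": "INSERT INTO `project.dataset.table` SELECT ...",  # placeholder SQL
                   "useLegacySql": False,
                   "priority": "INTERACTIVE",
               }
           },
       )
   ```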
   
   (I don't think this is important, but just in case:) the next task group then failed with the following log:
   ```
   *** Reading remote log from 
s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_3_name.check_schema_and_input_exist_and_transfo_version/attempt=1.log.
   [2023-05-06, 06:35:38 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 
of 1
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1383} INFO - Executing 
<Task(BranchPythonOperator): 
task_3_name.check_schema_and_input_exist_and_transfo_version> on 2023-05-03 
14:36:11+00:00
   [2023-05-06, 06:35:39 UTC] {standard_task_runner.py:55} INFO - Started 
process 23 to run task
   [2023-05-06, 06:35:39 UTC] {standard_task_runner.py:82} INFO - Running: 
['***', 'tasks', 'run', 'dag_name', 
'task_3_name.check_schema_and_input_exist_and_transfo_version', 
'manual__2023-05-03T14:36:11+00:00', '--job-id', '87710', '--raw', '--subdir', 
'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmpsjrb6rp_']
   [2023-05-06, 06:35:39 UTC] {standard_task_runner.py:83} INFO - Job 87710: 
Subtask task_3_name.check_schema_and_input_exist_and_transfo_version
   [2023-05-06, 06:35:39 UTC] {task_command.py:376} INFO - Running 
<TaskInstance: 
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version 
manual__2023-05-03T14:36:11+00:00 [running]> on host 
pod-b0e87574e38c49f0aa5a6fc402d02626
   [2023-05-06, 06:35:39 UTC] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_ID=dag_name
   
AIRFLOW_CTX_TASK_ID=task_3_name.check_schema_and_input_exist_and_transfo_version
   AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
   [2023-05-06, 06:35:39 UTC] {base.py:71} INFO - Using connection ID 
'default_conn' for task execution.
   [2023-05-06, 06:35:39 UTC] {logging_mixin.py:120} WARNING - 
/opt/company_name/project_name/.venv/lib/python3.9/site-packages/***/providers/google/cloud/hooks/bigquery.py:131
 DeprecationWarning: This method will be deprecated. Please use 
`BigQueryHook.get_client` method
   [2023-05-06, 06:35:40 UTC] {logging_mixin.py:120} WARNING - 
/opt/company_name/project_name/.venv/lib/python3.9/site-packages/***/providers/google/cloud/hooks/bigquery.py:2057
 DeprecationWarning: This method is deprecated. Please use 
`BigQueryHook.insert_job` method.
   [2023-05-06, 06:35:40 UTC] {bigquery.py:1554} INFO - Inserting job 
***_1683354940069838_0d68b9a31335d5bf94fdaa02926ff943
   [2023-05-06, 06:35:41 UTC] {taskinstance.py:1851} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
 line 211, in execute
       branch = super().execute(context)
     File 
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
 line 175, in execute
       return_value = self.execute_callable()
     File 
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
 line 193, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File 
"/opt/company_name/project_name/project_name/transformations/bq_run_query/actions.py",
 line 95, in branch_input_exist_and_more_recent
       is_new_data = 
bq_utils.check_input_exist_and_more_recent(input_tables=input_tables, 
output_table=output_table, bq_environment=bq_environment)
     File "/opt/company_name/project_name/project_name/externals/bq_utils.py", 
line 285, in check_input_exist_and_more_recent
       raise EmptyBqTable(f"All tables {input_tables} are empty")
   project_name.exceptions.EmptyBqTable: All tables [...] are empty
   [2023-05-06, 06:35:41 UTC] {taskinstance.py:1401} INFO - Marking task as 
FAILED. dag_id=dag_name, 
task_id=task_3_name.check_schema_and_input_exist_and_transfo_version, 
execution_date=20230503T143611, start_date=20230506T063538, 
end_date=20230506T063541
   ```
   
   And same as before: in BigQuery the job's end timestamp is later than the time at which the Airflow log says the task finished.
   **BigQuery job information**
   ```
   Job ID -- XXX
   User -- XXX
   Location -- EU
   Creation time -- May 6, 2023, 8:09:17 AM UTC+2
   Start time -- May 6, 2023, 8:09:17 AM UTC+2
   End time -- May 6, 2023, 8:48:53 AM UTC+2
   Duration -- 39 min 36 sec
   Bytes processed -- 648.57 GB
   Bytes billed -- 648.57 GB
   Job priority -- INTERACTIVE
   Use legacy SQL -- false
   ```
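   
   (You can reproduce this comparison by pulling the job directly with the BigQuery client; the project below is a placeholder, and the job ID is the one from the task log, shortened here:)
   ```python
   from google.cloud import bigquery
   
   client = bigquery.Client(project="project_id_name")  # placeholder project
   job = client.get_job(
       "airflow_transaction_dag_name_..._33b432ee7e08c19795cea72799e8ab23",  # job ID from the task log, shortened
       location="EU",
   )
   print(job.state)    # DONE
   print(job.created)  # 2023-05-06 06:09:17 UTC
   print(job.ended)    # 2023-05-06 06:48:53 UTC, ~15 minutes after Airflow marked the task SUCCESS at 06:33
   ```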
   
   A coworker of mine checked the Google audit logs to see if he could find anything else, but the only thing he saw was that the last `method: jobservice.getqueryresults` call happened at the same time that Airflow marked the task as SUCCESS.
   Log:
   ```
   {
     "protoPayload": {
       "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
       "status": {},
       "authenticationInfo": {
         "principalEmail": "account_name"
       },
       "requestMetadata": {
         "callerIp": "3.248.171.10",
         "callerSuppliedUserAgent": "gl-python/3.9.16 grpc/1.49.1 gax/1.33.2 
gapic/2.34.4 gccl/2.34.4,gzip(gfe)",
         "requestAttributes": {},
         "destinationAttributes": {}
       },
       "serviceName": "bigquery.googleapis.com",
       "methodName": "jobservice.getqueryresults",
       "authorizationInfo": [
         {
           "resource": "projects/project_id_name",
           "permission": "bigquery.jobs.create",
           "granted": true,
           "resourceAttributes": {}
         }
       ],
       "resourceName": 
"projects/project_id_name/queries/airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23",
       "serviceData": {
         "@type": 
"type.googleapis.com/google.cloud.bigquery.logging.v1.AuditData",
         "jobGetQueryResultsRequest": {},
         "jobGetQueryResultsResponse": {
           "job": {
             "jobName": {
               "projectId": "project_id_name",
               "jobId": 
"airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23",
               "location": "EU"
             },
             "jobConfiguration": {
               "query": {
                 "query": " XXX ",
                 "destinationTable": {},
                 "createDisposition": "CREATE_IF_NEEDED",
                 "writeDisposition": "WRITE_EMPTY",
                 "defaultDataset": {},
                 "queryPriority": "QUERY_INTERACTIVE",
                 "statementType": "SCRIPT"
               }
             },
             "jobStatus": {
               "state": "RUNNING",
               "error": {}
             },
             "jobStatistics": {
               "createTime": "2023-05-06T06:09:17.329Z",
               "startTime": "2023-05-06T06:09:17.445Z"
             }
           }
         }
       }
     },
     "insertId": "2s0ay2d16p2",
     "resource": {
       "type": "bigquery_resource",
       "labels": {
         "project_id": "project_id_name"
       }
     },
     "timestamp": "2023-05-06T06:33:11.276494Z",
     "severity": "INFO",
     "logName": 
"projects/project_id_name/logs/cloudaudit.googleapis.com%2Fdata_access",
     "receiveTimestamp": "2023-05-06T06:33:11.416461299Z"
   }
   ```
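   
   (Something along these lines lists those polling calls, in case anyone wants to check when they stop; the project and the job ID fragment in the filter are placeholders:)
   ```python
   from google.cloud import logging as gcl
   
   client = gcl.Client(project="project_id_name")  # placeholder project
   audit_filter = (
       'protoPayload.methodName="jobservice.getqueryresults" AND '
       'protoPayload.resourceName:"insert_into_prod_generate_job_id_1683124743200"'  # fragment of the job ID
   )
   # Newest first; the most recent poll should line up with the moment Airflow marked the task SUCCESS.
   for entry in client.list_entries(filter_=audit_filter, order_by=gcl.DESCENDING, max_results=10):
       print(entry.timestamp)
   ```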
   
   
   And the next log is, I think, BigQuery moving the calculated data into the table. But then why, in this specific case, didn't Airflow wait for this step? I checked other tasks that didn't fail, and they have the same logs.
   ```
   {
     "protoPayload": {
       "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
       "status": {},
       "authenticationInfo": {
         "principalEmail": "account_name"
       },
       "requestMetadata": {
         "callerIp": "3.248.171.10",
         "callerSuppliedUserAgent": "gl-python/3.9.16 grpc/1.49.1 gax/1.33.2 
gapic/2.34.4 gccl/2.34.4,gzip(gfe)"
       },
       "serviceName": "bigquery.googleapis.com",
       "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
       "authorizationInfo": [
         {
           "resource": "projects/project_id_name",
           "permission": "bigquery.jobs.create",
           "granted": true
         }
       ],
       "resourceName": 
"projects/project_id_name/jobs/script_job_9a0f77016c0849a88e43481e139440f6_1",
       "metadata": {
         "jobChange": {
           "after": "DONE",
           "job": {
             "jobName": 
"projects/project_id_name/jobs/script_job_9a0f77016c0849a88e43481e139440f6_1",
             "jobConfig": {
               "type": "QUERY",
               "queryConfig": {
                 "query": "XXX",
                 "destinationTable": 
"projects/project_name/datasets/dataset/tables/table_name",
                 "createDisposition": "CREATE_IF_NEEDED",
                 "writeDisposition": "WRITE_EMPTY",
                 "priority": "QUERY_INTERACTIVE",
                 "statementType": "INSERT"
               }
             },
             "jobStatus": {
               "jobState": "DONE"
             },
             "jobStats": {
               "createTime": "2023-05-06T06:09:17.925Z",
               "startTime": "2023-05-06T06:09:18.092Z",
               "endTime": "2023-05-06T06:48:50.824Z",
               "queryStats": {
                 "totalProcessedBytes": "695081952951",
                 "totalBilledBytes": "695082156032",
                 "billingTier": 1,
                 "referencedTables": [
                   "projects/project_name/datasets/dataset/tables/table_name1",
                   "projects/project_name/datasets/dataset/tables/table_name2",
                   "projects/project_name/datasets/dataset/tables/table_name3",
                   "projects/project_name/datasets/dataset/tables/table_name4",
                 ],
                 "outputRowCount": "82170898"
               },
               "totalSlotMs": "1461958311",
               "parentJobName": 
"projects/project_id_name/jobs/airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23"
             }
           }
         },
         "@type": "type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
       }
     },
     "insertId": "bnrb0od7eea",
     "resource": {
       "type": "bigquery_project",
       "labels": {
         "project_id": "project_id_name",
         "location": "EU"
       }
     },
     "timestamp": "2023-05-06T06:48:50.876369Z",
     "severity": "INFO",
     "logName": 
"projects/project_id_name/logs/cloudaudit.googleapis.com%2Fdata_access",
     "operation": {
       "id": 
"1683353357925-project_id_name:script_job_9a0f77016c0849a88e43481e139440f6_1",
       "producer": "bigquery.googleapis.com",
       "last": true
     },
     "receiveTimestamp": "2023-05-06T06:48:51.692700328Z"
   }
   ```
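   
   (Not an answer, but as a defensive check on our side: before the branch task reads the output tables, we could re-fetch the parent job and fail loudly if it is not actually DONE. A sketch, assuming we can pass the job ID along, e.g. via XCom; project and location are placeholders:)
   ```python
   from google.cloud import bigquery
   
   
   def assert_parent_job_done(job_id: str, project: str = "project_id_name", location: str = "EU") -> None:
       """Fail fast if a job Airflow already reported as SUCCESS is in fact still running."""
       client = bigquery.Client(project=project)
       job = client.get_job(job_id, location=location)
       if job.state != "DONE":
           raise RuntimeError(f"Job {job_id} is still {job.state}; refusing to read its output tables")
       if job.error_result:
           raise RuntimeError(f"Job {job_id} finished with errors: {job.error_result}")
   ```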
   

