set92 commented on issue #31147:
URL: https://github.com/apache/airflow/issues/31147#issuecomment-1539787104
**Second time**

We have 4 almost identical DAGs: they share the same code and differ only in the data they ingest. The odd thing is that, in the last complete run of each DAG, the error happened only twice, and only in one of them. We are not sure whether it also happened in other partial runs, since we don't have a check for it. We noticed it in the complete run because those were runs from scratch, so the tables didn't exist yet, and the step after the failing one complained that the table was empty when it tried to read the data.

The second occurrence happened in a similar situation: calculate some areas, then aggregate them to different sources, and finally get the latest of them. My hypothesis was that it could be related to the run time of the task, but in this case the error happened with run times of around 30 and 40 minutes, so that is probably not the cause.

So, the bug happened while aggregating, same as before. The log of that task is:
```
*** Reading remote log from
s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_2_name.insert_into_prod.insert_data_and_update_valid_dates/attempt=1.log.
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance:
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates
manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance:
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates
manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1363} INFO - Starting attempt 1
of 1
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1383} INFO - Executing
<Task(BigQueryInsertParametrizedJobOperator):
task_2_name.insert_into_prod.insert_data_and_update_valid_dates> on 2023-05-03
14:36:11+00:00
[2023-05-06, 06:09:16 UTC] {standard_task_runner.py:55} INFO - Started
process 23 to run task
[2023-05-06, 06:09:16 UTC] {standard_task_runner.py:82} INFO - Running:
['***', 'tasks', 'run', 'dag_name',
'task_2_name.insert_into_prod.insert_data_and_update_valid_dates',
'manual__2023-05-03T14:36:11+00:00', '--job-id', '87667', '--raw', '--subdir',
'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmpauj2z6j9']
[2023-05-06, 06:09:16 UTC] {standard_task_runner.py:83} INFO - Job 87667:
Subtask task_2_name.insert_into_prod.insert_data_and_update_valid_dates
[2023-05-06, 06:09:16 UTC] {task_command.py:376} INFO - Running
<TaskInstance:
dag_name.task_2_name.insert_into_prod.insert_data_and_update_valid_dates
manual__2023-05-03T14:36:11+00:00 [running]> on host
pod-179162d8c29a4248afac2116a5114436
[2023-05-06, 06:09:16 UTC] {taskinstance.py:1590} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_ID=dag_name
AIRFLOW_CTX_TASK_ID=task_2_name.insert_into_prod.insert_data_and_update_valid_dates
AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
[2023-05-06, 06:09:16 UTC] {base.py:71} INFO - Using connection ID
'default_conn' for task execution.
[2023-05-06, 06:09:17 UTC] {bigquery.py:1554} INFO - Inserting job
***_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23
[2023-05-06, 06:33:11 UTC] {taskinstance.py:1401} INFO - Marking task as
SUCCESS. dag_id=dag_name,
task_id=task_2_name.insert_into_prod.insert_data_and_update_valid_dates,
execution_date=20230503T143611, start_date=20230506T060916,
end_date=20230506T063311
[2023-05-06, 06:33:13 UTC] {local_task_job.py:164} INFO - Task exited with
return code 0
```
(I don't think this is important, but just in case.) In the next task group, the run failed with the following log:
```
*** Reading remote log from
s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_3_name.check_schema_and_input_exist_and_transfo_version/attempt=1.log.
[2023-05-06, 06:35:38 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance:
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version
manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance:
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version
manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1363} INFO - Starting attempt 1
of 1
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1383} INFO - Executing
<Task(BranchPythonOperator):
task_3_name.check_schema_and_input_exist_and_transfo_version> on 2023-05-03
14:36:11+00:00
[2023-05-06, 06:35:39 UTC] {standard_task_runner.py:55} INFO - Started
process 23 to run task
[2023-05-06, 06:35:39 UTC] {standard_task_runner.py:82} INFO - Running:
['***', 'tasks', 'run', 'dag_name',
'task_3_name.check_schema_and_input_exist_and_transfo_version',
'manual__2023-05-03T14:36:11+00:00', '--job-id', '87710', '--raw', '--subdir',
'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmpsjrb6rp_']
[2023-05-06, 06:35:39 UTC] {standard_task_runner.py:83} INFO - Job 87710:
Subtask task_3_name.check_schema_and_input_exist_and_transfo_version
[2023-05-06, 06:35:39 UTC] {task_command.py:376} INFO - Running
<TaskInstance:
dag_name.task_3_name.check_schema_and_input_exist_and_transfo_version
manual__2023-05-03T14:36:11+00:00 [running]> on host
pod-b0e87574e38c49f0aa5a6fc402d02626
[2023-05-06, 06:35:39 UTC] {taskinstance.py:1590} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_ID=dag_name
AIRFLOW_CTX_TASK_ID=task_3_name.check_schema_and_input_exist_and_transfo_version
AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
[2023-05-06, 06:35:39 UTC] {base.py:71} INFO - Using connection ID
'default_conn' for task execution.
[2023-05-06, 06:35:39 UTC] {logging_mixin.py:120} WARNING -
/opt/company_name/project_name/.venv/lib/python3.9/site-packages/***/providers/google/cloud/hooks/bigquery.py:131
DeprecationWarning: This method will be deprecated. Please use
`BigQueryHook.get_client` method
[2023-05-06, 06:35:40 UTC] {logging_mixin.py:120} WARNING -
/opt/company_name/project_name/.venv/lib/python3.9/site-packages/***/providers/google/cloud/hooks/bigquery.py:2057
DeprecationWarning: This method is deprecated. Please use
`BigQueryHook.insert_job` method.
[2023-05-06, 06:35:40 UTC] {bigquery.py:1554} INFO - Inserting job
***_1683354940069838_0d68b9a31335d5bf94fdaa02926ff943
[2023-05-06, 06:35:41 UTC] {taskinstance.py:1851} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
line 211, in execute
branch = super().execute(context)
File
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
line 175, in execute
return_value = self.execute_callable()
File
"/opt/company_name/project_name/.venv/lib/python3.9/site-packages/airflow/operators/python.py",
line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File
"/opt/company_name/project_name/project_name/transformations/bq_run_query/actions.py",
line 95, in branch_input_exist_and_more_recent
is_new_data =
bq_utils.check_input_exist_and_more_recent(input_tables=input_tables,
output_table=output_table, bq_environment=bq_environment)
File "/opt/company_name/project_name/project_name/externals/bq_utils.py",
line 285, in check_input_exist_and_more_recent
raise EmptyBqTable(f"All tables {input_tables} are empty")
project_name.exceptions.EmptyBqTable: All tables [...] are empty
[2023-05-06, 06:35:41 UTC] {taskinstance.py:1401} INFO - Marking task as
FAILED. dag_id=dag_name,
task_id=task_3_name.check_schema_and_input_exist_and_transfo_version,
execution_date=20230503T143611, start_date=20230506T063538,
end_date=20230506T063541
```
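For reference, the check that raises `EmptyBqTable` in the traceback above is conceptually something like the following. This is a simplified, hypothetical reconstruction, not the actual `bq_utils.check_input_exist_and_more_recent` code; it only covers the emptiness part:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class EmptyBqTable(Exception):
    """Raised when every input table is empty (illustrative stand-in)."""


def check_input_tables_not_empty(input_tables: list[str],
                                 gcp_conn_id: str = "default_conn") -> None:
    """Simplified stand-in for bq_utils.check_input_exist_and_more_recent:
    make sure at least one input table actually contains rows."""
    # get_client() is the non-deprecated way to obtain a google-cloud-bigquery
    # Client from the Airflow hook (see the DeprecationWarnings in the log).
    client = BigQueryHook(gcp_conn_id=gcp_conn_id).get_client()
    for table_id in input_tables:
        # get_table() returns table metadata; num_rows counts committed rows.
        if client.get_table(table_id).num_rows > 0:
            return
    raise EmptyBqTable(f"All tables {input_tables} are empty")
```

So if the upstream insert job has not actually committed its rows yet, this kind of check fails exactly the way shown in the traceback.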
And it is the same story again: in BigQuery the job's end timestamp is later than the time at which the Airflow log says the task finished (see the sketch after the job details below).
**BigQuery job information**
```
Job ID -- XXX
User -- XXX
Location -- EU
Creation time -- May 6, 2023, 8:09:17 AM UTC+2
Start time -- May 6, 2023, 8:09:17 AM UTC+2
End time -- May 6, 2023, 8:48:53 AM UTC+2
Duration -- 39 min 36 sec
Bytes processed -- 648.57 GB
Bytes billed -- 648.57 GB
Job priority -- INTERACTIVE
Use legacy SQL -- false
```
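The mismatch can be double-checked directly against the job metadata. A minimal sketch using the public `google-cloud-bigquery` client (this is not part of our DAG code; project, location and job ID are the ones from the logs above):

```python
from google.cloud import bigquery

client = bigquery.Client(project="project_id_name")

job = client.get_job(
    "airflow_transaction_dag_name_task_2_name_insert_into_prod_"
    "generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23",
    location="EU",
)

# Airflow logged end_date=20230506T063311, but BigQuery only reports the
# job as finished at ~06:48 UTC.
print(job.state, job.created, job.started, job.ended)
```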
A coworker of mine went through the Google Cloud audit logs to see if he could find anything else, but he only saw that the last `jobservice.getqueryresults` call happened at the same time as Airflow marked the task as success.
The log entry:
```
{
"protoPayload": {
"@type": "type.googleapis.com/google.cloud.audit.AuditLog",
"status": {},
"authenticationInfo": {
"principalEmail": "account_name"
},
"requestMetadata": {
"callerIp": "3.248.171.10",
"callerSuppliedUserAgent": "gl-python/3.9.16 grpc/1.49.1 gax/1.33.2
gapic/2.34.4 gccl/2.34.4,gzip(gfe)",
"requestAttributes": {},
"destinationAttributes": {}
},
"serviceName": "bigquery.googleapis.com",
"methodName": "jobservice.getqueryresults",
"authorizationInfo": [
{
"resource": "projects/project_id_name",
"permission": "bigquery.jobs.create",
"granted": true,
"resourceAttributes": {}
}
],
"resourceName":
"projects/project_id_name/queries/airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23",
"serviceData": {
"@type":
"type.googleapis.com/google.cloud.bigquery.logging.v1.AuditData",
"jobGetQueryResultsRequest": {},
"jobGetQueryResultsResponse": {
"job": {
"jobName": {
"projectId": "project_id_name",
"jobId":
"airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23",
"location": "EU"
},
"jobConfiguration": {
"query": {
"query": " XXX ",
"destinationTable": {},
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_EMPTY",
"defaultDataset": {},
"queryPriority": "QUERY_INTERACTIVE",
"statementType": "SCRIPT"
}
},
"jobStatus": {
"state": "RUNNING",
"error": {}
},
"jobStatistics": {
"createTime": "2023-05-06T06:09:17.329Z",
"startTime": "2023-05-06T06:09:17.445Z"
}
}
}
}
},
"insertId": "2s0ay2d16p2",
"resource": {
"type": "bigquery_resource",
"labels": {
"project_id": "project_id_name"
}
},
"timestamp": "2023-05-06T06:33:11.276494Z",
"severity": "INFO",
"logName":
"projects/project_id_name/logs/cloudaudit.googleapis.com%2Fdata_access",
"receiveTimestamp": "2023-05-06T06:33:11.416461299Z"
}
```
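So at 06:33:11 `getqueryresults` still reported the job as `RUNNING`, which is the exact moment Airflow marked the task as SUCCESS. As a possible defensive workaround (not a fix for the operator itself), the job state could be re-checked explicitly before the downstream branch runs. A minimal sketch with the public client, assuming the job ID is known (e.g. pushed to XCom by the insert task):

```python
import time

from google.cloud import bigquery


def wait_for_bq_job_done(job_id: str, project: str, location: str = "EU",
                         poll_seconds: int = 30,
                         timeout_seconds: int = 3600) -> None:
    """Poll the job until BigQuery itself says it is DONE, or give up."""
    client = bigquery.Client(project=project)
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        job = client.get_job(job_id, location=location)
        if job.state == "DONE":
            if job.error_result:
                raise RuntimeError(f"Job {job_id} failed: {job.error_result}")
            return
        time.sleep(poll_seconds)
    raise TimeoutError(f"Job {job_id} not DONE after {timeout_seconds}s")
```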
The next log entry is, I think, BigQuery actually writing the calculated data into the table. But then why, in this specific case, didn't Airflow wait for it? I checked other tasks that didn't fail, and they produced the same kind of log entries.
```
{
"protoPayload": {
"@type": "type.googleapis.com/google.cloud.audit.AuditLog",
"status": {},
"authenticationInfo": {
"principalEmail": "account_name"
},
"requestMetadata": {
"callerIp": "3.248.171.10",
"callerSuppliedUserAgent": "gl-python/3.9.16 grpc/1.49.1 gax/1.33.2
gapic/2.34.4 gccl/2.34.4,gzip(gfe)"
},
"serviceName": "bigquery.googleapis.com",
"methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
"authorizationInfo": [
{
"resource": "projects/project_id_name",
"permission": "bigquery.jobs.create",
"granted": true
}
],
"resourceName":
"projects/project_id_name/jobs/script_job_9a0f77016c0849a88e43481e139440f6_1",
"metadata": {
"jobChange": {
"after": "DONE",
"job": {
"jobName":
"projects/project_id_name/jobs/script_job_9a0f77016c0849a88e43481e139440f6_1",
"jobConfig": {
"type": "QUERY",
"queryConfig": {
"query": "XXX",
"destinationTable":
"projects/project_name/datasets/dataset/tables/table_name",
"createDisposition": "CREATE_IF_NEEDED",
"writeDisposition": "WRITE_EMPTY",
"priority": "QUERY_INTERACTIVE",
"statementType": "INSERT"
}
},
"jobStatus": {
"jobState": "DONE"
},
"jobStats": {
"createTime": "2023-05-06T06:09:17.925Z",
"startTime": "2023-05-06T06:09:18.092Z",
"endTime": "2023-05-06T06:48:50.824Z",
"queryStats": {
"totalProcessedBytes": "695081952951",
"totalBilledBytes": "695082156032",
"billingTier": 1,
"referencedTables": [
"projects/project_name/datasets/dataset/tables/table_name1",
"projects/project_name/datasets/dataset/tables/table_name2",
"projects/project_name/datasets/dataset/tables/table_name3",
"projects/project_name/datasets/dataset/tables/table_name4",
],
"outputRowCount": "82170898"
},
"totalSlotMs": "1461958311",
"parentJobName":
"projects/project_id_name/jobs/airflow_transaction_dag_name_task_2_name_insert_into_prod_generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23"
}
}
},
"@type": "type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
}
},
"insertId": "bnrb0od7eea",
"resource": {
"type": "bigquery_project",
"labels": {
"project_id": "project_id_name",
"location": "EU"
}
},
"timestamp": "2023-05-06T06:48:50.876369Z",
"severity": "INFO",
"logName":
"projects/project_id_name/logs/cloudaudit.googleapis.com%2Fdata_access",
"operation": {
"id":
"1683353357925-project_id_name:script_job_9a0f77016c0849a88e43481e139440f6_1",
"producer": "bigquery.googleapis.com",
"last": true
},
"receiveTimestamp": "2023-05-06T06:48:51.692700328Z"
}
```
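That second entry is the child job of the SCRIPT: the parent job has `statementType: SCRIPT`, the child is the actual `INSERT` (linked via `parentJobName`), and its `endTime` is 06:48:50 UTC, again well after Airflow marked the task as success. If it helps, the script's children can also be listed directly with the client (a sketch; project and parent job ID are the ones from the logs above):

```python
from google.cloud import bigquery

client = bigquery.Client(project="project_id_name")

parent_job_id = (
    "airflow_transaction_dag_name_task_2_name_insert_into_prod_"
    "generate_job_id_1683124743200_1_33b432ee7e08c19795cea72799e8ab23"
)

# list_jobs(parent_job=...) lists the jobs spawned by a SCRIPT job, e.g. the
# script_job_9a0f77016c0849a88e43481e139440f6_1 INSERT child seen above.
for child in client.list_jobs(parent_job=parent_job_id):
    print(child.job_id, child.state, child.ended)
```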