set92 opened a new issue, #31147:
URL: https://github.com/apache/airflow/issues/31147
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
I am aware that our apache-airflow-providers-google version is 6.8.0 (April
2022) and Airflow is 2.4.2, so maybe updating the packages would already fix the error.
The error happened with a long-running query (~3 h): the task was marked as
successful before the query actually finished. In the logs both start at almost
the same time, but Airflow considered the task finished at 07:16 UTC (almost
exactly two hours after the job was inserted), while BigQuery did not really
finish until 08:05 UTC. The query was the first step of a multi-statement
transaction, which computes some ST_INTERSECTS and a cross join between two tables.
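In case it helps, the job's real state and timestamps can be cross-checked
directly against BigQuery; a minimal sketch (the job ID below is a placeholder,
and it assumes credentials for the project):
```python
# Minimal diagnostic sketch (placeholder job ID): fetch the job straight from
# BigQuery and compare its real timestamps with what Airflow logged.
from google.cloud import bigquery

client = bigquery.Client()
job = client.get_job("REDACTED_JOB_ID", location="EU")
print(job.state)    # e.g. DONE
print(job.created)  # creation time according to BigQuery
print(job.ended)    # actual end time, ~08:05 UTC in this case
```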
The **log from the task** in Airflow:
```
*** Reading remote log from s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_1_name.insert_into_prod.insert_data_and_update_valid_dates/attempt=1.log.
[2023-05-04, 05:15:39 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates manual__2023-05-03T14:36:11+00:00 [queued]>
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1383} INFO - Executing <Task(BigQueryInsertParametrizedJobOperator): task_1_name.insert_into_prod.insert_data_and_update_valid_dates> on 2023-05-03 14:36:11+00:00
[2023-05-04, 05:15:40 UTC] {standard_task_runner.py:55} INFO - Started process 23 to run task
[2023-05-04, 05:15:40 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'dag_name', 'task_1_name.insert_into_prod.insert_data_and_update_valid_dates', 'manual__2023-05-03T14:36:11+00:00', '--job-id', '78337', '--raw', '--subdir', 'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmppm9tmkfj']
[2023-05-04, 05:15:40 UTC] {standard_task_runner.py:83} INFO - Job 78337: Subtask task_1_name.insert_into_prod.insert_data_and_update_valid_dates
[2023-05-04, 05:15:40 UTC] {task_command.py:376} INFO - Running <TaskInstance: dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates manual__2023-05-03T14:36:11+00:00 [running]> on host pod-c8ddc6bb711e4d1ab5f1a9feb9a16501
[2023-05-04, 05:15:40 UTC] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=dag_name
AIRFLOW_CTX_TASK_ID=task_1_name.insert_into_prod.insert_data_and_update_valid_dates
AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
[2023-05-04, 05:15:42 UTC] {base.py:71} INFO - Using connection ID 'default_conn' for task execution.
[2023-05-04, 05:15:42 UTC] {bigquery.py:1554} INFO - Inserting job ***_transaction_dag_name_task_1_name_insert_into_prod_generate_job_id_1683124743200_1_472eb3e9b7aabcbdab449fbf7ca0ca0b
[2023-05-04, 07:16:41 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=dag_name, task_id=task_1_name.insert_into_prod.insert_data_and_update_valid_dates, execution_date=20230503T143611, start_date=20230504T051539, end_date=20230504T071641
[2023-05-04, 07:16:44 UTC] {local_task_job.py:164} INFO - Task exited with return code 0
```
**BigQuery job information** (times shown in local time, UTC+2, while the Airflow log above is in UTC):
```
Job ID -- XXX
User -- XXX
Location -- EU
Creation time -- May 4, 2023, 7:15:44 AM UTC+2
Start time -- May 4, 2023, 7:15:45 AM UTC+2
End time -- May 4, 2023, 10:05:20 AM UTC+2
Duration -- 2 hr 49 min
Bytes processed -- 236.05 GB
Bytes billed -- 236.05 GB
Job priority -- INTERACTIVE
Use legacy SQL -- false
Destination table -- XXX
```
The code used to run the operator:
```python
import ast

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.trigger_rule import TriggerRule


# Excerpt from a method of our DAG factory class (hence `self` and `return`).
config = {
    "query": {
        "query": transaction_query,
        "useLegacySql": BigQueryToProdValues.USE_LEGACY_SQL,
        "allowLargeResults": BigQueryToProdValues.ALLOW_LARGE_RESULTS,
        "location": "eu",
    }
}
if self.query_params:
    config["query"]["queryParameters"] = self.query_params

insert_data = BigQueryInsertParametrizedJobOperator(
    task_id=CommonTaskIds.INSERT_DATA_AND_UPDATE_VALID_DATES,
    task_group=self.task_group,
    gcp_conn_id=self.gcp_conn_id,
    force_rerun=False,
    reattach_states={"PENDING", "RUNNING", "DONE"},
    job_id=kwargs.pop("job_id"),
    configuration=config,
    pool=self.pool,
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    **kwargs,
)
return insert_data, insert_data


class BigQueryInsertParametrizedJobOperator(BigQueryInsertJobOperator):
    def execute(self, context):
        # queryParameters may arrive rendered as a string, so parse them back
        # into a Python structure before running the base operator.
        query_params = self.configuration.get("query", {}).get("queryParameters", None)
        if query_params is not None:
            self.configuration["query"]["queryParameters"] = ast.literal_eval(query_params)
        super().execute(context)
```
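One detail I notice in our own configuration: `reattach_states` includes
`"DONE"`. I have not verified that this is what happened here, but if a job
with the same deterministic `job_id` already existed, the operator (with
`force_rerun=False`) would attach to it, and waiting on an already-finished
job returns immediately. A hypothetical way to check (placeholder job ID):
```python
# Hypothetical check (placeholder job ID): did a job with the same
# deterministic ID exist before this run? With force_rerun=False and "DONE"
# in reattach_states, attaching to a finished job would succeed immediately.
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client()
try:
    job = client.get_job("REDACTED_JOB_ID", location="EU")
    print(job.state, job.created, job.ended)
except NotFound:
    print("no earlier job with this ID")
```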
### What you think should happen instead
I think the task should keep running until the BigQuery job actually finishes.
In this provider version there was no `deferrable` parameter, so the operator
should simply block and wait; maybe that wait did not work quite right, the
operator got detached from the job at some point, and concluded the job was
complete for some reason I am not seeing.
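Relatedly, in newer provider versions the wait can be handed off to the
triggerer instead; a minimal sketch, assuming a release of the google provider
where `BigQueryInsertJobOperator` accepts `deferrable` (ours, 6.8.0, does not):
```python
# Sketch, not our current setup: assumes a google provider version where
# BigQueryInsertJobOperator accepts the `deferrable` parameter.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

insert_data = BigQueryInsertJobOperator(
    task_id="insert_data_and_update_valid_dates",
    gcp_conn_id="default_conn",
    configuration=config,  # same configuration dict as above
    deferrable=True,       # poll the job from the triggerer, not the worker
)
```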
### How to reproduce
I didn't try to reproduce it because the query takes too long to run. My idea
was to ask whether you already know about this or have a theory I could test
further; if not, I will upgrade `apache-airflow-providers-google` to 10.0.0 and
hope it no longer happens, or only happens in really long task runs.
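If someone does want to try, a hypothetical reproduction would be any query
that runs for a few hours through the same operator setup (all names below are
placeholders, and the query would scan real bytes, so beware of the cost):
```python
# Hypothetical reproduction sketch (placeholder names throughout): run a
# deliberately slow query through the same operator configuration and watch
# whether the task is marked SUCCESS before the BigQuery job actually ends.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

config = {
    "query": {
        "query": (
            "SELECT COUNT(*) "
            "FROM `project.dataset.big_table` a "
            "CROSS JOIN `project.dataset.big_table` b"
        ),
        "useLegacySql": False,
        "location": "eu",
    }
}
repro = BigQueryInsertJobOperator(
    task_id="long_query_repro",
    gcp_conn_id="default_conn",
    configuration=config,
    force_rerun=False,
    reattach_states={"PENDING", "RUNNING", "DONE"},
)
```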
### Operating System
Ubuntu 20.04
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==6.8.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==3.2.1
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)