set92 opened a new issue, #31147:
URL: https://github.com/apache/airflow/issues/31147

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   It is true that the apache-airflow-providers-google version is old (6.8.0, April 2022) and Airflow is 2.4.2, so maybe updating the packages could fix the error.
   
   The error happened with a long query (~3 h): the task was marked as successful before the query actually finished. In the logs we can see that both the task and the BigQuery job started almost at the same time, but Airflow considered the task finished at 07:16 UTC, while BigQuery didn't actually finish until 08:05 UTC, roughly 49 minutes later. The query was the first step of a transaction that computes ST_INTERSECTS and a cross join between two tables.
   
   The **log from the task** in Airflow:
   ```
   *** Reading remote log from 
s3://.../dag_id=dag_name/run_id=manual__2023-05-03T14:36:11+00:00/task_id=task_1_name.insert_into_prod.insert_data_and_update_valid_dates/attempt=1.log.
   [2023-05-04, 05:15:39 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: 
dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [queued]>
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 
of 1
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1383} INFO - Executing 
<Task(BigQueryInsertParametrizedJobOperator): 
task_1_name.insert_into_prod.insert_data_and_update_valid_dates> on 2023-05-03 
14:36:11+00:00
   [2023-05-04, 05:15:40 UTC] {standard_task_runner.py:55} INFO - Started 
process 23 to run task
   [2023-05-04, 05:15:40 UTC] {standard_task_runner.py:82} INFO - Running: 
['***', 'tasks', 'run', 'dag_name', 
'task_1_name.insert_into_prod.insert_data_and_update_valid_dates', 
'manual__2023-05-03T14:36:11+00:00', '--job-id', '78337', '--raw', '--subdir', 
'DAGS_FOLDER/master_dag_factory_dag_name.py', '--cfg-path', '/tmp/tmppm9tmkfj']
   [2023-05-04, 05:15:40 UTC] {standard_task_runner.py:83} INFO - Job 78337: 
Subtask task_1_name.insert_into_prod.insert_data_and_update_valid_dates
   [2023-05-04, 05:15:40 UTC] {task_command.py:376} INFO - Running 
<TaskInstance: 
dag_name.task_1_name.insert_into_prod.insert_data_and_update_valid_dates 
manual__2023-05-03T14:36:11+00:00 [running]> on host 
pod-c8ddc6bb711e4d1ab5f1a9feb9a16501
   [2023-05-04, 05:15:40 UTC] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_ID=dag_name
   
AIRFLOW_CTX_TASK_ID=task_1_name.insert_into_prod.insert_data_and_update_valid_dates
   AIRFLOW_CTX_EXECUTION_DATE=2023-05-03T14:36:11+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-05-03T14:36:11+00:00
   [2023-05-04, 05:15:42 UTC] {base.py:71} INFO - Using connection ID 
'default_conn' for task execution.
   [2023-05-04, 05:15:42 UTC] {bigquery.py:1554} INFO - Inserting job 
***_transaction_dag_name_task_1_name_insert_into_prod_generate_job_id_1683124743200_1_472eb3e9b7aabcbdab449fbf7ca0ca0b
   [2023-05-04, 07:16:41 UTC] {taskinstance.py:1401} INFO - Marking task as 
SUCCESS. dag_id=dag_name, 
task_id=task_1_name.insert_into_prod.insert_data_and_update_valid_dates, 
execution_date=20230503T143611, start_date=20230504T051539, 
end_date=20230504T071641
   [2023-05-04, 07:16:44 UTC] {local_task_job.py:164} INFO - Task exited with 
return code 0
   ```
   
   **BigQuery job information**
   ```
   Job ID -- XXX
   User -- XXX
   Location -- EU
   Creation time -- May 4, 2023, 7:15:44 AM UTC+2
   Start time -- May 4, 2023, 7:15:45 AM UTC+2
   End time -- May 4, 2023, 10:05:20 AM UTC+2
   Duration -- 2 hr 49 min
   Bytes processed -- 236.05 GB
   Bytes billed -- 236.05 GB
   Job priority -- INTERACTIVE
   Use legacy SQL -- false
   Destination table -- XXX
   ```
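   
   For clarity, this is how the mismatch above can be checked (a sketch of mine, not part of the issue; `XXX` stands for the values redacted in the console output):
   
   ```python
   from datetime import datetime, timezone
   
   from google.cloud import bigquery
   
   # Hypothetical verification: fetch the job and compare BigQuery's own
   # end time with the end_date Airflow logged for the task.
   client = bigquery.Client(project="XXX")      # project redacted above
   job = client.get_job("XXX", location="EU")   # job ID redacted above
   
   airflow_end = datetime(2023, 5, 4, 7, 16, 41, tzinfo=timezone.utc)
   print(job.ended)                 # 2023-05-04 08:05:20+00:00 per the console
   print(job.ended - airflow_end)   # Airflow marked SUCCESS ~49 min too early
   ```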
   
   The code used to run the operator:
   ```python
   import ast
   
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   from airflow.utils.trigger_rule import TriggerRule
   
   # Built inside a factory method of our own class; BigQueryToProdValues and
   # CommonTaskIds are project-level constants.
   config = {
       "query": {
           "query": transaction_query,
           "useLegacySql": BigQueryToProdValues.USE_LEGACY_SQL,
           "allowLargeResults": BigQueryToProdValues.ALLOW_LARGE_RESULTS,
           "location": "eu",
       }
   }
   
   if self.query_params:
       config["query"]["queryParameters"] = self.query_params
   
   insert_data = BigQueryInsertParametrizedJobOperator(
       task_id=CommonTaskIds.INSERT_DATA_AND_UPDATE_VALID_DATES,
       task_group=self.task_group,
       gcp_conn_id=self.gcp_conn_id,
       force_rerun=False,
       reattach_states={"PENDING", "RUNNING", "DONE"},
       job_id=kwargs.pop("job_id"),
       configuration=config,
       pool=self.pool,
       trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
       **kwargs,
   )
   
   # The operator is returned twice, as both the entry and the exit task of
   # this group.
   return insert_data, insert_data
   
   
   class BigQueryInsertParametrizedJobOperator(BigQueryInsertJobOperator):
       def execute(self, context):
           # queryParameters may arrive serialized as a string, so parse it
           # back into a Python structure before delegating to the parent.
           query_params = self.configuration.get("query", {}).get("queryParameters", None)
           if query_params is not None:
               self.configuration["query"]["queryParameters"] = ast.literal_eval(query_params)
           super().execute(context)
   ```
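   
   As a defensive workaround until the provider is upgraded, one option (a minimal sketch of mine, assuming `BigQueryInsertJobOperator.execute` returns the job ID, as I believe it does in 6.8.0) is to re-check the job against BigQuery itself after the operator's own wait returns:
   
   ```python
   from google.cloud import bigquery
   
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   
   class BigQueryInsertAndVerifyJobOperator(BigQueryInsertJobOperator):
       """Illustrative subclass: re-verify the job with BigQuery after execute()."""
   
       def execute(self, context):
           job_id = super().execute(context)  # assumption: returns the job ID
           client = bigquery.Client()
           job = client.get_job(job_id, location=self.location)
           job.result()  # blocks until BigQuery itself reports completion
           if job.state != "DONE":
               raise RuntimeError(f"Job {job_id} ended in state {job.state}")
           return job_id
   ```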
   
   ### What you think should happen instead
   
   I think the task should keep running until the BigQuery job is finished. In this version there was no deferrable parameter, so the operator should simply block until the job completes; maybe that wait didn't work quite right, the task got detached from the job at some point, and it concluded the job was complete for a reason I'm not seeing.
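   
   For reference, an untested sketch on my side: newer releases of `apache-airflow-providers-google` add a `deferrable` flag to `BigQueryInsertJobOperator`, which hands the wait to the triggerer instead of a blocking worker process (`config` as defined earlier):
   
   ```python
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   insert_data = BigQueryInsertJobOperator(
       task_id="insert_data_and_update_valid_dates",
       configuration=config,
       location="EU",
       deferrable=True,  # poll the job asynchronously via a trigger
   )
   ```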
   
   
   ### How to reproduce
   
   I didn't try to reproduce it because the query takes too long to run. My idea was to ask whether this is a known issue, or whether you have a theory I could test further; if not, I will upgrade `apache-airflow-providers-google` to 10.0.0 and hope the problem stops happening, or at least only happens on very long task runs.
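   
   A quick way to confirm which provider version is actually installed before and after the upgrade (standard library only):
   
   ```python
   from importlib.metadata import version
   
   # e.g. "6.8.0" before the upgrade, "10.0.0" after
   print(version("apache-airflow-providers-google"))
   ```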
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==6.0.0
   apache-airflow-providers-cncf-kubernetes==4.4.0
   apache-airflow-providers-common-sql==1.2.0
   apache-airflow-providers-ftp==3.1.0
   apache-airflow-providers-google==6.8.0
   apache-airflow-providers-http==4.0.0
   apache-airflow-providers-imap==3.0.0
   apache-airflow-providers-postgres==5.2.2
   apache-airflow-providers-sendgrid==3.0.0
   apache-airflow-providers-slack==6.0.0
   apache-airflow-providers-sqlite==3.2.1
   
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

