rbkn opened a new issue #10780:
URL: https://github.com/apache/airflow/issues/10780
**Airflow Version: v1.10.10**
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
`Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.0", GitCommit:"e19964183377d0ec2052d1f1fa930c4d7575bd50", GitTreeState:"clean", BuildDate:"2020-08-26T14:30:33Z", GoVersion:"go1.15", Compiler:"gc", Platform:"linux/amd64"}`
`Server Version: version.Info{Major:"1", Minor:"14+", GitVersion:"v1.14.10-gke.42", GitCommit:"42bef28c2031a74fc68840fce56834ff7ea08518", GitTreeState:"clean", BuildDate:"2020-06-02T16:07:00Z", GoVersion:"go1.12.12b4", Compiler:"gc", Platform:"linux/amd64"}`
**Environment**:
- **Cloud provider or hardware configuration**: GCP
- **OS** (e.g. from /etc/os-release): Debian GNU/Linux 10 (buster)
- **Kernel** (e.g. `uname -a`): Linux cs-695081671381-default-default-9zgvw
4.19.112+ #1 SMP Wed Aug 19 07:58:58 PDT 2020 x86_64 GNU/Linux
- **Install tools**: From puckel docker image
- **Others**:
**What happened**:
The job failed with the following error:
`[2020-09-06 17:35:17,013] {{taskinstance.py:1145}} ERROR - 409 POST
https://bigquery.googleapis.com/bigquery/v2/projects/<project_id>/jobs: Already
Exists: Job <project-id>:EU.airflow_1599413716`
**What you expected to happen**:
Each task should submit its query under a unique BigQuery job ID and run to completion.
**What I think went wrong:**
It looks like when a DAG contains multiple BigQueryExecuteQueryOperator tasks that take more than a very short time and end up running in parallel, the generated BigQuery job IDs conflict between the jobs.
The issue can be worked around by waiting ~1 minute and re-running the failed task.
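To illustrate the suspected collision: the job ID in the error (`airflow_1599413716`) looks like `airflow_<epoch-seconds>`, which suggests the legacy hook derives the ID from the submission timestamp in whole seconds. This is a hypothetical reconstruction of that scheme (`make_job_id` is my own name, not the hook's), showing why two tasks submitting in the same second collide:

```python
import time


def make_job_id(now=None):
    """Suspected ID scheme (hypothetical reconstruction): the job ID is
    the submission time truncated to whole seconds, matching the
    'airflow_1599477182' seen in the error."""
    if now is None:
        now = time.time()
    return f"airflow_{int(now)}"


# Two tasks submitting within the same wall-clock second get identical
# IDs; BigQuery rejects the second POST with 409 Already Exists.
t = 1599477182.3
print(make_job_id(t) == make_job_id(t + 0.4))  # True: same second, collision
```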
**How to reproduce it**:
I have not reproduced this in minikube/kind, as it appears to be an operator problem rather than a Kubernetes one and can be reproduced outside k8s.
In my local environment, running 3 simple queries in parallel replicates it.
You will need:
* `google_cloud_default` to be set up as a connection to a valid GCP project
* `0_dummy` to be a valid BigQuery dataset in that project
```python
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryExecuteQueryOperator,
)
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

dag = DAG(
    dag_id='test_bq',
    default_args=args,
    max_active_runs=1,
    schedule_interval=None,
)

start = DummyOperator(task_id="run_queries_parallel", dag=dag)

first_query = BigQueryExecuteQueryOperator(
    task_id="first_query",
    sql="SELECT CURRENT_DATE() as first,2,3",
    gcp_conn_id="google_cloud_default",
    destination_dataset_table="0_dummy.first_query${{ ds_nodash }}",
    write_disposition="WRITE_TRUNCATE",
    allow_large_results=True,
    use_legacy_sql=False,
    time_partitioning={"field": "first", "require_partition_filter": False},
    dag=dag,
)

second_query = BigQueryExecuteQueryOperator(
    task_id="second_query",
    sql="SELECT CURRENT_DATE() as first,5,6",
    gcp_conn_id="google_cloud_default",
    destination_dataset_table="0_dummy.second_query${{ ds_nodash }}",
    write_disposition="WRITE_TRUNCATE",
    time_partitioning={"field": "first", "require_partition_filter": False},
    allow_large_results=True,
    use_legacy_sql=False,
    dag=dag,
)

third_query = BigQueryExecuteQueryOperator(
    task_id="third_query",
    sql="SELECT CURRENT_DATE() as first,5,6",
    gcp_conn_id="google_cloud_default",
    destination_dataset_table="0_dummy.third_query${{ ds_nodash }}",
    write_disposition="WRITE_TRUNCATE",
    time_partitioning={"field": "first", "require_partition_filter": False},
    allow_large_results=True,
    use_legacy_sql=False,
    dag=dag,
)

start >> [first_query, second_query, third_query]
```
The outcome of this is:
<details><summary>Log output for DAG</summary>
<p>
```
*** Reading local file: /opt/airflow/logs/test_bq/third_query/2020-09-07T11:12:48.939603+00:00/1.log
[2020-09-07 11:13:01,903] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: test_bq.third_query 2020-09-07T11:12:48.939603+00:00 [queued]>
[2020-09-07 11:13:01,994] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: test_bq.third_query 2020-09-07T11:12:48.939603+00:00 [queued]>
[2020-09-07 11:13:01,995] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2020-09-07 11:13:01,996] {taskinstance.py:881} INFO - Starting attempt 1 of 2
[2020-09-07 11:13:01,997] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2020-09-07 11:13:02,059] {taskinstance.py:901} INFO - Executing <Task(BigQueryExecuteQueryOperator): third_query> on 2020-09-07T11:12:48.939603+00:00
[2020-09-07 11:13:02,093] {standard_task_runner.py:54} INFO - Started process 4889 to run task
[2020-09-07 11:13:02,318] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'test_bq', 'third_query', '2020-09-07T11:12:48.939603+00:00', '--job_id', '35', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/test.py', '--cfg_path', '/tmp/tmp0snb5oo2']
[2020-09-07 11:13:02,325] {standard_task_runner.py:78} INFO - Job 35: Subtask third_query
[2020-09-07 11:13:02,526] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: test_bq.third_query 2020-09-07T11:12:48.939603+00:00 [running]> e26af7fa4bfb
[2020-09-07 11:13:02,647] {bigquery.py:597} INFO - Executing: SELECT CURRENT_DATE() as first,5,6
[2020-09-07 11:13:02,662] {logging_mixin.py:112} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/providers/google/cloud/hooks/bigquery.py:2029: DeprecationWarning: This method is deprecated. Please use `BigQueryHook.insert_job` method.
  DeprecationWarning
[2020-09-07 11:13:03,320] {taskinstance.py:1150} ERROR - 409 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project>/jobs: Already Exists: Job <project>:EU.airflow_1599477182

(job ID: airflow_1599477182)

-----Query Job SQL Follows-----

|    .    |    .    |    .    |
   1:SELECT CURRENT_DATE() as first,5,6
|    .    |    .    |    .    |

(job ID: airflow_1599477182)

-----Query Job SQL Follows-----

|    .    |    .    |    .    |
   1:SELECT CURRENT_DATE() as first,5,6
|    .    |    .    |    .    |
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 622, in execute
    encryption_configuration=self.encryption_configuration
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2171, in run_query
    job.result()
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3207, in result
    super(QueryJob, self).result(retry=retry, timeout=timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 810, in result
    self._begin(retry=retry, timeout=timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3156, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 638, in _begin
    retry, method="POST", path=path, data=self.to_api_repr(), timeout=timeout
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 574, in _call_api
    return call()
  File "/home/airflow/.local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/home/airflow/.local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/_http.py", line 423, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.Conflict: 409 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project>/jobs: Already Exists: Job <project>:EU.airflow_1599477182

(job ID: airflow_1599477182)

-----Query Job SQL Follows-----

|    .    |    .    |    .    |
   1:SELECT CURRENT_DATE() as first,5,6
|    .    |    .    |    .    |

(job ID: airflow_1599477182)

-----Query Job SQL Follows-----

|    .    |    .    |    .    |
   1:SELECT CURRENT_DATE() as first,5,6
|    .    |    .    |    .    |
[2020-09-07 11:13:03,351] {taskinstance.py:1194} INFO - Marking task as UP_FOR_RETRY. dag_id=test_bq, task_id=third_query, execution_date=20200907T111248, start_date=20200907T111301, end_date=20200907T111303
[2020-09-07 11:13:06,749] {local_task_job.py:102} INFO - Task exited with return code 1
```
</p>
</details>
**Anything else we need to know**:
See the reproduction above; the error occurs consistently.
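As a possible direction for a fix (my own sketch, not the hook's actual API; `unique_job_id` is a hypothetical helper): appending a random suffix to the generated ID would make parallel submissions collision-free, regardless of how close together they land:

```python
import uuid


def unique_job_id(prefix="airflow"):
    # Hypothetical workaround: add a random suffix so that two jobs
    # submitted within the same second no longer share an ID.
    return f"{prefix}_{uuid.uuid4().hex}"


# Unlike the timestamp-only scheme, repeated calls do not collide.
ids = {unique_job_id() for _ in range(10000)}
print(len(ids))  # 10000
```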