raphaelauv commented on issue #23826:
URL: https://github.com/apache/airflow/issues/23826#issuecomment-1142095122
Try this workaround:
```python
from typing import Any

from google.api_core.exceptions import Conflict

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class FixBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    def execute(self, context: Any):
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )
        self.hook = hook
        job_id = self._job_id(context)

        try:
            job = self._submit_job(hook, job_id)
            self._handle_job_error(job)
        except Conflict:
            # If the job already exists, retrieve it
            job = hook.get_job(
                project_id=self.project_id,
                location=self.location,
                job_id=job_id,
            )
            if job.state in self.reattach_states:
                # We are reattaching to a job
                job.result(timeout=self.result_timeout, retry=self.result_retry)
                self._handle_job_error(job)
            else:
                # Same job configuration, so we need force_rerun
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. "
                    f"If you want to force rerun it consider setting `force_rerun=True`. "
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )

        if "query" in job.to_api_repr()["configuration"]:
            if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
                table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
                BigQueryTableLink.persist(
                    context=context,
                    task_instance=self,
                    dataset_id=table["datasetId"],
                    project_id=table["projectId"],
                    table_id=table["tableId"],
                )

        self.job_id = job.job_id
        return job.job_id
```
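For reference, a minimal usage sketch, assuming the subclass above lives in (or is imported into) your DAG file; the DAG id, query, location, and schedule are placeholder assumptions:
```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="example_fixed_bq_insert_job",  # placeholder DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Use the subclass exactly like BigQueryInsertJobOperator
    insert_query_job = FixBigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": "SELECT 1",  # placeholder query
                "useLegacySql": False,
            }
        },
        location="US",  # placeholder location
    )
```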