raphaelauv commented on issue #23826:
URL: https://github.com/apache/airflow/issues/23826#issuecomment-1142095122

try this:
   
```python
from typing import Any

from google.api_core.exceptions import Conflict

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class FixBigQueryInsertJobOperator(BigQueryInsertJobOperator):

    def execute(self, context: Any):
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )
        self.hook = hook
        job_id = self._job_id(context)
        try:
            job = self._submit_job(hook, job_id)
            self._handle_job_error(job)
        except Conflict:
            # If the job already exists, retrieve it
            job = hook.get_job(
                project_id=self.project_id,
                location=self.location,
                job_id=job_id,
            )
            if job.state in self.reattach_states:
                # We are reattaching to a job
                job.result(timeout=self.result_timeout, retry=self.result_retry)
                self._handle_job_error(job)
            else:
                # Same job configuration, so we need force_rerun
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                    f"want to force rerun it consider setting `force_rerun=True`. "
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )

        if "query" in job.to_api_repr()["configuration"]:
            # Only persist a table link when the query job actually has a destination table
            if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
                table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
                BigQueryTableLink.persist(
                    context=context,
                    task_instance=self,
                    dataset_id=table["datasetId"],
                    project_id=table["projectId"],
                    table_id=table["tableId"],
                )
        self.job_id = job.job_id
        return job.job_id
```
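
For reference, a minimal usage sketch of the subclass in a DAG (the DAG id, query, location, project id and connection id below are placeholders, not values from this issue):

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="fix_bq_insert_job_example",  # placeholder name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    insert_query_job = FixBigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": "SELECT 1",  # any query, with or without a destination table
                "useLegacySql": False,
            }
        },
        location="EU",  # placeholder
        project_id="my-project",  # placeholder
        gcp_conn_id="google_cloud_default",  # default Google connection id
    )
```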

