rohan472000 commented on issue #31147:
URL: https://github.com/apache/airflow/issues/31147#issuecomment-1549050414

   One possible solution is to modify the operator so it waits for the BigQuery job 
to complete before marking the task as successful. You can achieve this by 
periodically checking the status of the BigQuery job and marking the task 
successful only once the job is done.
   
   For example, you can subclass `BigQueryInsertJobOperator` as a 
`BigQueryInsertParametrizedJobOperator` that parses string query parameters and 
waits for the job to complete:
   
        from airflow.providers.google.cloud.operators.bigquery import (
            BigQueryInsertJobOperator,
        )
        from google.cloud import bigquery
        import ast
        import time


        class BigQueryInsertParametrizedJobOperator(BigQueryInsertJobOperator):
            def execute(self, context):
                # Templated query parameters may arrive as a string, so
                # parse them back into a Python structure before submitting.
                query_params = self.configuration.get("query", {}).get("queryParameters", None)
                if query_params is not None:
                    self.configuration["query"]["queryParameters"] = ast.literal_eval(query_params)

                # BigQueryInsertJobOperator.execute returns the job ID.
                job_id = super().execute(context)

                # Poll the job until it reaches the DONE state.
                client = bigquery.Client()
                location = self.configuration["query"].get("location")
                job = client.get_job(job_id, location=location)
                while job.state != "DONE":
                    time.sleep(5)  # wait 5 seconds between polls; adjust as needed
                    job = client.get_job(job_id, location=location)

                if job.errors:
                    raise Exception("BigQuery job failed with errors: {}".format(job.errors))

                return job
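   A fixed `while` loop like the one above can spin forever if the job hangs, so 
it is worth adding a timeout. Here is a minimal, provider-agnostic sketch of such 
a polling helper (the name `wait_for_job` and its signature are my own, not part 
of the Airflow provider or the BigQuery client):

```python
import time


def wait_for_job(get_state, poll_interval=5.0, timeout=600.0):
    """Poll get_state() until it returns "DONE"; raise TimeoutError otherwise.

    get_state is any zero-argument callable returning the current job state,
    e.g. lambda: client.get_job(job_id, location=location).state.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == "DONE":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "job still in state {!r} after {}s".format(state, timeout)
            )
        time.sleep(poll_interval)
```

   Inside the operator's `execute` you would then replace the bare loop with 
something like `wait_for_job(lambda: client.get_job(job_id, location=location).state)`.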
   

