rohan472000 commented on issue #31147:
URL: https://github.com/apache/airflow/issues/31147#issuecomment-1549050414
One possible solution is to make the operator wait for the BigQuery job
to complete before marking the task as successful: periodically check the
status of the BigQuery job and only mark the task successful once the job
is done.
For example, you can subclass `BigQueryInsertJobOperator` as a
`BigQueryInsertParametrizedJobOperator` that waits for the job to complete:
```python
import ast
import time

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from google.cloud import bigquery


class BigQueryInsertParametrizedJobOperator(BigQueryInsertJobOperator):
    def execute(self, context):
        # Parse string-encoded query parameters, if any, into Python objects.
        query_params = self.configuration.get("query", {}).get("queryParameters", None)
        if query_params is not None:
            self.configuration["query"]["queryParameters"] = ast.literal_eval(query_params)

        job = super().execute(context)

        # Poll until BigQuery reports the job as DONE.
        client = bigquery.Client()
        while job.state != "DONE":
            job = client.get_job(
                job.job_id, location=self.configuration["query"]["location"]
            )
            time.sleep(5)  # Wait 5 seconds before checking again, or tune as needed.

        if job.errors:
            raise Exception("BigQuery job failed with errors: {}".format(job.errors))
        return job
```
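For illustration, the poll-until-done loop above can be reduced to a generic helper that is easy to unit-test without a BigQuery connection. This is only a sketch: `get_state`, the poll interval, and the timeout are assumptions for the example, not part of the operator's or BigQuery client's API.

```python
import time


def wait_for_done(get_state, poll_interval=5.0, timeout=600.0):
    """Poll get_state() until it returns 'DONE' or the timeout elapses.

    get_state is any zero-argument callable returning a job state string;
    in the operator above it would wrap client.get_job(...).state.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == "DONE":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not reach DONE within {}s".format(timeout))
        time.sleep(poll_interval)


# Example with a fake job that finishes after a few polls:
states = iter(["PENDING", "RUNNING", "RUNNING", "DONE"])
print(wait_for_done(lambda: next(states), poll_interval=0.01))  # DONE
```

A deadline-based loop like this also guards against the failure mode where a job never reaches `DONE` and the task would otherwise poll forever.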
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]