albertocalderari opened a new issue #8903: URL: https://github.com/apache/airflow/issues/8903
**Description**

Looking at the code, a lot of the logic in the BigQuery hook is already implemented in the google-cloud-bigquery Python library: job polling, a nicer way to build job configs and, of course, the validations that we currently do by hand. It would be nice to make use of these and simplify the code.

My idea is to refactor the `run_<job>` methods to take a google job config and a deterministic job id. If a pod dies for any reason, on the next try we would resume polling the async job that was previously started instead of submitting a new one.
See my hacky spike below. This is the job id definition, for reference:

```python
job_id = re.sub(
    r"[^0-9a-zA-Z_\-]+", "-",
    f"{self.dag_id}_{self.task_id}_{context['execution_date'].isoformat()}__try_0",
)
```

and here is roughly how `run_query` would work:

```python
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import QueryJob, QueryJobConfig, TableReference


def run_query(self, job_id: str, job_config: QueryJobConfig, sql: str,
              destination_dataset_table: str = None) -> str:

    def _recurse(job_id: str):
        # Bump the secondary try counter: BQ job ids are unique and can't be reused.
        [j, try_num] = job_id.split("__try_")
        new_job_id = f"{j}__try_{int(try_num) + 1}"
        return run_query(new_job_id, job_config, sql)

    def run_query(job_id: str, job_config: QueryJobConfig, sql: str):
        if not self.project_id:
            raise ValueError("The project_id should be set")
        if destination_dataset_table is not None:
            job_config.destination = TableReference.from_string(
                destination_dataset_table, self.project_id)
        try:
            job: QueryJob = self.client.get_job(job_id, self.project_id)
            if job.state == 'RUNNING':
                if job.query != sql:
                    job.cancel()
                    self.log.info(f"Job {job_id} found, but sql is different. "
                                  f"Cancelling the current job and starting a new one")
                    return _recurse(job_id)
                self.log.info(f"Job {job_id} still running, re-starting to poll.")
                return job.result()
            else:
                self.log.info(f"Job {job_id} already executed once. Restarting")
                return _recurse(job_id)
        except NotFound:
            self.log.info(f"Job {job_id} not found, starting a new job.")
            job: QueryJob = self.client.query(sql, job_config, job_id,
                                              project=self.project_id)
            self.log.info(f"Running Job {job_id}...")
            return job.result()

    return run_query(job_id, job_config, sql)
```

**The encoded `__try_<try_num>` is not the Airflow try number but a secondary try counter used when the task is cleared, since BQ job ids are a unique key and can't be re-used.**

**Use case / motivation**

Use the functionality in the google cloud library rather than re-implementing it ourselves. This would also allow us to pass through a deterministic job id, useful for picking up jobs that are still running in case a pod dies.
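As a standalone illustration of the deterministic-id scheme above, the sanitization and the `__try_` bump can be exercised without any GCP dependency. The helper names here (`make_job_id`, `bump_try`) are mine, not part of the proposal:

```python
import re


def make_job_id(dag_id: str, task_id: str, execution_date_iso: str) -> str:
    """Build a deterministic job id; BigQuery job ids allow only [0-9a-zA-Z_-]."""
    raw = f"{dag_id}_{task_id}_{execution_date_iso}__try_0"
    return re.sub(r"[^0-9a-zA-Z_\-]+", "-", raw)


def bump_try(job_id: str) -> str:
    """Return a fresh id for a re-run, since a BigQuery job id can never be reused."""
    base, try_num = job_id.split("__try_")
    return f"{base}__try_{int(try_num) + 1}"


job_id = make_job_id("my_dag", "my_task", "2020-05-18T00:00:00+00:00")
# ':' and '+' are sanitized to '-':
# my_dag_my_task_2020-05-18T00-00-00-00-00__try_0
retry_id = bump_try(job_id)  # ...__try_1
```

Because the id is a pure function of dag id, task id and execution date, a restarted worker recomputes the same id and can look the job up with `get_job` instead of submitting a duplicate.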
**Related Issues**