albertocalderari opened a new issue #8903:
URL: https://github.com/apache/airflow/issues/8903


   
   **Description**
   
   Looking at the code, it seems that a lot of the logic in the BigQuery hook is already implemented in the Google API Python client library. This includes job polling, a nicer way to use job configs, and, last but not least, the validations that we currently do manually.
   It would be nice to make use of these and simplify the code.
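   
   For context, a minimal sketch of what the client library already handles on its own (the project, dataset, table, and job id here are illustrative, not taken from the hook):
   
   ```python
   from google.cloud import bigquery
   
   client = bigquery.Client(project="my-project")
   
   # QueryJobConfig replaces our hand-rolled configuration dicts.
   job_config = bigquery.QueryJobConfig()
   job_config.destination = bigquery.TableReference.from_string(
       "my-project.my_dataset.my_table")
   job_config.write_disposition = "WRITE_TRUNCATE"
   
   # client.query() submits the job; job.result() polls until the job
   # completes and raises on failure, so no manual polling loop is needed.
   job = client.query("SELECT 1 AS x", job_config=job_config,
                      job_id="my-unique-job-id")
   rows = job.result()
   ```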
   
   My idea is to refactor the run_<job> methods to take the Google job config and a deterministic job id.
   If a pod dies for any reason, we could then resume polling for the async job that was previously started.
   
   See my hacky spike below:
   
   This is the job id definition for reference:
   
   ```python
   job_id = re.sub(
       r"[^0-9a-zA-Z_\-]+", "-",
       f"{self.dag_id}_{self.task_id}_{context['execution_date'].isoformat()}__try_0",
   )
   ```
   
   
   and here, roughly, how `run_query` would work:
   
   ```python
   from google.api_core.exceptions import NotFound
   from google.cloud.bigquery import QueryJob, QueryJobConfig, TableReference
   
   
   def run_query(self, job_id: str, job_config: QueryJobConfig, sql: str,
                 destination_dataset_table: str = None):
       def _retry_with_bumped_id(job_id: str):
           # BQ job ids are a unique key per project, so bump the secondary
           # try counter encoded in the id and start a fresh job.
           base, try_num = job_id.split("__try_")
           new_job_id = f"{base}__try_{int(try_num) + 1}"
           return _run_query(new_job_id)
   
       def _run_query(job_id: str):
           if not self.project_id:
               raise ValueError("The project_id should be set")
   
           if destination_dataset_table is not None:
               job_config.destination = TableReference.from_string(
                   destination_dataset_table, self.project_id)
   
           try:
               job: QueryJob = self.client.get_job(job_id, project=self.project_id)
               if job.state == 'RUNNING':
                   if job.query != sql:
                       job.cancel()
                       self.log.info("Job %s found, but sql is different. "
                                     "Cancelling the current job and starting "
                                     "a new one.", job_id)
                       return _retry_with_bumped_id(job_id)
                   self.log.info("Job %s still running, resuming polling.", job_id)
                   return job.result()
               self.log.info("Job %s already executed once. Restarting.", job_id)
               return _retry_with_bumped_id(job_id)
           except NotFound:
               self.log.info("Job %s not found, starting a new job.", job_id)
   
           job = self.client.query(sql, job_config=job_config, job_id=job_id,
                                   project=self.project_id)
           self.log.info("Running job %s...", job_id)
           return job.result()
   
       return _run_query(job_id)
   ```
   
   **The encoded `__try_<try_num>` is not the Airflow try number but a secondary try counter for when the task is cleared, since BQ job ids are a unique key and can't be re-used.**
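   
   For illustration, a hypothetical operator could build the deterministic id and delegate to the refactored hook roughly like this (the operator class and the `self.hook`, `self.job_config`, and `self.sql` attributes are assumptions made for the sketch, not existing Airflow API):
   
   ```python
   import re
   
   from airflow.models import BaseOperator
   
   
   class ExampleBigQueryOperator(BaseOperator):
       """Hypothetical operator sketch delegating to the refactored hook."""
   
       def execute(self, context):
           # Secondary try starts at 0; the hook bumps it if the id is taken.
           job_id = re.sub(
               r"[^0-9a-zA-Z_\-]+", "-",
               f"{self.dag_id}_{self.task_id}_"
               f"{context['execution_date'].isoformat()}__try_0",
           )
           return self.hook.run_query(job_id, self.job_config, self.sql)
   ```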
   
   **Use case / motivation**
   
   Use the functionality already provided by the Google Cloud library rather than re-implementing it ourselves.
   This would also let us pass through a deterministic job id, useful for picking up jobs that are still running in case a pod dies.
   
   **Related Issues**
   
   

