Lazloo commented on issue #25286:
URL: https://github.com/apache/airflow/issues/25286#issuecomment-1194004837

Here is the code that runs the deployment:
   
```python
from airflow.exceptions import AirflowException
from airflow.models import Variable
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    XCOM_RUN_ID_KEY,
    XCOM_RUN_PAGE_URL_KEY,
    _handle_databricks_operator_execution,
)


class DatabricksRunNowAttachOperator(DatabricksRunNowOperator):
    """
    Custom operator extending the existing DatabricksRunNowOperator.

    The difference is that it stores the run_id in an Airflow Variable and
    uses it to attach to the existing run if the operator is restarted.

    This pattern avoids starting a new job when the operator is restarted,
    e.g. because of a scheduler restart.
    """

    def execute(self, context):
        # One Variable per task instance: "<dag_id>__<task_id>__<ds_nodash>_run_id"
        variable_key = f"{context['task_instance_key_str']}_{XCOM_RUN_ID_KEY}"
        existing_job_run = Variable.get(key=variable_key, default_var=None)
        hook = self._get_hook()
        if existing_job_run is None:
            self.log.info("No job run found for this task, starting new run in Databricks.")
            self.run_id = hook.run_now(self.json)
            self.log.debug(f"Job started. Writing run id to job variable {variable_key}")
            Variable.set(key=variable_key, value=self.run_id)
        else:
            self.log.info(f"Found existing job run {existing_job_run} for this task. Attaching to this run.")
            # Variables are stored as strings; convert back to the integer run id
            self.run_id = int(existing_job_run)

        try:
            # Polls the run until it terminates; raises AirflowException if the run failed
            _handle_databricks_operator_execution(self, hook, self.log, context)
        except AirflowException as e:
            # Only forget the run if it actually failed. On other AirflowExceptions
            # (e.g. a timeout) the Variable is kept, so a retry attaches to the same run.
            if "failed with terminal state" in str(e):
                self.log.debug(f"Databricks job failed. Cleaning job variable {variable_key}")
                Variable.delete(key=variable_key)
            raise
        self.log.debug(f"Databricks job terminated. Cleaning job variable {variable_key}")
        Variable.delete(key=variable_key)


# job_config comes from the surrounding deployment code (not shown in this comment)
DatabricksRunNowAttachOperator(
    task_id=job_config.name,
    job_id=job_config.id,
    polling_period_seconds=job_config.polling_period_seconds,
    retries=job_config.retries,
)
```
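To see the attach-or-start logic in isolation, here is a minimal, framework-free sketch under stated assumptions: `InMemoryStore` is a hypothetical stand-in for Airflow Variables and `fake_start_run` is a hypothetical replacement for `hook.run_now`; neither is an Airflow or Databricks API. It shows a simulated restart reusing the stored run id, and a new run being started only after the key has been cleared.

```python
from typing import Callable, Dict, List, Optional


class InMemoryStore:
    """Hypothetical stand-in for Airflow Variables (string values keyed by name)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


def start_or_attach(store: InMemoryStore, key: str, start_run: Callable[[], int]) -> int:
    """Return the stored run id if present, otherwise start a run and persist its id."""
    existing = store.get(key)
    if existing is not None:
        # A previous attempt already submitted a run: attach instead of resubmitting
        return int(existing)
    run_id = start_run()
    # Persist right away so a restart before completion can find the run
    store.set(key, str(run_id))
    return run_id


def finish(store: InMemoryStore, key: str) -> None:
    """Clear the stored run id once the run has reached a terminal state."""
    store.delete(key)


# Usage: hypothetical submission function that hands out incrementing run ids
submitted: List[int] = []


def fake_start_run() -> int:
    submitted.append(len(submitted) + 1)
    return submitted[-1]


store = InMemoryStore()
key = "my_dag__my_task__20220725_run_id"  # hypothetical key
first = start_or_attach(store, key, fake_start_run)   # starts run 1
second = start_or_attach(store, key, fake_start_run)  # simulated restart: attaches to run 1
finish(store, key)
third = start_or_attach(store, key, fake_start_run)   # key was cleared: starts run 2
```

The key design choice is writing the run id before waiting on the run, so that the only window in which a restart can cause a duplicate submission is between `start_run()` returning and `store.set(...)` completing.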

