Lazloo commented on issue #25286:
URL: https://github.com/apache/airflow/issues/25286#issuecomment-1194004837
Here is the code that runs the deployment:
```python
from airflow.exceptions import AirflowException
from airflow.models import Variable
from airflow.providers.databricks.operators.databricks import (
    XCOM_RUN_ID_KEY,
    DatabricksRunNowOperator,
    _handle_databricks_operator_execution,
)


class DatabricksRunNowAttachOperator(DatabricksRunNowOperator):
    """
    Custom operator extending the existing DatabricksRunNowOperator.

    The difference is that it stores the run_id in an Airflow Variable and
    uses it to attach to the run if the operator is restarted. This pattern
    avoids starting a new job when the operator is restarted, e.g. because
    of a scheduler restart.
    """

    def execute(self, context):
        # One Variable per task instance, keyed on the task instance key string.
        variable_key = f"{context['task_instance_key_str']}_{XCOM_RUN_ID_KEY}"
        existing_job_run = Variable.get(key=variable_key, default_var=None)
        hook = self._get_hook()
        if existing_job_run is None:
            self.log.info("No job run found for this task, starting new run in Databricks.")
            self.run_id = hook.run_now(self.json)
            self.log.debug("Job started. Writing run id to job variable %s", variable_key)
            Variable.set(key=variable_key, value=self.run_id)
        else:
            self.log.info(
                "Found existing job run %s for this task. Attaching to this run.",
                existing_job_run,
            )
            # Variables are stored as strings; Databricks run ids are integers.
            self.run_id = int(existing_job_run)
        try:
            # Polls the run until it reaches a terminal state; raises on failure.
            _handle_databricks_operator_execution(self, hook, self.log, context)
        except AirflowException as e:
            if "failed with terminal state" in str(e):
                # Terminal failure: drop the Variable so a retry starts a fresh run.
                self.log.debug("Databricks job failed. Cleaning job variable %s", variable_key)
                Variable.delete(key=variable_key)
            # Other errors keep the Variable, so the next attempt re-attaches.
            raise
        self.log.debug("Databricks job terminated. Cleaning job variable %s", variable_key)
        Variable.delete(key=variable_key)


# Usage in the DAG; `job_config` is our own job configuration object, defined elsewhere.
DatabricksRunNowAttachOperator(
    task_id=job_config.name,
    job_id=job_config.id,
    polling_period_seconds=job_config.polling_period_seconds,
    retries=job_config.retries,
)
```
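Stripped of the Airflow and Databricks specifics, the operator is a "start or re-attach" pattern keyed on a persisted run id. The following is a minimal, framework-free sketch under stated assumptions: a plain dict stands in for Airflow Variables, the hypothetical callables `start_run` and `wait_for_run` stand in for `hook.run_now()` and `_handle_databricks_operator_execution()`, and the hypothetical `RunFailed` exception stands in for the "failed with terminal state" `AirflowException`.

```python
from typing import Callable, Dict, Optional


class RunFailed(Exception):
    """Hypothetical stand-in for the terminal-state AirflowException."""


def run_or_attach(
    store: Dict[str, str],
    key: str,
    start_run: Callable[[], int],
    wait_for_run: Callable[[int], None],
) -> int:
    """Start a new run, or re-attach to one whose id was persisted earlier.

    `store` is a hypothetical stand-in for Airflow Variables; `start_run` and
    `wait_for_run` are hypothetical stand-ins for the Databricks hook calls.
    """
    existing: Optional[str] = store.get(key)
    if existing is None:
        run_id = start_run()
        # Persist before waiting so a restarted attempt can find this run.
        store[key] = str(run_id)
    else:
        # Stored values are strings; convert back to the integer run id.
        run_id = int(existing)
    try:
        wait_for_run(run_id)
    except RunFailed:
        # Terminal failure: forget the run so the next attempt starts fresh.
        store.pop(key, None)
        raise
    # Any other exception propagates with the key still stored.
    # Success: the run is finished, so the key is no longer needed.
    store.pop(key, None)
    return run_id
```

Because the id is persisted before waiting, an interruption that is not a terminal failure leaves the key in place, so the next attempt attaches to the same run instead of starting a duplicate.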