Bowrna commented on code in PR #38001:
URL: https://github.com/apache/airflow/pull/38001#discussion_r1535255749
##########
airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -250,6 +250,152 @@ def get_openlineage_facets_on_complete(self,
task_instance) -> OperatorLineage:
return OperatorLineage()
+class DbtCloudRetryJobOperator(BaseOperator):
+ """
+ Retries a dbt Cloud job from the point of failure, if the run failed.
Otherwise trigger a new run.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:DbtCloudRetryJobOperator`
+
+ :param dbt_cloud_conn_id: The connection ID for connecting to dbt Cloud.
+ :param job_id: The ID of a dbt Cloud job.
+ :param account_id: Optional. The ID of a dbt Cloud account.
+ :param wait_for_termination: Flag to wait on a job run's termination. By
default, this feature is
+ enabled but could be disabled to perform an asynchronous wait for a
long-running job run execution
+ using the ``DbtCloudJobRunSensor``.
+ :param timeout: Time in seconds to wait for a job run to reach a terminal
status for non-asynchronous
+ waits. Used only if ``wait_for_termination`` is True. Defaults to 7
days.
+ :param check_interval: Time in seconds to check on a job run's status for
non-asynchronous waits.
+ Used only if ``wait_for_termination`` is True. Defaults to 60 seconds.
+ :param deferrable: Run operator in the deferrable mode
+ :return: The ID of the new dbt Cloud job run.
+ """
+
+ template_fields = (
+ "dbt_cloud_conn_id",
+ "job_id",
+ "account_id",
+ )
+
+ operator_extra_links = (DbtCloudRunJobOperatorLink(),)
+
+ def __init__(
+ self,
+ *,
+ dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
+ job_id: int,
+ account_id: int | None = None,
+ wait_for_termination: bool = True,
+ timeout: int = 60 * 60 * 24 * 7,
+ check_interval: int = 60,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.dbt_cloud_conn_id = dbt_cloud_conn_id
+ self.account_id = account_id
+ self.job_id = job_id
+ self.wait_for_termination = wait_for_termination
+ self.timeout = timeout
+ self.check_interval = check_interval
+ self.run_id: int | None = None
+ self.deferrable = deferrable
+
+ def execute(self, context: Context):
+ non_terminal_runs = self.hook.get_job_runs(
+ account_id=self.account_id,
+ payload={
+ "job_definition_id": self.job_id,
+ "status": DbtCloudJobRunStatus.NON_TERMINAL_STATUSES.value,
+ "order_by": "-created_at",
+ },
+ ).json()["data"]
+
+ if non_terminal_runs:
+ self.run_id = non_terminal_runs[0]["id"]
+ job_run_url = non_terminal_runs[0]["href"]
+ self.log.info("Non terminal runs for this job exist. Skipping
retry.")
+ else:
+ retry_job_response = self.hook.retry_job_run(
+ account_id=self.account_id,
+ job_id=self.job_id,
+ )
+ self.run_id = retry_job_response.json()["data"]["id"]
+ job_run_url = retry_job_response.json()["data"]["id"]
+
+ # Push the ``job_run_url`` value to XCom regardless of what happens
during execution so that the job
+ # run can be monitored via the operator link.
+ context["ti"].xcom_push(key="job_run_url", value=job_run_url)
+
+ if self.wait_for_termination and isinstance(self.run_id, int):
+ if self.deferrable is False:
+ self.log.info("Waiting for job run %s to terminate.",
self.run_id)
+
+ if self.hook.wait_for_job_run_status(
+ run_id=self.run_id,
+ account_id=self.account_id,
+ expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
+ check_interval=self.check_interval,
+ timeout=self.timeout,
+ ):
+ self.log.info("Job run %s has completed successfully.",
self.run_id)
+ else:
+ raise DbtCloudJobRunException(f"Job run {self.run_id} has
failed or has been cancelled.")
+
+ return self.run_id
+ else:
+ end_time = time.time() + self.timeout
+ job_run_info = JobRunInfo(account_id=self.account_id,
run_id=self.run_id)
+ job_run_status = self.hook.get_job_run_status(**job_run_info)
+ if not DbtCloudJobRunStatus.is_terminal(job_run_status):
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=DbtCloudRunJobTrigger(
+ conn_id=self.dbt_cloud_conn_id,
+ run_id=self.run_id,
+ end_time=end_time,
+ account_id=self.account_id,
+ poll_interval=self.check_interval,
+ ),
+ method_name="execute_complete",
+ )
+ elif job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
+ self.log.info("Job run %s has completed successfully.",
self.run_id)
+ return self.run_id
+ elif job_run_status in (
Review Comment:
@andyguwc is there any reason to check the cancelled and error in the same
condition statement? exception raised could be more specific if we put in
different conditions right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]