Lee-W commented on code in PR #45634: URL: https://github.com/apache/airflow/pull/45634#discussion_r1925003561
########## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -356,14 +356,23 @@ def get_account(self, account_id: int | None = None) -> Response: return self._run_and_get_response(endpoint=f"{account_id}/") @fallback_to_default_account - def list_projects(self, account_id: int | None = None) -> list[Response]: + def list_projects( + self, name_contains: str | None = None, account_id: int | None = None Review Comment: ```suggestion self, account_id: int | None = None, name_contains: str | None = None ``` changing order might break existing code ########## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons """ return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3") + @fallback_to_default_account + def list_environments( + self, project_id: int, name_contains: str | None = None, account_id: int | None = None + ) -> list[Response]: + """ + Retrieve metadata for all environments tied to a specified dbt Cloud project. + + :param project_id: The ID of a dbt Cloud project. + :param name_contains: Optional. The case-insensitive substring of a dbt Cloud environment name to filter by. + :param account_id: Optional. The ID of a dbt Cloud account. + :return: List of request responses. + """ + payload = {"name__icontains": name_contains} if name_contains else None + return self._run_and_get_response( + endpoint=f"{account_id}/projects/{project_id}/environments/", + payload=payload, + paginate=True, + api_version="v3", + ) + + @fallback_to_default_account + def get_environment( + self, project_id: int, environment_id: int, account_id: int | None = None + ) -> Response: + """ + Retrieve metadata for a specific project's environment. + + :param project_id: The ID of a dbt Cloud project. + :param environment_id: The ID of a dbt Cloud environment. + :param account_id: Optional. The ID of a dbt Cloud account. + :return: The request response. + """ + return self._run_and_get_response( + endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", api_version="v3" + ) + @fallback_to_default_account def list_jobs( self, account_id: int | None = None, order_by: str | None = None, project_id: int | None = None, + environment_id: int | None = None, + name_contains: str | None = None, ) -> list[Response]: """ Retrieve metadata for all jobs tied to a specified dbt Cloud account. If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved. + If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved. :param account_id: Optional. The ID of a dbt Cloud account. :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order. For example, to use reverse order by the run ID use ``order_by=-id``. - :param project_id: The ID of a dbt Cloud project. + :param project_id: Optional. The ID of a dbt Cloud project. + :param environment_id: Optional. The ID of a dbt Cloud environment. + :param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by. Review Comment: Should we unify the name as `name_icontaines`? ########## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response: """ return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}") + @fallback_to_default_account + def get_job_by_name( + self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None + ) -> dict: + """ + Retrieve metadata for a specific job by combination of project, environment, and job name. + + Raises AirflowException if the job is not found or cannot be uniquely identified by provided parameters. + + :param project_name: The name of a dbt Cloud project. + :param environment_name: The name of a dbt Cloud environment. + :param job_name: The name of a dbt Cloud job. + :param account_id: Optional. The ID of a dbt Cloud account. + :return: The details of a job. + """ + # get project_id using project_name + list_projects_responses = self.list_projects(name_contains=project_name, account_id=account_id) + # flatten & filter the list of responses to find the exact match + projects = [ + project + for response in list_projects_responses + for project in response.json()["data"] + if project["name"] == project_name + ] + if len(projects) != 1: + raise AirflowException(f"Found {len(projects)} projects with name `{project_name}`.") Review Comment: Let's create a custom exception (inherits AirflowException) in this provider instead of using AirflowException directly ########## providers/src/airflow/providers/dbt/cloud/operators/dbt.py: ########## @@ -135,6 +148,18 @@ def execute(self, context: Context): f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG." ) + if self.job_id is None: + if not all([self.project_name, self.environment_name, self.job_name]): + raise AirflowException( Review Comment: same here ########## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons """ return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3") + @fallback_to_default_account + def list_environments( + self, project_id: int, name_contains: str | None = None, account_id: int | None = None Review Comment: ```suggestion self, project_id: int, *, name_contains: str | None = None, account_id: int | None = None ``` ########## providers/src/airflow/providers/dbt/cloud/hooks/dbt.py: ########## @@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response: """ return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}") + @fallback_to_default_account + def get_job_by_name( + self, project_name: str, environment_name: str, job_name: str, account_id: int | None = None Review Comment: ```suggestion self, *, project_name: str, environment_name: str, job_name: str, account_id: int | None = None ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org