Lee-W commented on code in PR #45634:
URL: https://github.com/apache/airflow/pull/45634#discussion_r1925003561


##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -356,14 +356,23 @@ def get_account(self, account_id: int | None = None) -> 
Response:
         return self._run_and_get_response(endpoint=f"{account_id}/")
 
     @fallback_to_default_account
-    def list_projects(self, account_id: int | None = None) -> list[Response]:
+    def list_projects(
+        self, name_contains: str | None = None, account_id: int | None = None

Review Comment:
   ```suggestion
           self, account_id: int | None = None, name_contains: str | None = None
   ```
   Changing the parameter order might break existing code that passes `account_id` positionally.



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | 
None = None) -> Respons
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", 
api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: 
int | None = None
+    ) -> list[Response]:
+        """
+        Retrieve metadata for all environments tied to a specified dbt Cloud 
project.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param name_contains: Optional. The case-insensitive substring of a 
dbt Cloud environment name to filter by.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: List of request responses.
+        """
+        payload = {"name__icontains": name_contains} if name_contains else None
+        return self._run_and_get_response(
+            endpoint=f"{account_id}/projects/{project_id}/environments/",
+            payload=payload,
+            paginate=True,
+            api_version="v3",
+        )
+
+    @fallback_to_default_account
+    def get_environment(
+        self, project_id: int, environment_id: int, account_id: int | None = 
None
+    ) -> Response:
+        """
+        Retrieve metadata for a specific project's environment.
+
+        :param project_id: The ID of a dbt Cloud project.
+        :param environment_id: The ID of a dbt Cloud environment.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The request response.
+        """
+        return self._run_and_get_response(
+            
endpoint=f"{account_id}/projects/{project_id}/environments/{environment_id}/", 
api_version="v3"
+        )
+
     @fallback_to_default_account
     def list_jobs(
         self,
         account_id: int | None = None,
         order_by: str | None = None,
         project_id: int | None = None,
+        environment_id: int | None = None,
+        name_contains: str | None = None,
     ) -> list[Response]:
         """
         Retrieve metadata for all jobs tied to a specified dbt Cloud account.
 
         If a ``project_id`` is supplied, only jobs pertaining to this project 
will be retrieved.
+        If an ``environment_id`` is supplied, only jobs pertaining to this 
environment will be retrieved.
 
         :param account_id: Optional. The ID of a dbt Cloud account.
         :param order_by: Optional. Field to order the result by. Use '-' to 
indicate reverse order.
             For example, to use reverse order by the run ID use 
``order_by=-id``.
-        :param project_id: The ID of a dbt Cloud project.
+        :param project_id: Optional. The ID of a dbt Cloud project.
+        :param environment_id: Optional. The ID of a dbt Cloud environment.
+        :param name_contains: Optional. The case-insensitive substring of a 
dbt Cloud job name to filter by.

Review Comment:
   Should we unify the name as `name_icontains`?



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None
+    ) -> dict:
+        """
+        Retrieve metadata for a specific job by combination of project, 
environment, and job name.
+
+        Raises AirflowException if the job is not found or cannot be uniquely 
identified by provided parameters.
+
+        :param project_name: The name of a dbt Cloud project.
+        :param environment_name: The name of a dbt Cloud environment.
+        :param job_name: The name of a dbt Cloud job.
+        :param account_id: Optional. The ID of a dbt Cloud account.
+        :return: The details of a job.
+        """
+        # get project_id using project_name
+        list_projects_responses = 
self.list_projects(name_contains=project_name, account_id=account_id)
+        # flatten & filter the list of responses to find the exact match
+        projects = [
+            project
+            for response in list_projects_responses
+            for project in response.json()["data"]
+            if project["name"] == project_name
+        ]
+        if len(projects) != 1:
+            raise AirflowException(f"Found {len(projects)} projects with name 
`{project_name}`.")

Review Comment:
   Let's create a custom exception (inheriting from AirflowException) in this provider 
instead of using AirflowException directly.



##########
providers/src/airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -135,6 +148,18 @@ def execute(self, context: Context):
                 f"Triggered via Apache Airflow by task {self.task_id!r} in the 
{self.dag.dag_id} DAG."
             )
 
+        if self.job_id is None:
+            if not all([self.project_name, self.environment_name, 
self.job_name]):
+                raise AirflowException(

Review Comment:
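   Same here: let's raise a provider-specific exception (inheriting from AirflowException) instead of using AirflowException directly.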



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -376,27 +385,73 @@ def get_project(self, project_id: int, account_id: int | 
None = None) -> Respons
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", 
api_version="v3")
 
+    @fallback_to_default_account
+    def list_environments(
+        self, project_id: int, name_contains: str | None = None, account_id: 
int | None = None

Review Comment:
   ```suggestion
           self, project_id: int, *, name_contains: str | None = None, account_id: int | None = None
   ```



##########
providers/src/airflow/providers/dbt/cloud/hooks/dbt.py:
##########
@@ -411,6 +466,72 @@ def get_job(self, job_id: int, account_id: int | None = 
None) -> Response:
         """
         return 
self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")
 
+    @fallback_to_default_account
+    def get_job_by_name(
+        self, project_name: str, environment_name: str, job_name: str, 
account_id: int | None = None

Review Comment:
   ```suggestion
           self, *, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to