phanikumv commented on code in PR #26475:
URL: https://github.com/apache/airflow/pull/26475#discussion_r978953353
##########
airflow/providers/dbt/cloud/operators/dbt.py:
##########
@@ -218,3 +218,57 @@ def execute(self, context: Context) -> None:
json.dump(response.json(), file)
else:
file.write(response.text)
+
+
+class DbtCloudListJobsOperator(BaseOperator):
+ """
+ List jobs in a dbt Cloud project.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:DbtCloudListJobsOperator`
+
+ Retrieves metadata for all jobs tied to a specified dbt Cloud account. If a ``project_id`` is
+ supplied, only jobs pertaining to this project id will be retrieved.
+
+ :param account_id: Optional. If an account ID is not provided explicitly,
+ the account ID from the dbt Cloud connection will be used.
+ :param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
+ For example, to use reverse order by the run ID use ``order_by=-id``.
+ :param project_id: Optional. The ID of a dbt Cloud project.
+ """
+
+ template_fields = (
+ "account_id",
+ "project_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
+ account_id: int | None = None,
+ project_id: int | None = None,
+ order_by: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.dbt_cloud_conn_id = dbt_cloud_conn_id
+ self.account_id = account_id
+ self.project_id = project_id
+ self.order_by = order_by
+
+ def execute(self, context: Context) -> list:
+ hook = DbtCloudHook(self.dbt_cloud_conn_id)
+ list_jobs_response = hook.list_jobs(
+ account_id=self.account_id, order_by=self.order_by,
project_id=self.project_id
+ )
+ self.log.info(
+ "Jobs in the specified dbt Cloud account are:",
+ )
+ buffer = []
+ for job_metadata in list_jobs_response:
+ for job in job_metadata.json()['data']:
+ self.log.info(str(job["id"]))
+ buffer.append(job["id"])
Review Comment:
Done with this change in the latest commit, 74606ca.
##########
tests/providers/dbt/cloud/operators/test_dbt_cloud.py:
##########
@@ -388,3 +389,27 @@ def test_get_text_artifact_with_step(self,
mock_get_artifact, conn_id, account_i
account_id=account_id,
step=2,
)
+
+
+class TestDbtCloudListJobsOperator:
+ def setup_method(self):
+ self.dag = DAG("test_dbt_cloud_list_jobs_op", start_date=DEFAULT_DATE)
+ self.mock_ti = MagicMock()
+ self.mock_context = {"ti": self.mock_ti}
+
+ @patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.list_jobs")
+ @pytest.mark.parametrize(
Review Comment:
Done with this change in the latest commit, 74606ca.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]