mik-laj commented on a change in pull request #20998:
URL: https://github.com/apache/airflow/pull/20998#discussion_r789235758



##########
File path: airflow/providers/dbt/cloud/operators/dbt.py
##########
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+from airflow.models import BaseOperator, BaseOperatorLink, TaskInstance
+from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, 
DbtCloudJobRunException, DbtCloudJobRunStatus
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class DbtCloudRunJobOperatorLink(BaseOperatorLink):
+    """
+    Operator link for DbtCloudRunJobOperator. This link allows users to 
monitor the triggered job run
+    directly in dbt Cloud.
+    """
+
+    name = "Monitor Job Run"
+
+    def get_link(self, operator, dttm):
+        ti = TaskInstance(task=operator, execution_date=dttm)
+        job_run_url = ti.xcom_pull(task_ids=operator.task_id, 
key="job_run_url")
+
+        return job_run_url
+
+
+class DbtCloudRunJobOperator(BaseOperator):
+    """
+    Executes a dbt Cloud job.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DbtCloudRunJobOperator`
+
+    :param dbt_cloud_conn_id: The connection ID for connecting to dbt Cloud.
+    :param job_id: The ID of a dbt Cloud job.
+    :param account_id: Optional. The ID of a dbt Cloud account.
+    :param trigger_reason: Optional. Description of the reason to trigger the 
job.
+    :param steps_override: Optional. List of dbt commands to execute when 
triggering the job instead of those
+        configured in dbt Cloud.
+    :param schema_override: Optional. Override the destination schema in the 
configured target for this job.
+    :param wait_for_termination: Flag to wait on a job run's termination.  By 
default, this feature is
+        enabled but could be disabled to perform an asynchronous wait for a 
long-running job run execution
+        using the ``DbtCloudJobRunSensor``.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal 
status for non-asynchronous
+        waits. Used only if ``wait_for_termination`` is True. Defaults to 7 
days.
+    :param check_interval: Time in seconds to check on a job run's status for 
non-asynchronous waits.
+        Used only if ``wait_for_termination`` is True. Defaults to 60 seconds.
+    :param additional_run_config: Optional. Any additional parameters that 
should be included in the API
+        request when triggering the job.
+    :return: The ID of the triggered dbt Cloud job run.
+    """
+
+    template_fields = ("dbt_cloud_conn_id", "job_id", "account_id", 
"trigger_reason")
+
+    operator_extra_links = (DbtCloudRunJobOperatorLink(),)
+
+    def __init__(
+        self,
+        *,
+        dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
+        job_id: int,
+        account_id: Optional[int] = None,
+        trigger_reason: Optional[str] = None,
+        steps_override: Optional[List[str]] = None,
+        schema_override: Optional[str] = None,
+        wait_for_termination: bool = True,
+        timeout: int = 60 * 60 * 24 * 7,
+        check_interval: int = 60,
+        additional_run_config: Optional[Dict[str, Any]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.dbt_cloud_conn_id = dbt_cloud_conn_id
+        self.hook = DbtCloudHook(self.dbt_cloud_conn_id)

Review comment:
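       We should avoid initializing the hook in the constructor. Operator constructors run every time the DAG file is parsed, so the hook should be created lazily instead (for example, inside `execute()`).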




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to