alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817920326



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 
22.2 released
+import attr
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, 
serialize
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 
22.2 released
+from looker_sdk.rtl.model import Model
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+# TODO: uncomment once looker sdk 22.2 released
+# from looker_sdk.sdk.api40.models import MaterializePDT
+
+
[email protected](auto_attribs=True, init=False)
+class MaterializePDT(Model):
+    """
+    TODO: Temporary API response stub. Use Looker API class once looker sdk 
22.2 released.
+    Attributes:
+    materialization_id: The ID of the enqueued materialization job, if any
+    resp_text: The string response
+    """
+
+    materialization_id: str
+    resp_text: Optional[str] = None
+
+    def __init__(self, *, materialization_id: str, resp_text: Optional[str] = 
None):
+        self.materialization_id = materialization_id
+        self.resp_text = resp_text
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ) -> MaterializePDT:
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: 
'%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. 
Found: {looker_ver}.')
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 
22.2 released.
+        resp = MaterializePDT(materialization_id='366', resp_text=None)
+        # unpack query_params dict into kwargs (if not None)
+        # if query_params:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, 
source=self.source, **query_params)
+        # else:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, 
source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check 
status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: 
%s.", materialization_id)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 
22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"status":"running","runtime":null,'
+            '"message":"Materialize 100m_sum_aggregate_sdt, ",'
+            '"task_slug":"f522424e00f0039a8c8f2d53d773f310"}',
+        )
+        sdk = self.get_looker_sdk()  # NOQA
+        # resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check 
status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", 
materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", 
materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 
22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"success":true,"connection":"incremental_pdts_test",'
+            '"statusment":"KILL 810852",'
+            '"task_slug":"6bad8184f94407134251be8fd18af834"}',
+        )

Review comment:
       same here - I removed this completely as looker sdk 22.2 is finally 
launched now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to