mik-laj commented on a change in pull request #10365:
URL: https://github.com/apache/airflow/pull/10365#discussion_r472022699



##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -16,48 +16,87 @@
 # specific language governing permissions and limitations
 # under the License.
 import os
-from urllib.parse import urlparse
+from typing import Collection, Optional
 
 from cached_property import cached_property
+from google.api_core.client_info import ClientInfo
+from google.cloud import storage
 
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow import version
+from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+_DEFAULT_SCOPESS = frozenset([
+    "https://www.googleapis.com/auth/devstorage.read_write";,
+])
+
 
 class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     """
     GCSTaskHandler is a python log handler that handles and reads
     task instance logs. It extends airflow FileTaskHandler and
     uploads to and reads from GCS remote storage. Upon log reading
     failure, it reads from host machine's local disk.
+
+    :param base_log_folder: Base log folder to place logs.
+    :type base_log_folder: str
+    :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix
+        ``gs://``. For example: ``gs://bucket/remote/log/location``
+    :type gcs_log_folder: str
+    :param filename_template: template filename string
+    :type filename_template: str
+    :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict.
+        If omitted, authorization based on `the Application Default Credentials
+        <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
+        be used.
+    :type gcp_key_path: str
+    :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path.
+    :type gcp_keyfile_dict: dict
+    :param gcp_scopes: Comma-separated string containing GCP scopes
+    :type gcp_scopes: str
+    :param project_id: Project ID to read the logs from. If not passed, the project ID from credentials
+        will be used.
+    :type project_id: str
     """
-    def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+    def __init__(
+        self,
+        *,
+        base_log_folder: str,
+        gcs_log_folder: str,
+        filename_template: str,
+        gcp_key_path: Optional[str] = None,
+        gcp_keyfile_dict: Optional[dict] = None,
+        # See: https://github.com/PyCQA/pylint/issues/2377
+        gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: disable=unsubscriptable-object
+        project_id: Optional[str] = None,
+    ):
         super().__init__(base_log_folder, filename_template)
         self.remote_base = gcs_log_folder
         self.log_relative_path = ''
         self._hook = None
         self.closed = False
         self.upload_on_close = True
+        self.gcp_key_path = gcp_key_path
+        self.gcp_keyfile_dict = gcp_keyfile_dict
+        self.scopes = gcp_scopes
+        self.project_id = project_id
 
     @cached_property
-    def hook(self):
-        """
-        Returns GCS hook.
-        """
-        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
-        try:
-            from airflow.providers.google.cloud.hooks.gcs import GCSHook
-            return GCSHook(
-                google_cloud_storage_conn_id=remote_conn_id
-            )
-        except Exception as e:  # pylint: disable=broad-except
-            self.log.error(
-                'Could not create a GoogleCloudStorageHook with connection id '
-                '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
-                'and the GCS connection exists.', remote_conn_id, str(e)
-            )
+    def client(self) -> storage.Client:
+        """Returns GCS Client."""
+        credentials, project_id = get_credentials_and_project_id(
+            key_path=self.gcp_key_path,
+            keyfile_dict=self.gcp_keyfile_dict,
+            scopes=self.scopes,
+            disable_logging=True
+        )
+        client = storage.Client(
+            credentials=credentials,
+            client_info=ClientInfo(client_library_version='airflow_v' + version.version),

Review comment:
       This is a "User Agent" that allows you to track on the Google side which 
versions of Airflow your users are using. This identical identifier is used 
each time a client is created.
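
For illustration, here is a minimal sketch (not part of the diff) of how the ClientInfo passed to storage.Client surfaces as a User-Agent; to_user_agent() is the google-api-core helper that serializes it, and the version string below is a placeholder:

    from google.api_core.client_info import ClientInfo

    # client_library_version is emitted as a "gccl/<version>" token in the
    # User-Agent header, making requests attributable to an Airflow release.
    info = ClientInfo(client_library_version="airflow_v1.10.12")
    print(info.to_user_agent())
    # e.g. "gl-python/3.8.5 grpc/1.31.0 gax/1.22.1 gccl/airflow_v1.10.12"

Since the identifier is fixed at client creation, every log upload and read performed by this handler carries the same version tag.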



