mhenc commented on a change in pull request #10365:
URL: https://github.com/apache/airflow/pull/10365#discussion_r471946886



##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -123,16 +163,6 @@ def _read(self, ti, try_number, metadata=None):
             log += local_log
             return log, metadata
 
-    def gcs_read(self, remote_log_location):
-        """
-        Returns the log found at the remote_log_location.
-
-        :param remote_log_location: the log's location in remote storage
-        :type remote_log_location: str (path)
-        """
-        bkt, blob = self.parse_gcs_url(remote_log_location)
-        return self.hook.download(bkt, blob).decode('utf-8')
-
     def gcs_write(self, log, remote_log_location):
         """
         Writes the log to the remote_log_location. Fails silently if no hook

Review comment:
      The docstring still says "Fails silently if no hook", but there is no hook any longer.
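
One possible rewording once the hook is gone (a sketch; the exact docstring wording is up to the author):

```python
def gcs_write(self, log, remote_log_location):
    """
    Writes the log to the remote_log_location. Silently discards the
    previous remote copy if it cannot be read back.
    """
```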

##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -16,48 +16,87 @@
 # specific language governing permissions and limitations
 # under the License.
 import os
-from urllib.parse import urlparse
+from typing import Collection, Optional
 
 from cached_property import cached_property
+from google.api_core.client_info import ClientInfo
+from google.cloud import storage
 
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow import version
+from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+_DEFAULT_SCOPESS = frozenset([
+    "https://www.googleapis.com/auth/devstorage.read_write";,
+])
+
 
 class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     """
     GCSTaskHandler is a python log handler that handles and reads
     task instance logs. It extends airflow FileTaskHandler and
     uploads to and reads from GCS remote storage. Upon log reading
     failure, it reads from host machine's local disk.
+
+    :param base_log_folder: Base log folder to place logs.
+    :type base_log_folder: str
+    :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix
+        ``gs://``. For example: ``gs://bucket/remote/log/location``
+    :type gcs_log_folder: str
+    :param filename_template: template filename string
+    :type filename_template: str
+    :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict.
+        If omitted, authorization based on `the Application Default Credentials
+        <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
+        be used.
+    :type gcp_key_path: str
+    :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path.
+    :type gcp_keyfile_dict: dict
+    :param gcp_scopes: Comma-separated string containing GCP scopes
+    :type gcp_scopes: str
+    :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
+        will be used.
+    :type project_id: str
     """
-    def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+    def __init__(
+        self,
+        *,
+        base_log_folder: str,
+        gcs_log_folder: str,
+        filename_template: str,
+        gcp_key_path: Optional[str] = None,
+        gcp_keyfile_dict: Optional[dict] = None,
+        # See: https://github.com/PyCQA/pylint/issues/2377
+        gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: disable=unsubscriptable-object
+        project_id: Optional[str] = None,
+    ):
         super().__init__(base_log_folder, filename_template)
         self.remote_base = gcs_log_folder
         self.log_relative_path = ''
         self._hook = None
         self.closed = False
         self.upload_on_close = True
+        self.gcp_key_path = gcp_key_path
+        self.gcp_keyfile_dict = gcp_keyfile_dict
+        self.scopes = gcp_scopes
+        self.project_id = project_id
 
     @cached_property
-    def hook(self):
-        """
-        Returns GCS hook.
-        """
-        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
-        try:
-            from airflow.providers.google.cloud.hooks.gcs import GCSHook
-            return GCSHook(
-                google_cloud_storage_conn_id=remote_conn_id
-            )
-        except Exception as e:  # pylint: disable=broad-except
-            self.log.error(
-                'Could not create a GoogleCloudStorageHook with connection id '
-                '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
-                'and the GCS connection exists.', remote_conn_id, str(e)
-            )
+    def client(self) -> storage.Client:
+        """Returns GCS Client."""
+        credentials, project_id = get_credentials_and_project_id(
+            key_path=self.gcp_key_path,
+            keyfile_dict=self.gcp_keyfile_dict,
+            scopes=self.scopes,
+            disable_logging=True
+        )
+        client = storage.Client(

Review comment:
      nit: why do you introduce the 'client' variable?
   Can't you just write `return storage.Client(...)`?

##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -16,48 +16,87 @@
 # specific language governing permissions and limitations
 # under the License.
 import os
-from urllib.parse import urlparse
+from typing import Collection, Optional
 
 from cached_property import cached_property
+from google.api_core.client_info import ClientInfo
+from google.cloud import storage
 
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow import version
+from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+_DEFAULT_SCOPESS = frozenset([
+    "https://www.googleapis.com/auth/devstorage.read_write";,
+])
+
 
 class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     """
     GCSTaskHandler is a python log handler that handles and reads
     task instance logs. It extends airflow FileTaskHandler and
     uploads to and reads from GCS remote storage. Upon log reading
     failure, it reads from host machine's local disk.
+
+    :param base_log_folder: Base log folder to place logs.
+    :type base_log_folder: str
+    :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix
+        ``gs://``. For example: ``gs://bucket/remote/log/location``
+    :type gcs_log_folder: str
+    :param filename_template: template filename string
+    :type filename_template: str
+    :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict.
+        If omitted, authorization based on `the Application Default Credentials
+        <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
+        be used.
+    :type gcp_key_path: str
+    :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path.
+    :type gcp_keyfile_dict: dict
+    :param gcp_scopes: Comma-separated string containing GCP scopes
+    :type gcp_scopes: str
+    :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
+        will be used.
+    :type project_id: str
     """
-    def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+    def __init__(
+        self,
+        *,
+        base_log_folder: str,
+        gcs_log_folder: str,
+        filename_template: str,
+        gcp_key_path: Optional[str] = None,
+        gcp_keyfile_dict: Optional[dict] = None,
+        # See: https://github.com/PyCQA/pylint/issues/2377
+        gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: disable=unsubscriptable-object
+        project_id: Optional[str] = None,
+    ):
         super().__init__(base_log_folder, filename_template)
         self.remote_base = gcs_log_folder
         self.log_relative_path = ''
         self._hook = None
         self.closed = False
         self.upload_on_close = True
+        self.gcp_key_path = gcp_key_path
+        self.gcp_keyfile_dict = gcp_keyfile_dict
+        self.scopes = gcp_scopes
+        self.project_id = project_id
 
     @cached_property
-    def hook(self):
-        """
-        Returns GCS hook.
-        """
-        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
-        try:
-            from airflow.providers.google.cloud.hooks.gcs import GCSHook
-            return GCSHook(
-                google_cloud_storage_conn_id=remote_conn_id
-            )
-        except Exception as e:  # pylint: disable=broad-except
-            self.log.error(
-                'Could not create a GoogleCloudStorageHook with connection id '
-                '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
-                'and the GCS connection exists.', remote_conn_id, str(e)
-            )
+    def client(self) -> storage.Client:
+        """Returns GCS Client."""
+        credentials, project_id = get_credentials_and_project_id(
+            key_path=self.gcp_key_path,
+            keyfile_dict=self.gcp_keyfile_dict,
+            scopes=self.scopes,
+            disable_logging=True
+        )
+        client = storage.Client(
+            credentials=credentials,
+            client_info=ClientInfo(client_library_version='airflow_v' + version.version),

Review comment:
      what is 'airflow_v'? It looks magical to me.
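
For context, `client_library_version` only ends up in the client's user-agent metadata, so 'airflow_v' acts as a prefix identifying Airflow as the caller. A minimal sketch (the exact output format depends on the google-api-core version):

```python
from google.api_core.client_info import ClientInfo

info = ClientInfo(client_library_version='airflow_v' + '2.0.0')
# Prints something like '... gccl/airflow_v2.0.0' (illustrative, not verified).
print(info.to_user_agent())
```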

##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -144,28 +174,15 @@ def gcs_write(self, log, remote_log_location):
         :type remote_log_location: str (path)
         """
         try:
-            old_log = self.gcs_read(remote_log_location)
+            blob = storage.Blob.from_string(remote_log_location, self.client)
+            old_log = blob.download_as_string()
             log = '\n'.join([old_log, log]) if old_log else log
         except Exception as e:  # pylint: disable=broad-except

Review comment:
      nit: by the way, isn't it worth logging the exception?
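
For example, the try block from the hunk above with the exception surfaced (a sketch; message text and log level are placeholders, and the `.decode()` is added here because `download_as_string()` returns bytes):

```python
try:
    blob = storage.Blob.from_string(remote_log_location, self.client)
    old_log = blob.download_as_string().decode()  # bytes -> str
    log = '\n'.join([old_log, log]) if old_log else log
except Exception as e:  # pylint: disable=broad-except
    # Record why the previous remote copy was unreadable instead of swallowing it.
    self.log.info("Previous log discarded: %s", e)
```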

##########
File path: airflow/config_templates/config.yml
##########
@@ -402,9 +402,9 @@
       type: string
       example: ~
       default: ""
-    - name: stackdriver_key_path

Review comment:
      I am not sure if there is a deprecation policy, but if someone was actually using this setting, it will stop working after the upgrade.
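
One possible transition path (hypothetical; the option names `google_key_path` and `stackdriver_key_path` mapping and the fallback behaviour are assumptions, not part of this PR):

```python
import warnings

from airflow.configuration import conf

# Hypothetical fallback: prefer the new option, warn if only the old one is set.
key_path = conf.get('logging', 'google_key_path', fallback=None)
if key_path is None:
    key_path = conf.get('logging', 'stackdriver_key_path', fallback=None)
    if key_path:
        warnings.warn(
            "Option [logging] stackdriver_key_path is deprecated; "
            "use google_key_path instead.",
            DeprecationWarning,
        )
```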




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

