mhenc commented on a change in pull request #10365:
URL: https://github.com/apache/airflow/pull/10365#discussion_r471946886
##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -123,16 +163,6 @@ def _read(self, ti, try_number, metadata=None):
log += local_log
return log, metadata
- def gcs_read(self, remote_log_location):
- """
- Returns the log found at the remote_log_location.
-
- :param remote_log_location: the log's location in remote storage
- :type remote_log_location: str (path)
- """
- bkt, blob = self.parse_gcs_url(remote_log_location)
- return self.hook.download(bkt, blob).decode('utf-8')
-
def gcs_write(self, log, remote_log_location):
"""
Writes the log to the remote_log_location. Fails silently if no hook
Review comment:
'Fails silently if no hook'
there is no hook anymore
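For illustration, the fix could be as small as dropping the hook mention from the docstring. A sketch with hypothetical wording (the actual rewording is up to the author):
```python
def gcs_write(self, log, remote_log_location):
    """
    Writes the log to the remote_log_location. Fails silently on upload errors.

    :param log: the log to write to the remote_log_location
    :type log: str
    :param remote_log_location: the log's location in remote storage
    :type remote_log_location: str (path)
    """
```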
##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -16,48 +16,87 @@
# specific language governing permissions and limitations
# under the License.
import os
-from urllib.parse import urlparse
+from typing import Collection, Optional
from cached_property import cached_property
+from google.api_core.client_info import ClientInfo
+from google.cloud import storage
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow import version
+from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
+_DEFAULT_SCOPESS = frozenset([
+ "https://www.googleapis.com/auth/devstorage.read_write",
+])
+
class GCSTaskHandler(FileTaskHandler, LoggingMixin):
"""
GCSTaskHandler is a python log handler that handles and reads
task instance logs. It extends airflow FileTaskHandler and
uploads to and reads from GCS remote storage. Upon log reading
failure, it reads from host machine's local disk.
+
+ :param base_log_folder: Base log folder to place logs.
+ :type base_log_folder: str
+ :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix
+ ``gs://``. For example: ``gs://bucket/remote/log/location``
+ :type gcs_log_folder: str
+ :param filename_template: template filename string
+ :type filename_template: str
+ :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict.
+ If omitted, authorization based on `the Application Default Credentials
+ <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
+ be used.
+ :type gcp_key_path: str
+ :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path.
+ :type gcp_keyfile_dict: dict
+ :param gcp_scopes: Comma-separated string containing GCP scopes
+ :type gcp_scopes: str
+ :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
+ will be used.
+ :type project_id: str
"""
- def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+ def __init__(
+ self,
+ *,
+ base_log_folder: str,
+ gcs_log_folder: str,
+ filename_template: str,
+ gcp_key_path: Optional[str] = None,
+ gcp_keyfile_dict: Optional[dict] = None,
+ # See: https://github.com/PyCQA/pylint/issues/2377
+ gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: disable=unsubscriptable-object
+ project_id: Optional[str] = None,
+ ):
super().__init__(base_log_folder, filename_template)
self.remote_base = gcs_log_folder
self.log_relative_path = ''
self._hook = None
self.closed = False
self.upload_on_close = True
+ self.gcp_key_path = gcp_key_path
+ self.gcp_keyfile_dict = gcp_keyfile_dict
+ self.scopes = gcp_scopes
+ self.project_id = project_id
@cached_property
- def hook(self):
- """
- Returns GCS hook.
- """
- remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
- try:
- from airflow.providers.google.cloud.hooks.gcs import GCSHook
- return GCSHook(
- google_cloud_storage_conn_id=remote_conn_id
- )
- except Exception as e: # pylint: disable=broad-except
- self.log.error(
- 'Could not create a GoogleCloudStorageHook with connection id '
- '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
- 'and the GCS connection exists.', remote_conn_id, str(e)
- )
+ def client(self) -> storage.Client:
+ """Returns GCS Client."""
+ credentials, project_id = get_credentials_and_project_id(
+ key_path=self.gcp_key_path,
+ keyfile_dict=self.gcp_keyfile_dict,
+ scopes=self.scopes,
+ disable_logging=True
+ )
+ client = storage.Client(
Review comment:
nit: why do you introduce the `client` variable? Can't you just write `return storage.Client(...)`?
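For illustration, the suggestion would amount to something like this (a sketch; the trailing `project=` argument is assumed from context, since the hunk above is truncated):
```python
@cached_property
def client(self) -> storage.Client:
    """Returns GCS Client."""
    credentials, project_id = get_credentials_and_project_id(
        key_path=self.gcp_key_path,
        keyfile_dict=self.gcp_keyfile_dict,
        scopes=self.scopes,
        disable_logging=True,
    )
    # Return the client directly instead of binding it to a local variable first.
    return storage.Client(
        credentials=credentials,
        client_info=ClientInfo(client_library_version='airflow_v' + version.version),
        project=self.project_id if self.project_id else project_id,
    )
```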
##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -16,48 +16,87 @@
# specific language governing permissions and limitations
# under the License.
import os
-from urllib.parse import urlparse
+from typing import Collection, Optional
from cached_property import cached_property
+from google.api_core.client_info import ClientInfo
+from google.cloud import storage
-from airflow.configuration import conf
-from airflow.exceptions import AirflowException
+from airflow import version
+from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
+_DEFAULT_SCOPESS = frozenset([
+ "https://www.googleapis.com/auth/devstorage.read_write",
+])
+
class GCSTaskHandler(FileTaskHandler, LoggingMixin):
"""
GCSTaskHandler is a python log handler that handles and reads
task instance logs. It extends airflow FileTaskHandler and
uploads to and reads from GCS remote storage. Upon log reading
failure, it reads from host machine's local disk.
+
+ :param base_log_folder: Base log folder to place logs.
+ :type base_log_folder: str
+ :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix
+ ``gs://``. For example: ``gs://bucket/remote/log/location``
+ :type gcs_log_folder: str
+ :param filename_template: template filename string
+ :type filename_template: str
+ :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict.
+ If omitted, authorization based on `the Application Default Credentials
+ <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
+ be used.
+ :type gcp_key_path: str
+ :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path.
+ :type gcp_keyfile_dict: dict
+ :param gcp_scopes: Comma-separated string containing GCP scopes
+ :type gcp_scopes: str
+ :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
+ will be used.
+ :type project_id: str
"""
- def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+ def __init__(
+ self,
+ *,
+ base_log_folder: str,
+ gcs_log_folder: str,
+ filename_template: str,
+ gcp_key_path: Optional[str] = None,
+ gcp_keyfile_dict: Optional[dict] = None,
+ # See: https://github.com/PyCQA/pylint/issues/2377
+ gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,  # pylint: disable=unsubscriptable-object
+ project_id: Optional[str] = None,
+ ):
super().__init__(base_log_folder, filename_template)
self.remote_base = gcs_log_folder
self.log_relative_path = ''
self._hook = None
self.closed = False
self.upload_on_close = True
+ self.gcp_key_path = gcp_key_path
+ self.gcp_keyfile_dict = gcp_keyfile_dict
+ self.scopes = gcp_scopes
+ self.project_id = project_id
@cached_property
- def hook(self):
- """
- Returns GCS hook.
- """
- remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
- try:
- from airflow.providers.google.cloud.hooks.gcs import GCSHook
- return GCSHook(
- google_cloud_storage_conn_id=remote_conn_id
- )
- except Exception as e: # pylint: disable=broad-except
- self.log.error(
- 'Could not create a GoogleCloudStorageHook with connection id '
- '"%s". %s\n\nPlease make sure that airflow[gcp] is installed '
- 'and the GCS connection exists.', remote_conn_id, str(e)
- )
+ def client(self) -> storage.Client:
+ """Returns GCS Client."""
+ credentials, project_id = get_credentials_and_project_id(
+ key_path=self.gcp_key_path,
+ keyfile_dict=self.gcp_keyfile_dict,
+ scopes=self.scopes,
+ disable_logging=True
+ )
+ client = storage.Client(
+ credentials=credentials,
+ client_info=ClientInfo(client_library_version='airflow_v' + version.version),
Review comment:
what is 'airflow_v'? It looks magical to me.
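One way to address this would be to give the prefix a descriptive name. A hypothetical sketch (the constant name is invented for illustration; the PR uses an inline string):
```python
# Hypothetical: name the user-agent prefix instead of using a 'magic' inline
# string. It identifies Airflow-originated requests in GCS client metadata.
CLIENT_INFO_PREFIX = 'airflow_v'

client_info = ClientInfo(client_library_version=CLIENT_INFO_PREFIX + version.version)
```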
##########
File path: airflow/providers/google/cloud/log/gcs_task_handler.py
##########
@@ -144,28 +174,15 @@ def gcs_write(self, log, remote_log_location):
:type remote_log_location: str (path)
"""
try:
- old_log = self.gcs_read(remote_log_location)
+ blob = storage.Blob.from_string(remote_log_location, self.client)
+ old_log = blob.download_as_string()
log = '\n'.join([old_log, log]) if old_log else log
except Exception as e: # pylint: disable=broad-except
Review comment:
nit: btw, isn't it worth logging the exception?
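A minimal sketch of what that could look like, mirroring the except block in the hunk above (the log message is hypothetical):
```python
try:
    blob = storage.Blob.from_string(remote_log_location, self.client)
    old_log = blob.download_as_string()
    log = '\n'.join([old_log, log]) if old_log else log
except Exception as e:  # pylint: disable=broad-except
    # Hypothetical: record why the previous log was discarded instead of
    # failing silently.
    self.log.warning("Error reading previous log from %s: %s", remote_log_location, e)
```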
##########
File path: airflow/config_templates/config.yml
##########
@@ -402,9 +402,9 @@
type: string
example: ~
default: ""
- - name: stackdriver_key_path
Review comment:
I am not sure if there is a 'deprecation policy', but if someone was actually using this setting, it will stop working after the upgrade.
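If backward compatibility matters here, one option is to keep reading the old key with a deprecation warning. A sketch, assuming the option was renamed to a hypothetical 'google_key_path' within the 'logging' section:
```python
import warnings

from airflow.configuration import conf

# Hypothetical fallback: prefer the new option, but keep honouring the old
# 'stackdriver_key_path' key so existing deployments keep working.
key_path = conf.get('logging', 'google_key_path', fallback=None)
if key_path is None:
    key_path = conf.get('logging', 'stackdriver_key_path', fallback=None)
    if key_path is not None:
        warnings.warn(
            "The 'logging.stackdriver_key_path' option is deprecated; "
            "use 'logging.google_key_path' instead.",
            DeprecationWarning,
        )
```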
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]