mhenc commented on a change in pull request #10365: URL: https://github.com/apache/airflow/pull/10365#discussion_r472029453
########## File path: airflow/providers/google/cloud/log/gcs_task_handler.py ########## @@ -16,48 +16,87 @@ # specific language governing permissions and limitations # under the License. import os -from urllib.parse import urlparse +from typing import Collection, Optional from cached_property import cached_property +from google.api_core.client_info import ClientInfo +from google.cloud import storage -from airflow.configuration import conf -from airflow.exceptions import AirflowException +from airflow import version +from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +_DEFAULT_SCOPESS = frozenset([ + "https://www.googleapis.com/auth/devstorage.read_write", +]) + class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads task instance logs. It extends airflow FileTaskHandler and uploads to and reads from GCS remote storage. Upon log reading failure, it reads from host machine's local disk. + + :param base_log_folder: Base log folder to place logs. + :type base_log_folder: str + :param gcs_log_folder: Path to a remote location where logs will be saved. It must have the prefix + ``gs://``. For example: ``gs://bucket/remote/log/location`` + :type gcs_log_folder: str + :param filename_template: template filename string + :type filename_template: str + :param gcp_key_path: Path to GCP Credential JSON file. Mutually exclusive with gcp_keyfile_dict. + If omitted, authorization based on `the Application Default Credentials + <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will + be used. + :type gcp_key_path: str + :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path. 
+ :type gcp_keyfile_dict: dict + :param gcp_scopes: Comma-separated string containing GCP scopes + :type gcp_scopes: str + :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials + will be used. + :type project_id: str """ - def __init__(self, base_log_folder, gcs_log_folder, filename_template): + def __init__( + self, + *, + base_log_folder: str, + gcs_log_folder: str, + filename_template: str, + gcp_key_path: Optional[str] = None, + gcp_keyfile_dict: Optional[dict] = None, + # See: https://github.com/PyCQA/pylint/issues/2377 + gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS, # pylint: disable=unsubscriptable-object + project_id: Optional[str] = None, + ): super().__init__(base_log_folder, filename_template) self.remote_base = gcs_log_folder self.log_relative_path = '' self._hook = None self.closed = False self.upload_on_close = True + self.gcp_key_path = gcp_key_path + self.gcp_keyfile_dict = gcp_keyfile_dict + self.scopes = gcp_scopes + self.project_id = project_id @cached_property - def hook(self): - """ - Returns GCS hook. - """ - remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID') - try: - from airflow.providers.google.cloud.hooks.gcs import GCSHook - return GCSHook( - google_cloud_storage_conn_id=remote_conn_id - ) - except Exception as e: # pylint: disable=broad-except - self.log.error( - 'Could not create a GoogleCloudStorageHook with connection id ' - '"%s". 
%s\n\nPlease make sure that airflow[gcp] is installed ' - 'and the GCS connection exists.', remote_conn_id, str(e) - ) + def client(self) -> storage.Client: + """Returns GCS Client.""" + credentials, project_id = get_credentials_and_project_id( + key_path=self.gcp_key_path, + keyfile_dict=self.gcp_keyfile_dict, + scopes=self.scopes, + disable_logging=True + ) + client = storage.Client( + credentials=credentials, + client_info=ClientInfo(client_library_version='airflow_v' + version.version), Review comment: OK, why isn't it in a constant then? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
