This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b4c50dadd3 GCSTaskHandler may use remote log conn id (#29117)
b4c50dadd3 is described below
commit b4c50dadd36d66e4d222c627a61771653767afd6
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jan 24 15:25:04 2023 -0800
GCSTaskHandler may use remote log conn id (#29117)
---
.../providers/google/cloud/log/gcs_task_handler.py | 30 +++++++++++++++++-----
.../google/cloud/log/test_gcs_task_handler.py | 25 +++++++++++++-----
2 files changed, 41 insertions(+), 14 deletions(-)
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 5fbba80798..a264821093 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -24,6 +24,9 @@ from typing import Collection
from google.cloud import storage # type: ignore[attr-defined]
from airflow.compat.functools import cached_property
+from airflow.configuration import conf
+from airflow.exceptions import AirflowNotFoundException
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.utils.credentials_provider import
get_credentials_and_project_id
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -72,7 +75,6 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
super().__init__(base_log_folder, filename_template)
self.remote_base = gcs_log_folder
self.log_relative_path = ""
- self._hook = None
self.closed = False
self.upload_on_close = True
self.gcp_key_path = gcp_key_path
@@ -80,15 +82,29 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
self.scopes = gcp_scopes
self.project_id = project_id
+ @cached_property
+ def hook(self) -> GCSHook | None:
+ """Returns GCSHook if remote_log_conn_id configured."""
+ conn_id = conf.get("logging", "remote_log_conn_id", fallback=None)
+ if conn_id:
+ try:
+ return GCSHook(gcp_conn_id=conn_id)
+ except AirflowNotFoundException:
+ pass
+ return None
+
@cached_property
def client(self) -> storage.Client:
"""Returns GCS Client."""
- credentials, project_id = get_credentials_and_project_id(
- key_path=self.gcp_key_path,
- keyfile_dict=self.gcp_keyfile_dict,
- scopes=self.scopes,
- disable_logging=True,
- )
+ if self.hook:
+ credentials, project_id =
self.hook.get_credentials_and_project_id()
+ else:
+ credentials, project_id = get_credentials_and_project_id(
+ key_path=self.gcp_key_path,
+ keyfile_dict=self.gcp_keyfile_dict,
+ scopes=self.scopes,
+ disable_logging=True,
+ )
return storage.Client(
credentials=credentials,
client_info=CLIENT_INFO,
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py
b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 049a627336..b801d1fc05 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -21,6 +21,7 @@ import tempfile
from unittest import mock
import pytest
+from pytest import param
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.utils.state import TaskInstanceState
@@ -59,15 +60,25 @@ class TestGCSTaskHandler:
)
yield self.gcs_task_handler
- @mock.patch(
-
"airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
- return_value=("TEST_CREDENTIALS", "TEST_PROJECT_ID"),
- )
+ @mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.GCSHook")
@mock.patch("google.cloud.storage.Client")
- def test_hook(self, mock_client, mock_creds):
- return_value = self.gcs_task_handler.client
+
@mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id")
+ @pytest.mark.parametrize("conn_id", [param("", id="no-conn"),
param("my_gcs_conn", id="with-conn")])
+ def test_client_conn_id_behavior(self, mock_get_cred, mock_client,
mock_hook, conn_id):
+ """When remote log conn id configured, hook will be used"""
+ mock_hook.return_value.get_credentials_and_project_id.return_value =
("test_cred", "test_proj")
+ mock_get_cred.return_value = ("test_cred", "test_proj")
+ with conf_vars({("logging", "remote_log_conn_id"): conn_id}):
+ return_value = self.gcs_task_handler.client
+ if conn_id:
+ mock_hook.assert_called_once_with(gcp_conn_id="my_gcs_conn")
+ mock_get_cred.assert_not_called()
+ else:
+ mock_hook.assert_not_called()
+ mock_get_cred.assert_called()
+
mock_client.assert_called_once_with(
- client_info=mock.ANY, credentials="TEST_CREDENTIALS",
project="TEST_PROJECT_ID"
+ client_info=mock.ANY, credentials="test_cred", project="test_proj"
)
assert mock_client.return_value == return_value