This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 87a249a02f fix: Pass proxies config when using ClientSecretCredential in AzureDataLakeStorageV2Hook (#37103)
87a249a02f is described below
commit 87a249a02f3665746d05b7a309c288bfda2d4cc2
Author: David Blain <[email protected]>
AuthorDate: Fri Mar 1 11:32:17 2024 +0100
fix: Pass proxies config when using ClientSecretCredential in AzureDataLakeStorageV2Hook (#37103)
* fix: Pass proxies config when using ClientSecretCredential in AzureDataLakeStorageV2Hook and added tests
---------
Co-authored-by: David Blain <[email protected]>
Co-authored-by: David Blain <[email protected]>
---
.../providers/microsoft/azure/hooks/data_lake.py | 12 +++++--
.../microsoft/azure/hooks/test_data_lake.py | 39 ++++++++++++++++++++++
.../azure/transfers/test_local_to_adls.py | 7 +++-
3 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py
index 3777e0e582..054eda087e 100644
--- a/airflow/providers/microsoft/azure/hooks/data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/data_lake.py
@@ -348,7 +348,11 @@ class AzureDataLakeStorageV2Hook(BaseHook):
# use Active Directory auth
app_id = conn.login
app_secret = conn.password
- credential = ClientSecretCredential(tenant, app_id, app_secret)
+ proxies = extra.get("proxies", {})
+
+ credential = ClientSecretCredential(
+ tenant_id=tenant, client_id=app_id, client_secret=app_secret, proxies=proxies
+ )
elif conn.password:
credential = conn.password
else:
@@ -359,8 +363,12 @@ class AzureDataLakeStorageV2Hook(BaseHook):
workload_identity_tenant_id=workload_identity_tenant_id,
)
+ account_url = extra.pop("account_url", f"https://{conn.host}.dfs.core.windows.net")
+
+ self.log.info("account_url: %s", account_url)
+
return DataLakeServiceClient(
- account_url=f"https://{conn.host}.dfs.core.windows.net",
+ account_url=account_url,
credential=credential, # type: ignore[arg-type]
**extra,
)
diff --git a/tests/providers/microsoft/azure/hooks/test_data_lake.py b/tests/providers/microsoft/azure/hooks/test_data_lake.py
index 84a0a9420b..f1e10c5100 100644
--- a/tests/providers/microsoft/azure/hooks/test_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_data_lake.py
@@ -17,15 +17,20 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import PropertyMock
import pytest
+from azure.core.pipeline.policies._universal import ProxyPolicy
from azure.storage.filedatalake._models import FileSystemProperties
from airflow.models import Connection
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
+if TYPE_CHECKING:
+ from azure.storage.filedatalake import DataLakeServiceClient
+
MODULE = "airflow.providers.microsoft.azure.hooks.data_lake"
@@ -297,3 +302,37 @@ class TestAzureDataLakeStorageV2Hook:
assert status is False
assert msg == "Authentication failed."
+
+ @mock.patch(f"{MODULE}.AzureDataLakeStorageV2Hook.get_connection")
+ def test_proxies_passed_to_credentials(self, mock_conn):
+ hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+ mock_conn.return_value = Connection(
+ conn_id=self.conn_id,
+ login="client_id",
+ password="secret",
+ extra={
+ "tenant_id": "tenant-id",
+ "proxies": {"https": "https://proxy:80"},
+ "account_url": "https://onelake.dfs.fabric.microsoft.com",
+ },
+ )
+ conn: DataLakeServiceClient = hook.get_conn()
+
+ assert conn is not None
+ assert conn.primary_endpoint == "https://onelake.dfs.fabric.microsoft.com/"
+ assert conn.primary_hostname == "onelake.dfs.fabric.microsoft.com"
+ assert conn.scheme == "https"
+ assert conn.url == "https://onelake.dfs.fabric.microsoft.com/"
+ assert conn.credential._client_id == "client_id"
+ assert conn.credential._client_credential == "secret"
+ assert self.find_policy(conn, ProxyPolicy) is not None
+ assert self.find_policy(conn, ProxyPolicy).proxies["https"] == "https://proxy:80"
+
+ def find_policy(self, conn, policy_type):
+ policies = conn.credential._client._pipeline._impl_policies
+ return next(
+ map(
+ lambda policy: policy._policy,
+ filter(lambda policy: isinstance(policy._policy, policy_type), policies),
+ )
+ )
diff --git a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py
index 1ef388eef7..50020b1692 100644
--- a/tests/providers/microsoft/azure/transfers/test_local_to_adls.py
+++ b/tests/providers/microsoft/azure/transfers/test_local_to_adls.py
@@ -17,17 +17,22 @@
# under the License.
from __future__ import annotations
+import json
from unittest import mock
import pytest
from airflow.exceptions import AirflowException
-from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator
+from airflow.providers.microsoft.azure.transfers.local_to_adls import (
+ LocalFilesystemToADLSOperator,
+)
TASK_ID = "test-adls-upload-operator"
+FILE_SYSTEM_NAME = "Fabric"
LOCAL_PATH = "test/*"
BAD_LOCAL_PATH = "test/**"
REMOTE_PATH = "TEST-DIR"
+DATA = json.dumps({"name": "David", "surname": "Blain", "gender": "M"}).encode("utf-8")
class TestADLSUploadOperator: