andreahlert commented on code in PR #62867:
URL: https://github.com/apache/airflow/pull/62867#discussion_r2920720023


##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,58 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, 
Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
 
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google 
import get_field as gcp_get_field
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import get_field. To use the GCS storage 
functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
+                key_path = gcp_get_field(conn.extra_dejson, "key_path")
+                if key_path:
+                    credentials.update({"service_account_path": key_path})
+                # Without key_path, credentials stays empty and DataFusion 
falls back to ADC.
+                extra_config = {}
+
+            case "wasb":
+                try:
+                    # Imported as a feature gate only: verifies the Azure 
provider is installed
+                    from airflow.providers.microsoft.azure.hooks.wasb import 
WasbHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import WasbHook. To use the Azure storage 
functionality, please install the "
+                        "apache-airflow-providers-microsoft-azure package."
+                    )
+                tenant_id = conn.extra_dejson.get("tenant_id")
+                if tenant_id:
+                    # Service Principal auth: conn.host holds the storage 
account (name or full URL);
+                    # conn.login is the client_id (AAD app ID), matching 
WasbHook convention.
+                    # DataFusion requires just the account name, so strip any 
URL components.
+                    from urllib.parse import urlparse
+
+                    host = conn.host or ""
+                    parsed = urlparse(host if "://" in host else f"https://{host}")
+                    account = parsed.netloc.split(".")[0] if "." in 
(parsed.netloc or "") else host
+                    credentials = {
+                        "account": account or None,
+                        "client_id": conn.extra_dejson.get("client_id") or 
conn.login,
+                        "client_secret": 
conn.extra_dejson.get("client_secret") or conn.password,
+                        "tenant_id": tenant_id,
+                    }
+                else:
+                    # Key auth: conn.login = storage account name
+                    credentials = {
+                        "account": conn.login,
+                        "access_key": conn.password or 
conn.extra_dejson.get("account_key"),
+                    }
+                credentials = self._remove_none_values(credentials)
+                extra_config = {}

Review Comment:
   If both `conn.login` and `conn.password`/`account_key` are absent, 
`_remove_none_values` returns an empty dict and DataFusion fails with a cryptic 
internal error. Worth raising a `ValueError` early pointing to the missing 
connection fields, otherwise the user has no idea where to look.



##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -158,6 +158,58 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, 
Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
 
+            case "google_cloud_platform":
+                try:
+                    from airflow.providers.google.common.hooks.base_google 
import get_field as gcp_get_field
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import get_field. To use the GCS storage 
functionality, please install the "
+                        "apache-airflow-providers-google package."
+                    )
+                key_path = gcp_get_field(conn.extra_dejson, "key_path")
+                if key_path:
+                    credentials.update({"service_account_path": key_path})
+                # Without key_path, credentials stays empty and DataFusion 
falls back to ADC.
+                extra_config = {}
+
+            case "wasb":
+                try:
+                    # Imported as a feature gate only: verifies the Azure 
provider is installed
+                    from airflow.providers.microsoft.azure.hooks.wasb import 
WasbHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+
+                    raise AirflowOptionalProviderFeatureException(
+                        "Failed to import WasbHook. To use the Azure storage 
functionality, please install the "
+                        "apache-airflow-providers-microsoft-azure package."
+                    )
+                tenant_id = conn.extra_dejson.get("tenant_id")
+                if tenant_id:
+                    # Service Principal auth: conn.host holds the storage 
account (name or full URL);
+                    # conn.login is the client_id (AAD app ID), matching 
WasbHook convention.
+                    # DataFusion requires just the account name, so strip any 
URL components.
+                    from urllib.parse import urlparse
+
+                    host = conn.host or ""
+                    parsed = urlparse(host if "://" in host else f"https://{host}")
+                    account = parsed.netloc.split(".")[0] if "." in 
(parsed.netloc or "") else host
+                    credentials = {

Review Comment:
   The `"://" in host` branch means you explicitly handle URLs with schemes. But 
if `conn.host` is `"https://mystorageaccount"` (scheme present, no subdomain), 
`netloc` becomes `mystorageaccount`, the `"." in netloc` check fails, and 
`account` falls back to the raw `host` string, scheme included. The fallback 
should be `parsed.netloc or host`, not `host`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to