Dummk0pf opened a new issue, #4747:
URL: https://github.com/apache/datafusion-comet/issues/4747
### Describe the bug
When using Comet's native scan in `Comet 0.16.0`, reading Parquet files from
Azure Blob Storage (ABFS) fails when authenticating via Azure `Workload
Identity` on AKS.
While the JVM side correctly collects Hadoop `fs.azure.*` configurations,
the Rust native scanner drops these configs and ignores the `AZURE_*`
environment variables injected by the **Azure Workload Identity webhook**.
Consequently, the credential chain falls back to `ImdsManagedIdentityProvider`,
which fails with a 400 Bad Request if the AKS node has multiple user-assigned
managed identities attached. This happens even though the correct client ID
and tenant ID are available in the pod's environment variables.
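
The fallback described above can be pictured with a simplified model. This is only a sketch: `Credential` and `select_credential` are hypothetical names, and the real credential chain in `object_store` has more branches (access keys, SAS tokens, client secrets, and so on). The point it illustrates is that when none of the workload identity values reach the builder, the only remaining choice is IMDS.

```rust
/// Simplified, hypothetical model of the credential choice described in
/// this issue. It is not the actual `object_store` implementation.
#[derive(Debug, PartialEq)]
enum Credential {
    /// Federated token exchange (Azure Workload Identity).
    WorkloadIdentity,
    /// Instance metadata service; ambiguous when the node has several
    /// user-assigned identities and no client ID is given.
    ImdsManagedIdentity,
}

/// Pick workload identity only when all three values are known to the
/// builder; otherwise fall back to IMDS.
fn select_credential(
    client_id: Option<&str>,
    tenant_id: Option<&str>,
    federated_token_file: Option<&str>,
) -> Credential {
    match (client_id, tenant_id, federated_token_file) {
        (Some(_), Some(_), Some(_)) => Credential::WorkloadIdentity,
        _ => Credential::ImdsManagedIdentity,
    }
}
```

In this model, the native scanner effectively calls `select_credential(None, None, None)` because neither the Hadoop configs nor the environment variables are forwarded.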
### Steps to reproduce
| Category | Details |
| ------------- | ------------- |
| Spark | 3.5.6 |
| Comet | 0.16.0 (https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.16.0/comet-spark-spark3.5_2.12-0.16.0.jar) |
| Cluster Manager | Kubernetes (Azure Kubernetes Service) |
| Storage | Azure cloud storage with the necessary ABFS drivers on the Spark driver, set up to authenticate using the Workload Identity attached to the Kubernetes service account |
- Deploy Spark cluster (3.5.6) with Comet 0.16.0 enabled on an Azure
Kubernetes Service (AKS) cluster.
- Configure Azure Workload Identity (which injects AZURE_CLIENT_ID,
AZURE_TENANT_ID, and AZURE_FEDERATED_TOKEN_FILE env vars).
- In the `spark-submit` configuration, add the following:
```
--conf spark.comet.enabled=true
--conf spark.comet.exec.enabled=true
--conf spark.comet.scan.enabled=true
--conf spark.comet.convert.parquet.enabled=true
--conf spark.comet.expression.regexp.allowIncompatible=true
--conf spark.comet.expression.RegExpReplace.allowIncompatible=true
--conf spark.comet.expression.RegExpExtract.allowIncompatible=true
--conf spark.comet.caseConversion.enabled=true
--conf spark.comet.exec.shuffle.enabled=true
--conf spark.comet.exec.shuffle.mode=auto
--conf spark.comet.batchSize=8192
--conf spark.comet.memoryOverhead=$COMET_MEMORY_OVERHEAD
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=$OFFHEAP_SIZE
--conf spark.comet.explainFallback.enabled=true
--conf spark.comet.logFallbackReasons.enabled=true
## Azure related configurations
--conf spark.hadoop.fs.azure.account.oauth2.msi.tenant="$TENANT_ID"
--conf spark.hadoop.fs.azure.account.auth.type."$STORAGE_ACCOUNT".dfs.core.windows.net=OAuth
--conf spark.hadoop.fs.azure.account.oauth2.client.id"$STORAGE_ACCOUNT".dfs.core.windows.net="$CLIENT_ID"
--conf spark.hadoop.fs.azure.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem
--conf spark.hadoop.fs.azure.account.oauth2.client.federated.token.file."$STORAGE_ACCOUNT".dfs.core.windows.net=/var/run/secrets/azure/tokens/azure-identity-token
--conf spark.hadoop.fs.azure.account.oauth.provider.type."$STORAGE_ACCOUNT".dfs.core.windows.net=org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider
## There are other configurations related to kubernetes and spark which are skipped for the sake of brevity
```
- Attempt to read a Parquet file stored in an Azure storage container as a
Spark DataFrame using an `abfss://` URI.
- The query fails with an HTTP 400 error from IMDS: "Multiple user assigned
identities exist, please specify the clientId / resourceId".
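
When reproducing, it may help to first confirm that the webhook actually injected the variables into the executor pod. The following is a minimal sketch of such a check; `missing_workload_identity_vars` is a hypothetical helper, and the environment lookup is passed in as a closure so the function can be exercised without touching the real process environment.

```rust
/// Hypothetical diagnostic: return the workload identity variables that are
/// unset or empty according to the supplied lookup function.
fn missing_workload_identity_vars(
    env: impl Fn(&str) -> Option<String>,
) -> Vec<&'static str> {
    // Variable names taken from the reproduction steps in this issue.
    const REQUIRED: [&str; 3] = [
        "AZURE_CLIENT_ID",
        "AZURE_TENANT_ID",
        "AZURE_FEDERATED_TOKEN_FILE",
    ];
    REQUIRED
        .iter()
        .copied()
        // Treat an empty value the same as a missing one.
        .filter(|name| env(*name).map_or(true, |value| value.is_empty()))
        .collect()
}
```

Inside a pod, it could be called as `missing_workload_identity_vars(|k| std::env::var(k).ok())`; an empty result means all three variables are present, so the failure is not caused by a missing injection.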
### Expected behavior
Comet's native scan must pick up the `AZURE_*` environment variables in the
pod and also pass the Hadoop `fs.azure.*` configurations to the Rust native
scanner. It must then use them to authenticate via Workload Identity and
load the corresponding Parquet files into a Spark DataFrame.
### Additional context
In `datafusion-comet/native/core/src/parquet/parquet_support.rs`, the
`prepare_object_store_with_configs` function successfully handles custom
configurations for S3 but delegates Azure URLs directly to
`object_store::parse_url(&url)`:
```rust
// datafusion-comet/native/core/src/parquet/parquet_support.rs (Lines 597-620)
let (store, path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
    create_hdfs_object_store(&url)
} else if scheme == "s3" {
    objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
} else {
    parse_url(&url) // <--- Azure configs dropped here
}
```
In the pinned version of `object_store` (0.13.1), `parse_url` calls
`parse_url_opts(url, std::iter::empty())`. For Azure URLs this expands to
`MicrosoftAzureBuilder::new().with_url(url).build()`. Because
`MicrosoftAzureBuilder::from_env()` is never called and the Hadoop configs are
dropped, the builder has no account key, SAS token, or OIDC federation details,
which forces the failing IMDS fallback.
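
One possible direction, sketched below using only the standard library, is to translate the collected Hadoop settings (with the `AZURE_*` environment variables as a fallback) into option pairs that could be handed to `parse_url_opts` instead of an empty iterator. `azure_options` is a hypothetical helper, not existing Comet code. The option names `azure_client_id`, `azure_tenant_id`, and `azure_federated_token_file` are assumed to be keys that `object_store`'s Azure builder accepts and should be checked against the pinned version. The mapping of the `msi.tenant` Hadoop key to the tenant ID follows the configuration used in this report.

```rust
use std::collections::HashMap;

/// Hypothetical helper: build option pairs for the Azure object store from
/// Hadoop `fs.azure.*` settings, falling back to environment variables.
///
/// `account_host` is the storage host suffix used in account-scoped Hadoop
/// keys, e.g. "<account>.dfs.core.windows.net".
fn azure_options(
    hadoop: &HashMap<String, String>,
    account_host: &str,
    env: impl Fn(&str) -> Option<String>,
) -> Vec<(String, String)> {
    // Hadoop key -> assumed object_store option name (verify before use).
    const MAPPING: [(&str, &str); 3] = [
        ("fs.azure.account.oauth2.client.id", "azure_client_id"),
        ("fs.azure.account.oauth2.msi.tenant", "azure_tenant_id"),
        (
            "fs.azure.account.oauth2.client.federated.token.file",
            "azure_federated_token_file",
        ),
    ];

    let mut options = Vec::new();
    for &(hadoop_key, option_name) in MAPPING.iter() {
        // Prefer the account-scoped key, then the global key, then the
        // environment variable with the upper-cased option name.
        let scoped_key = format!("{}.{}", hadoop_key, account_host);
        let value = hadoop
            .get(&scoped_key)
            .or_else(|| hadoop.get(hadoop_key))
            .cloned()
            .or_else(|| env(&option_name.to_uppercase()));
        if let Some(value) = value {
            options.push((option_name.to_string(), value));
        }
    }
    options
}
```

The Azure branch could then pass the resulting vector as the second argument of `parse_url_opts`, assuming the option names are accepted. Giving the Hadoop configuration precedence over the environment is a design choice of this sketch; the opposite order may be preferable.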