mukund-thakur commented on code in PR #6787: URL: https://github.com/apache/hadoop/pull/6787#discussion_r1591518494
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/WorkloadIdentityTokenProvider.java: ########## @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.File; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.base.Strings; +import org.apache.hadoop.util.Preconditions; + +/** + * Provides tokens based on Azure AD Workload Identity. 
+ */ +public class WorkloadIdentityTokenProvider extends AccessTokenProvider { + + private static final String OAUTH2_TOKEN_PATH = "/oauth2/v2.0/token"; + private static final long ONE_HOUR = 3600 * 1000; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + private final String authEndpoint; + private final String clientId; + private final String tokenFile; + private long tokenFetchTime = -1; + + public WorkloadIdentityTokenProvider(final String authority, final String tenantId, + final String clientId, final String tokenFile) { + Preconditions.checkNotNull(authority, "authority"); + Preconditions.checkNotNull(tenantId, "tenantId"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(tokenFile, "tokenFile"); + + this.authEndpoint = authority + tenantId + OAUTH2_TOKEN_PATH; + this.clientId = clientId; + this.tokenFile = tokenFile; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing token from JWT Assertion"); + String clientAssertion = getClientAssertion(); + AzureADToken token = getTokenUsingJWTAssertion(clientAssertion); + tokenFetchTime = System.currentTimeMillis(); + return token; + } + + /** + * Gets the Azure AD token from a client assertion in JWT format. + * This method exists to make unit testing possible. + * + * @param clientAssertion the client assertion. + * @return the Azure AD token. + * @throws IOException if there is a failure in connecting to Azure AD. + */ + @VisibleForTesting + AzureADToken getTokenUsingJWTAssertion(String clientAssertion) throws IOException { + return AzureADAuthenticator + .getTokenUsingJWTAssertion(authEndpoint, clientId, clientAssertion); + } + + /** + * Checks if the token is about to expire as per base expiry logic. + * Otherwise, try to expire if enough time has elapsed since the last refresh. 
+ * + * @return true if the token is expiring in next 1 hour or if a token has + * never been fetched + */ + @Override + protected boolean isTokenAboutToExpire() { + return super.isTokenAboutToExpire() || hasEnoughTimeElapsedSinceLastRefresh(); + } + + /** + * Checks to see if enough time has elapsed since the last token refresh. + * + * @return true if the token was last refreshed more than an hour ago. + */ + protected boolean hasEnoughTimeElapsedSinceLastRefresh() { + if (getTokenFetchTime() == -1) { + return true; + } + boolean expiring = false; + long elapsedTimeSinceLastTokenRefreshInMillis = + System.currentTimeMillis() - getTokenFetchTime(); + // In case token is not refreshed for 1 hr or any clock skew issues, + // refresh token. + expiring = elapsedTimeSinceLastTokenRefreshInMillis >= ONE_HOUR Review Comment: Any specific reason for choosing 1 hour refresh interval? ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/WorkloadIdentityTokenProvider.java: ########## @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.File; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.base.Strings; +import org.apache.hadoop.util.Preconditions; + +/** + * Provides tokens based on Azure AD Workload Identity. + */ +public class WorkloadIdentityTokenProvider extends AccessTokenProvider { + + private static final String OAUTH2_TOKEN_PATH = "/oauth2/v2.0/token"; + private static final long ONE_HOUR = 3600 * 1000; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + private final String authEndpoint; + private final String clientId; + private final String tokenFile; + private long tokenFetchTime = -1; + + public WorkloadIdentityTokenProvider(final String authority, final String tenantId, + final String clientId, final String tokenFile) { + Preconditions.checkNotNull(authority, "authority"); + Preconditions.checkNotNull(tenantId, "tenantId"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(tokenFile, "tokenFile"); + + this.authEndpoint = authority + tenantId + OAUTH2_TOKEN_PATH; + this.clientId = clientId; + this.tokenFile = tokenFile; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing token from JWT Assertion"); + String clientAssertion = getClientAssertion(); + AzureADToken token = getTokenUsingJWTAssertion(clientAssertion); + tokenFetchTime = System.currentTimeMillis(); + return token; + } + + /** + * Gets the Azure AD token from a client assertion in JWT format. + * This method exists to make unit testing possible. + * + * @param clientAssertion the client assertion. + * @return the Azure AD token. + * @throws IOException if there is a failure in connecting to Azure AD. 
+ */ + @VisibleForTesting + AzureADToken getTokenUsingJWTAssertion(String clientAssertion) throws IOException { + return AzureADAuthenticator + .getTokenUsingJWTAssertion(authEndpoint, clientId, clientAssertion); + } + + /** + * Checks if the token is about to expire as per base expiry logic. + * Otherwise, try to expire if enough time has elapsed since the last refresh. + * + * @return true if the token is expiring in next 1 hour or if a token has + * never been fetched + */ + @Override + protected boolean isTokenAboutToExpire() { + return super.isTokenAboutToExpire() || hasEnoughTimeElapsedSinceLastRefresh(); + } + + /** + * Checks to see if enough time has elapsed since the last token refresh. + * + * @return true if the token was last refreshed more than an hour ago. + */ + protected boolean hasEnoughTimeElapsedSinceLastRefresh() { + if (getTokenFetchTime() == -1) { + return true; + } + boolean expiring = false; + long elapsedTimeSinceLastTokenRefreshInMillis = + System.currentTimeMillis() - getTokenFetchTime(); + // In case token is not refreshed for 1 hr or any clock skew issues, + // refresh token. + expiring = elapsedTimeSinceLastTokenRefreshInMillis >= ONE_HOUR + || elapsedTimeSinceLastTokenRefreshInMillis < 0; + if (expiring) { + LOG.debug("JWTToken: token renewing. Time elapsed since last token fetch:" + + " {} milliseconds", elapsedTimeSinceLastTokenRefreshInMillis); + } + return expiring; + } + + /** + * Gets the client assertion from the token file. The token in the file + * is automatically refreshed by Azure at least once every 24 hours. + * See <a href="https://azure.github.io/azure-workload-identity/docs/faq.html#does-workload-identity-work-in-disconnected-environments"> + * Azure Workload Identity FAQ</a>. + * + * @return the client assertion. + * @throws IOException if the token file is empty. 
+ */ + private String getClientAssertion() + throws IOException { + File file = new File(tokenFile); + String clientAssertion = FileUtils.readFileToString(file, "UTF-8"); Review Comment: Why not do a try catch and throw the actual exception that occurred during the file read? and if no value is present throw below IOE. ########## hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md: ########## @@ -879,6 +879,42 @@ hierarchical namespace enabled, and set the following configuration settings: </property> --> + <!--2.5. If "WorkloadIdentityTokenProvider" is set as key provider, uncomment below and + set tenant, client id and token file path. + + All service principals must have federated identity credentials for Kubernetes. + See Azure docs: https://learn.microsoft.com/en-us/azure/active-directory/workload-identities/workload-identity-federation-create-trust?pivots=identity-wif-apps-methods-azp#kubernetes + + Retrieve the Azure identity token from kubernetes: + 1. Create AKS cluster with Workload Identity: https://learn.microsoft.com/en-us/azure/aks/workload-identity-deploy-cluster + 2. Create the pod: + kubectl apply -f src/test/resources/workload-identity-pod.yaml + 3. After the pod is running, retrieve the identity token from the pod logs: + kubectl logs pod/workload-identity + 4. Save the identity token to the token file path specified below. + + The Azure identity token expires after 1 hour. + --> + <!-- + <property> + <name>fs.azure.account.oauth2.msi.tenant.{ABFS_ACCOUNT_NAME}</name> + <value>{tenantGuid}</value> + <description>msi tenantGuid.</description> + </property> + + <property> + <name>fs.azure.account.oauth2.client.id.{ABFS_ACCOUNT_NAME}</name> + <value>{client id}</value> + <description>AAD client id.</description> + </property> + + <property> + <name>fs.azure.account.oauth2.client.token.file.{ABFS_ACCOUNT_NAME}</name> + <value>{token file path}</value> Review Comment: Is there any specific format inside the token file? 
If it is just a string, please mention that. If it is something else, give an example of what it looks like, add validation when reading it, and add tests for invalid formats. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/WorkloadIdentityTokenProvider.java: ########## @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.File; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.base.Strings; +import org.apache.hadoop.util.Preconditions; + +/** + * Provides tokens based on Azure AD Workload Identity. 
+ */ +public class WorkloadIdentityTokenProvider extends AccessTokenProvider { + + private static final String OAUTH2_TOKEN_PATH = "/oauth2/v2.0/token"; + private static final long ONE_HOUR = 3600 * 1000; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + private final String authEndpoint; + private final String clientId; + private final String tokenFile; + private long tokenFetchTime = -1; + + public WorkloadIdentityTokenProvider(final String authority, final String tenantId, + final String clientId, final String tokenFile) { + Preconditions.checkNotNull(authority, "authority"); + Preconditions.checkNotNull(tenantId, "tenantId"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(tokenFile, "tokenFile"); + + this.authEndpoint = authority + tenantId + OAUTH2_TOKEN_PATH; + this.clientId = clientId; + this.tokenFile = tokenFile; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing token from JWT Assertion"); + String clientAssertion = getClientAssertion(); + AzureADToken token = getTokenUsingJWTAssertion(clientAssertion); + tokenFetchTime = System.currentTimeMillis(); + return token; + } + + /** + * Gets the Azure AD token from a client assertion in JWT format. + * This method exists to make unit testing possible. + * + * @param clientAssertion the client assertion. + * @return the Azure AD token. + * @throws IOException if there is a failure in connecting to Azure AD. + */ + @VisibleForTesting + AzureADToken getTokenUsingJWTAssertion(String clientAssertion) throws IOException { + return AzureADAuthenticator + .getTokenUsingJWTAssertion(authEndpoint, clientId, clientAssertion); + } + + /** + * Checks if the token is about to expire as per base expiry logic. + * Otherwise, try to expire if enough time has elapsed since the last refresh. 
+ * + * @return true if the token is expiring in next 1 hour or if a token has + * never been fetched + */ + @Override + protected boolean isTokenAboutToExpire() { + return super.isTokenAboutToExpire() || hasEnoughTimeElapsedSinceLastRefresh(); + } + + /** + * Checks to see if enough time has elapsed since the last token refresh. + * + * @return true if the token was last refreshed more than an hour ago. + */ + protected boolean hasEnoughTimeElapsedSinceLastRefresh() { + if (getTokenFetchTime() == -1) { + return true; + } + boolean expiring = false; + long elapsedTimeSinceLastTokenRefreshInMillis = + System.currentTimeMillis() - getTokenFetchTime(); + // In case token is not refreshed for 1 hr or any clock skew issues, + // refresh token. + expiring = elapsedTimeSinceLastTokenRefreshInMillis >= ONE_HOUR + || elapsedTimeSinceLastTokenRefreshInMillis < 0; Review Comment: So in case of clock skew we will be renewing during every call? I would at least add a warning message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
