This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6e5f8694b66d8dea8ae2926adafdb610c09d9b88 Author: Gabor Somogyi <[email protected]> AuthorDate: Tue Dec 13 15:48:33 2022 +0100 [FLINK-30422][runtime][security] Generalize token framework provider API --- .../runtime/resourcemanager/ResourceManager.java | 3 +- ...Listener.java => DelegationTokenContainer.java} | 31 +++-- .../security/token/DelegationTokenManager.java | 16 ++- ...nProvider.java => DelegationTokenProvider.java} | 47 ++++--- .../security/token/NoOpDelegationTokenManager.java | 5 +- .../token/hadoop/HBaseDelegationTokenProvider.java | 153 +++++++++------------ .../token/hadoop/HadoopDelegationTokenUpdater.java | 18 ++- .../hadoop/HadoopFSDelegationTokenProvider.java | 60 ++++---- .../hadoop/KerberosDelegationTokenManager.java | 125 ++++++++--------- .../flink/runtime/taskexecutor/TaskExecutor.java | 6 +- ...runtime.security.token.DelegationTokenProvider} | 0 .../token/DelegationTokenContainerTest.java | 73 ++++++++++ ... ExceptionThrowingDelegationTokenProvider.java} | 30 ++-- ...vider.java => TestDelegationTokenProvider.java} | 17 +-- .../hadoop/HadoopDelegationTokenConverterTest.java | 2 +- .../hadoop/HadoopDelegationTokenUpdaterITCase.java | 15 +- .../KerberosDelegationTokenManagerITCase.java | 25 ++-- ...runtime.security.token.DelegationTokenProvider} | 4 +- .../apache/flink/yarn/YarnClusterDescriptor.java | 10 +- 19 files changed, 361 insertions(+), 279 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 5d38143974d..2dd390ac711 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -67,7 +67,6 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; -import org.apache.flink.runtime.security.token.DelegationTokenListener; import org.apache.flink.runtime.security.token.DelegationTokenManager; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.slots.ResourceRequirement; @@ -117,7 +116,7 @@ import static org.apache.flink.util.Preconditions.checkState; */ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> extends FencedRpcEndpoint<ResourceManagerId> - implements DelegationTokenListener, ResourceManagerGateway { + implements DelegationTokenManager.Listener, ResourceManagerGateway { public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java similarity index 61% rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java index 9a29e1acf3b..0f11452449b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java @@ -18,17 +18,26 @@ package org.apache.flink.runtime.security.token; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.Experimental; -/** - * Listener for delegation tokens state changes in the {@link DelegationTokenManager}. - * - * <p>By registering it in the manager one can receive callbacks when events are happening related - * to delegation tokens. - */ -@Internal -public interface DelegationTokenListener { +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** Container for delegation tokens. */ +@Experimental +public class DelegationTokenContainer implements Serializable { + private Map<String, byte[]> tokens = new HashMap<>(); + + public Map<String, byte[]> getTokens() { + return tokens; + } + + public void addToken(String key, byte[] value) { + tokens.put(key, value); + } - /** Callback function when new delegation tokens obtained. */ - void onNewTokensObtained(byte[] tokens) throws Exception; + public boolean hasTokens() { + return !tokens.isEmpty(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java index 8a00c9ae154..57f53c75ccc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Internal; -import org.apache.hadoop.security.Credentials; - /** * Manager for delegation tokens in a Flink cluster. * @@ -31,17 +29,27 @@ import org.apache.hadoop.security.Credentials; */ @Internal public interface DelegationTokenManager { + /** + * Listener for events in the {@link DelegationTokenManager}. + * + * <p>By registering it in the manager one can receive callbacks when events are happening. + */ + @Internal + interface Listener { + /** Callback function when new delegation tokens obtained. */ + void onNewTokensObtained(byte[] tokens) throws Exception; + } /** * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them. */ - void obtainDelegationTokens(Credentials credentials) throws Exception; + void obtainDelegationTokens(DelegationTokenContainer container) throws Exception; /** * Creates a re-occurring task which obtains new tokens and automatically distributes them to * task managers. */ - void start(DelegationTokenListener delegationTokenListener) throws Exception; + void start(Listener listener) throws Exception; /** Stops re-occurring token obtain task. */ void stop(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java similarity index 60% rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java index 061f32c6d6d..d98e5816fa9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java @@ -16,27 +16,44 @@ * limitations under the License. */ -package org.apache.flink.runtime.security.token.hadoop; +package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Experimental; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.security.token.DelegationTokenManager; - -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; import java.util.Optional; /** - * Delegation token provider API for Hadoop components. Instances of token providers are loaded by - * {@link DelegationTokenManager} through service loader. - * - * <h2>Important Notes</h2> - * - * <p>Tokens are stored in {@link UserGroupInformation} + * Delegation token provider API. Instances of {@link DelegationTokenProvider}s are loaded by {@link + * DelegationTokenManager} through service loader. */ @Experimental -public interface HadoopDelegationTokenProvider { +public interface DelegationTokenProvider { + /** Container for obtained delegation tokens. */ + class ObtainedDelegationTokens { + /** Serialized form of delegation tokens. */ + private byte[] tokens; + + /** + * Time until the tokens are valid, if valid forever then `Optional.empty()` should be + * returned. + */ + private Optional<Long> validUntil; + + public ObtainedDelegationTokens(byte[] tokens, Optional<Long> validUntil) { + this.tokens = tokens; + this.validUntil = validUntil; + } + + public byte[] getTokens() { + return tokens; + } + + public Optional<Long> getValidUntil() { + return validUntil; + } + } + /** Name of the service to provide delegation tokens. This name should be unique. */ String serviceName(); @@ -57,9 +74,7 @@ public interface HadoopDelegationTokenProvider { /** * Obtain delegation tokens for this service. * - * @param credentials Credentials to add tokens and security keys to. - * @return If the returned tokens are renewable and can be renewed, return the time of the next - * renewal, otherwise `Optional.empty()` should be returned. + * @return the obtained delegation tokens. */ - Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception; + ObtainedDelegationTokens obtainDelegationTokens() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java index 92063bc07eb..cab68f825e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Internal; -import org.apache.hadoop.security.Credentials; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,12 +34,12 @@ public class NoOpDelegationTokenManager implements DelegationTokenManager { } @Override - public void obtainDelegationTokens(Credentials credentials) { + public void obtainDelegationTokens(DelegationTokenContainer container) { LOG.debug("obtainDelegationTokens"); } @Override - public void start(DelegationTokenListener delegationTokenListener) { + public void start(Listener listener) { LOG.debug("start"); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java index 4441267f355..572a49eb8c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop; import org.apache.flink.annotation.Experimental; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.security.token.DelegationTokenProvider; import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.util.Preconditions; @@ -42,7 +43,7 @@ import java.util.Optional; * tend to move this but that would be a breaking change. */ @Experimental -public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvider { +public class HBaseDelegationTokenProvider implements DelegationTokenProvider { private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class); @@ -89,7 +90,7 @@ public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvid } @Override - public boolean delegationTokensRequired() { + public boolean delegationTokensRequired() throws Exception { try { if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) { return false; @@ -99,96 +100,74 @@ public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvid return false; } return Objects.nonNull(hbaseConf) - && hbaseConf.get("hbase.security.authentication").equals("kerberos"); + && hbaseConf.get("hbase.security.authentication").equals("kerberos") + && kerberosLoginProvider.isLoginPossible(); } @Override - public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception { - if (kerberosLoginProvider.isLoginPossible()) { - UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); - return freshUGI.doAs( - (PrivilegedExceptionAction<Optional<Long>>) - () -> { - Token<?> token; - try { - Preconditions.checkNotNull(hbaseConf); - try { - LOG.info("Obtaining Kerberos security token for HBase"); - // ---- - // Intended call: Token<AuthenticationTokenIdentifier> token - // = - // TokenUtil.obtainToken(conf); - token = - (Token<?>) - Class.forName( - "org.apache.hadoop.hbase.security.token.TokenUtil") - .getMethod( - "obtainToken", - org.apache.hadoop.conf - .Configuration - .class) - .invoke(null, hbaseConf); - } catch (NoSuchMethodException e) { - // for HBase 2 - - // ---- - // Intended call: ConnectionFactory connectionFactory = - // ConnectionFactory.createConnection(conf); - Closeable connectionFactory = - (Closeable) - Class.forName( - "org.apache.hadoop.hbase.client.ConnectionFactory") - .getMethod( - "createConnection", - org.apache.hadoop.conf - .Configuration - .class) - .invoke(null, hbaseConf); - // ---- - Class<?> connectionClass = + public ObtainedDelegationTokens obtainDelegationTokens() throws Exception { + UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); + return freshUGI.doAs( + (PrivilegedExceptionAction<ObtainedDelegationTokens>) + () -> { + Token<?> token; + Preconditions.checkNotNull(hbaseConf); + try { + LOG.info("Obtaining Kerberos security token for HBase"); + // ---- + // Intended call: Token<AuthenticationTokenIdentifier> token + // = + // TokenUtil.obtainToken(conf); + token = + (Token<?>) Class.forName( - "org.apache.hadoop.hbase.client.Connection"); - // ---- - // Intended call: Token<AuthenticationTokenIdentifier> token - // = - // TokenUtil.obtainToken(connectionFactory); - token = - (Token<?>) - Class.forName( - "org.apache.hadoop.hbase.security.token.TokenUtil") - .getMethod( - "obtainToken", - connectionClass) - .invoke(null, connectionFactory); - if (null != connectionFactory) { - connectionFactory.close(); - } - } - if (token == null) { - LOG.error("No Kerberos security token for HBase available"); - } else { - credentials.addToken(token.getService(), token); - LOG.info( - "Added HBase Kerberos security token to credentials."); - } - } catch (ClassNotFoundException - | NoSuchMethodException - | IllegalAccessException - | InvocationTargetException - | IOException e) { - LOG.info( - "HBase is not available (failed to obtain delegation tokens): {} : \"{}\".", - e.getClass().getSimpleName(), - e.getMessage()); + "org.apache.hadoop.hbase.security.token.TokenUtil") + .getMethod( + "obtainToken", + org.apache.hadoop.conf.Configuration + .class) + .invoke(null, hbaseConf); + } catch (NoSuchMethodException e) { + // for HBase 2 + + // ---- + // Intended call: ConnectionFactory connectionFactory = + // ConnectionFactory.createConnection(conf); + Closeable connectionFactory = + (Closeable) + Class.forName( + "org.apache.hadoop.hbase.client.ConnectionFactory") + .getMethod( + "createConnection", + org.apache.hadoop.conf.Configuration + .class) + .invoke(null, hbaseConf); + // ---- + Class<?> connectionClass = + Class.forName("org.apache.hadoop.hbase.client.Connection"); + // ---- + // Intended call: Token<AuthenticationTokenIdentifier> token + // = + // TokenUtil.obtainToken(connectionFactory); + token = + (Token<?>) + Class.forName( + "org.apache.hadoop.hbase.security.token.TokenUtil") + .getMethod("obtainToken", connectionClass) + .invoke(null, connectionFactory); + if (null != connectionFactory) { + connectionFactory.close(); } + } - // HBase does not support to renew the delegation token currently - // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication - return Optional.empty(); - }); - } else { - LOG.info("Real user has no kerberos credentials so no tokens obtained"); - return Optional.empty(); - } + Credentials credentials = new Credentials(); + credentials.addToken(token.getService(), token); + + // HBase does not support to renew the delegation token currently + // https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication + return new ObtainedDelegationTokens( + HadoopDelegationTokenConverter.serialize(credentials), + Optional.empty()); + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java index a4528cfad54..d6ec75797a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java @@ -19,14 +19,14 @@ package org.apache.flink.runtime.security.token.hadoop; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.token.DelegationTokenContainer; +import org.apache.flink.util.InstantiationUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** Delegation token updater functionality. */ @Internal public final class HadoopDelegationTokenUpdater { @@ -36,11 +36,17 @@ public final class HadoopDelegationTokenUpdater { private HadoopDelegationTokenUpdater() {} /** Updates delegation tokens for the current user. */ - public static void addCurrentUserCredentials(byte[] credentialsBytes) throws IOException { - if (credentialsBytes == null || credentialsBytes.length == 0) { - throw new IllegalArgumentException("Illegal credentials tried to be set"); + public static void addCurrentUserCredentials(byte[] containerBytes) throws Exception { + if (containerBytes == null || containerBytes.length == 0) { + throw new IllegalArgumentException("Illegal container tried to be processed"); + } + DelegationTokenContainer container = + InstantiationUtil.deserializeObject( + containerBytes, HadoopDelegationTokenUpdater.class.getClassLoader()); + Credentials credentials = new Credentials(); + for (byte[] v : container.getTokens().values()) { + credentials.addAll(HadoopDelegationTokenConverter.deserialize(v)); } - Credentials credentials = HadoopDelegationTokenConverter.deserialize(credentialsBytes); LOG.info("Updating delegation tokens for current user"); dumpAllTokens(credentials); UserGroupInformation.getCurrentUser().addCredentials(credentials); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java index 81d25eb0f51..a6a13f3a6cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.runtime.security.token.DelegationTokenProvider; import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -47,7 +48,7 @@ import java.util.Set; /** Delegation token provider for Hadoop filesystems. */ @Experimental -public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenProvider { +public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider { private static final Logger LOG = LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class); @@ -74,37 +75,38 @@ public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenPro @Override public boolean delegationTokensRequired() throws Exception { - return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()); + return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()) + && kerberosLoginProvider.isLoginPossible(); } @Override - public Optional<Long> obtainDelegationTokens(Credentials credentials) throws Exception { - if (kerberosLoginProvider.isLoginPossible()) { - UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); - return freshUGI.doAs( - (PrivilegedExceptionAction<Optional<Long>>) - () -> { - Clock clock = Clock.systemDefaultZone(); - Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess(); - - obtainDelegationTokens( - getRenewer(), fileSystemsToAccess, credentials); - - // Get the token renewal interval if it is not set. It will be - // called - // only once. - if (tokenRenewalInterval == null) { - tokenRenewalInterval = - getTokenRenewalInterval(clock, fileSystemsToAccess); - } - return tokenRenewalInterval.flatMap( - interval -> - getTokenRenewalDate(clock, credentials, interval)); - }); - } else { - LOG.info("Real user has no kerberos credentials so no tokens obtained"); - return Optional.empty(); - } + public ObtainedDelegationTokens obtainDelegationTokens() throws Exception { + UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); + return freshUGI.doAs( + (PrivilegedExceptionAction<ObtainedDelegationTokens>) + () -> { + Credentials credentials = new Credentials(); + Clock clock = Clock.systemDefaultZone(); + Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess(); + + obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials); + + // Get the token renewal interval if it is not set. It will be + // called + // only once. + if (tokenRenewalInterval == null) { + tokenRenewalInterval = + getTokenRenewalInterval(clock, fileSystemsToAccess); + } + Optional<Long> validUntil = + tokenRenewalInterval.flatMap( + interval -> + getTokenRenewalDate( + clock, credentials, interval)); + return new ObtainedDelegationTokens( + HadoopDelegationTokenConverter.serialize(credentials), + validUntil); + }); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java index 60b754bd450..98bab881b1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java @@ -21,12 +21,13 @@ package org.apache.flink.runtime.security.token.hadoop; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.security.token.DelegationTokenListener; +import org.apache.flink.runtime.security.token.DelegationTokenContainer; import org.apache.flink.runtime.security.token.DelegationTokenManager; +import org.apache.flink.runtime.security.token.DelegationTokenProvider; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.concurrent.ScheduledExecutor; -import org.apache.hadoop.security.Credentials; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,9 +68,7 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { private final long renewalRetryBackoffPeriod; - private final KerberosLoginProvider kerberosLoginProvider; - - @VisibleForTesting final Map<String, HadoopDelegationTokenProvider> delegationTokenProviders; + @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders; @Nullable private final ScheduledExecutor scheduledExecutor; @@ -81,7 +80,7 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { @Nullable private ScheduledFuture<?> tokensUpdateFuture; - @Nullable private DelegationTokenListener delegationTokenListener; + @Nullable private Listener listener; public KerberosDelegationTokenManager( Configuration configuration, @@ -91,26 +90,29 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO); this.renewalRetryBackoffPeriod = configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); - this.kerberosLoginProvider = new KerberosLoginProvider(configuration); this.delegationTokenProviders = loadProviders(); this.scheduledExecutor = scheduledExecutor; this.ioExecutor = ioExecutor; } - private Map<String, HadoopDelegationTokenProvider> loadProviders() { + private Map<String, DelegationTokenProvider> loadProviders() { LOG.info("Loading delegation token providers"); - ServiceLoader<HadoopDelegationTokenProvider> serviceLoader = - ServiceLoader.load(HadoopDelegationTokenProvider.class); + ServiceLoader<DelegationTokenProvider> serviceLoader = + ServiceLoader.load(DelegationTokenProvider.class); - Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>(); - for (HadoopDelegationTokenProvider provider : serviceLoader) { + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + for (DelegationTokenProvider provider : serviceLoader) { try { if (isProviderEnabled(provider.serviceName())) { provider.init(configuration); LOG.info( "Delegation token provider {} loaded and initialized", provider.serviceName()); + checkState( + !providers.containsKey(provider.serviceName()), + "Delegation token provider with service name {} has multiple implementations", + provider.serviceName()); providers.put(provider.serviceName(), provider); } else { LOG.info( @@ -148,47 +150,47 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them. */ @Override - public void obtainDelegationTokens(Credentials credentials) throws Exception { + public void obtainDelegationTokens(DelegationTokenContainer container) throws Exception { LOG.info("Obtaining delegation tokens"); - obtainDelegationTokensAndGetNextRenewal(credentials); + obtainDelegationTokensAndGetNextRenewal(container); LOG.info("Delegation tokens obtained successfully"); } - protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(Credentials credentials) { - Optional<Long> nextRenewal = - delegationTokenProviders.values().stream() - .map( - provider -> { - try { - Optional<Long> nr = Optional.empty(); - if (provider.delegationTokensRequired()) { - LOG.debug( - "Obtaining delegation token for service {}", - provider.serviceName()); - nr = provider.obtainDelegationTokens(credentials); - LOG.debug( - "Obtained delegation token for service {} successfully", - provider.serviceName()); - } else { - LOG.debug( - "Service {} does not need to obtain delegation token", - provider.serviceName()); - } - return nr; - } catch (Exception e) { - LOG.error( - "Failed to obtain delegation token for provider {}", - provider.serviceName(), - e); - throw new FlinkRuntimeException(e); - } - }) - .flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty)) - .min(Long::compare); - - HadoopDelegationTokenUpdater.dumpAllTokens(credentials); - - return nextRenewal; + protected Optional<Long> obtainDelegationTokensAndGetNextRenewal( + DelegationTokenContainer container) { + return delegationTokenProviders.values().stream() + .map( + p -> { + Optional<Long> nr = Optional.empty(); + try { + if (p.delegationTokensRequired()) { + LOG.debug( + "Obtaining delegation token for service {}", + p.serviceName()); + DelegationTokenProvider.ObtainedDelegationTokens t = + p.obtainDelegationTokens(); + checkNotNull(t, "Obtained delegation tokens must not be null"); + container.addToken(p.serviceName(), t.getTokens()); + nr = t.getValidUntil(); + LOG.debug( + "Obtained delegation token for service {} successfully", + p.serviceName()); + } else { + LOG.debug( + "Service {} does not need to obtain delegation token", + p.serviceName()); + } + } catch (Exception e) { + LOG.error( + "Failed to obtain delegation token for provider {}", + p.serviceName(), + e); + throw new FlinkRuntimeException(e); + } + return nr; + }) + .flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty)) + .min(Long::compare); } /** @@ -196,20 +198,14 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { * task managers. */ @Override - public void start(DelegationTokenListener delegationTokenListener) throws Exception { + public void start(Listener listener) throws Exception { checkNotNull(scheduledExecutor, "Scheduled executor must not be null"); checkNotNull(ioExecutor, "IO executor must not be null"); - this.delegationTokenListener = - checkNotNull(delegationTokenListener, "Delegation token listener must not be null"); + this.listener = checkNotNull(listener, "Listener must not be null"); synchronized (tokensUpdateFutureLock) { checkState(tokensUpdateFuture == null, "Manager is already started"); } - if (!kerberosLoginProvider.isLoginPossible()) { - LOG.info("Renewal is NOT possible, skipping to start renewal task"); - return; - } - startTokensUpdate(); } @@ -217,17 +213,16 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { void startTokensUpdate() { try { LOG.info("Starting tokens update task"); - Credentials credentials = new Credentials(); - Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(credentials); - - if (credentials.numberOfTokens() > 0) { - byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials); + DelegationTokenContainer container = new DelegationTokenContainer(); + Optional<Long> nextRenewal = obtainDelegationTokensAndGetNextRenewal(container); - HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes); + if (container.hasTokens()) { + byte[] containerBytes = InstantiationUtil.serializeObject(container); + HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes); LOG.info("Notifying listener about new tokens"); - checkNotNull(delegationTokenListener, "Listener must not be null"); - delegationTokenListener.onNewTokensObtained(credentialsBytes); + checkNotNull(listener, "Listener must not be null"); + listener.onNewTokensObtained(containerBytes); LOG.info("Listener notified successfully"); } else { LOG.warn("No tokens obtained so skipping listener notification"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 24384585475..bd8941622f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -2376,11 +2376,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final ClusterInformation clusterInformation = success.getClusterInformation(); final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway(); - if (success.getInitialTokens() != null) { + byte[] tokens = success.getInitialTokens(); + if (tokens != null) { try { log.info("Receive initial delegation tokens from resource manager"); - HadoopDelegationTokenUpdater.addCurrentUserCredentials( - success.getInitialTokens()); + HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens); } catch (Throwable t) { log.error("Could not update delegation tokens.", t); ExceptionUtils.rethrowIfFatalError(t); diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider similarity index 100% rename from flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider rename to flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java new file mode 100644 index 00000000000..ed63463231f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.util.InstantiationUtil; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link DelegationTokenContainer}. */ +class DelegationTokenContainerTest { + private static final String TOKEN_KEY = "TEST_TOKEN_KEY"; + private static final String TOKEN_VALUE = "TEST_TOKEN_VALUE"; + + @Test + public void testRoundTrip() throws Exception { + final DelegationTokenContainer container = new DelegationTokenContainer(); + container.addToken(TOKEN_KEY, TOKEN_VALUE.getBytes()); + + final byte[] containerBytes = InstantiationUtil.serializeObject(container); + final DelegationTokenContainer deserializedContainer = + InstantiationUtil.deserializeObject(containerBytes, getClass().getClassLoader()); + + final Map<String, byte[]> genericTokens = deserializedContainer.getTokens(); + assertEquals(1, genericTokens.size()); + assertArrayEquals(TOKEN_VALUE.getBytes(), genericTokens.get(TOKEN_KEY)); + } + + @Test + public void getTokenShouldReturnNullWhenNoTokens() { + final DelegationTokenContainer container = new DelegationTokenContainer(); + + assertNull(container.getTokens().get(TOKEN_KEY)); + } + + @Test + public void hasTokensShouldReturnFalseWhenNoTokens() { + final DelegationTokenContainer container = new DelegationTokenContainer(); + + assertFalse(container.hasTokens()); + } + + @Test + public void hasTokensShouldReturnTrueWithGenericToken() { + final DelegationTokenContainer container = new DelegationTokenContainer(); + container.addToken(TOKEN_KEY, TOKEN_VALUE.getBytes()); + + assertTrue(container.hasTokens()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java similarity index 67% rename from flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java index 234b0b68b13..05b2875a8d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java @@ -16,34 +16,30 @@ * limitations under the License. */ -package org.apache.flink.runtime.security.token.hadoop; +package org.apache.flink.runtime.security.token; import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; - import java.util.Optional; /** - * An example implementation of {@link HadoopDelegationTokenProvider} which throws exception when - * enabled. + * An example implementation of {@link DelegationTokenProvider} which throws exception when enabled. */ -public class ExceptionThrowingHadoopDelegationTokenProvider - implements HadoopDelegationTokenProvider { +public class ExceptionThrowingDelegationTokenProvider implements DelegationTokenProvider { public static volatile boolean throwInInit = false; public static volatile boolean throwInUsage = false; + public static volatile boolean addToken = false; public static volatile boolean constructed = false; public static void reset() { throwInInit = false; throwInUsage = false; + addToken = false; constructed = false; } - public ExceptionThrowingHadoopDelegationTokenProvider() { + public ExceptionThrowingDelegationTokenProvider() { constructed = true; } @@ -64,18 +60,18 @@ public class ExceptionThrowingHadoopDelegationTokenProvider if (throwInUsage) { throw new IllegalArgumentException(); } - return true; + return addToken; } @Override - public Optional<Long> obtainDelegationTokens(Credentials credentials) { + public ObtainedDelegationTokens obtainDelegationTokens() { if (throwInUsage) { throw new IllegalArgumentException(); } - final Text tokenKind = new Text("TEST_TOKEN_KIND"); - final Text tokenService = new Text("TEST_TOKEN_SERVICE"); - credentials.addToken( - tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService)); - return Optional.empty(); + if (addToken) { + return new ObtainedDelegationTokens("TEST_TOKEN_VALUE".getBytes(), Optional.empty()); + } else { + return null; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java similarity index 70% rename from flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java index 7b692ad8da2..a5cf75d3dc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java @@ -16,19 +16,12 @@ * limitations under the License. */ -package org.apache.flink.runtime.security.token.hadoop; +package org.apache.flink.runtime.security.token; import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.security.Credentials; - -import java.util.Optional; - -/** - * An example implementation of {@link - * org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider} which does nothing. - */ -public class TestHadoopDelegationTokenProvider implements HadoopDelegationTokenProvider { +/** An example implementation of {@link DelegationTokenProvider} which does nothing. */ +public class TestDelegationTokenProvider implements DelegationTokenProvider { @Override public String serviceName() { @@ -44,7 +37,7 @@ public class TestHadoopDelegationTokenProvider implements HadoopDelegationTokenP } @Override - public Optional<Long> obtainDelegationTokens(Credentials credentials) { - return Optional.empty(); + public ObtainedDelegationTokens obtainDelegationTokens() { + return null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java index 8ab3625df6f..5a2e9db21c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java @@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class HadoopDelegationTokenConverterTest { @Test - public void testRoundTrip() throws IOException { + public void testRoundTrip() throws IOException, ClassNotFoundException { final Text tokenKind = new Text("TEST_TOKEN_KIND"); final Text tokenService = new Text("TEST_TOKEN_SERVICE"); Credentials credentials = new Credentials(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java index 70d1a1cfab4..7da8411dc96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.security.token.hadoop; +import org.apache.flink.runtime.security.token.DelegationTokenContainer; +import org.apache.flink.util.InstantiationUtil; + import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -27,8 +30,6 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; -import java.io.IOException; - import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -60,25 +61,27 @@ public class HadoopDelegationTokenUpdaterITCase { () -> HadoopDelegationTokenUpdater.addCurrentUserCredentials( credentialsBytes)); - assertTrue(e.getMessage().contains("Illegal credentials")); + assertTrue(e.getMessage().contains("Illegal container")); } } @Test - public void addCurrentUserCredentialsShouldOverwriteCredentials() throws IOException { + public void addCurrentUserCredentialsShouldOverwriteCredentials() throws Exception { final Text tokenKind = new Text("TEST_TOKEN_KIND"); final Text tokenService = new Text("TEST_TOKEN_SERVICE"); Credentials credentials = new Credentials(); credentials.addToken( tokenService, new Token<>(new byte[4], new byte[4], tokenKind, tokenService)); - byte[] credentialsBytes = HadoopDelegationTokenConverter.serialize(credentials); + DelegationTokenContainer container = new DelegationTokenContainer(); + container.addToken("TEST_TOKEN_KEY", credentialsBytes); + byte[] containerBytes = InstantiationUtil.serializeObject(container); try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes); + HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes); ArgumentCaptor<Credentials> argumentCaptor = ArgumentCaptor.forClass(Credentials.class); verify(userGroupInformation, times(1)).addCredentials(argumentCaptor.capture()); assertTrue( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java index fd329dce3e7..45b04eeeff0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token.hadoop; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.security.token.DelegationTokenManager; +import org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.hadoop.security.UserGroupInformation; @@ -50,7 +51,7 @@ public class KerberosDelegationTokenManagerITCase { @Test public void isProviderEnabledMustGiveBackTrueByDefault() { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); Configuration configuration = new Configuration(); KerberosDelegationTokenManager delegationTokenManager = new KerberosDelegationTokenManager(configuration, null, null); @@ -60,7 +61,7 @@ public class KerberosDelegationTokenManagerITCase { @Test public void isProviderEnabledMustGiveBackFalseWhenDisabled() { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); Configuration configuration = new Configuration(); configuration.setBoolean("security.kerberos.token.provider.test.enabled", false); KerberosDelegationTokenManager delegationTokenManager = @@ -80,18 +81,18 @@ public class KerberosDelegationTokenManagerITCase { Exception.class, () -> { try { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); - ExceptionThrowingHadoopDelegationTokenProvider.throwInInit = true; + ExceptionThrowingDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.throwInInit = true; new KerberosDelegationTokenManager(new Configuration(), null, null); } finally { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); } }); } @Test public void testAllProvidersLoaded() { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); Configuration configuration = new Configuration(); configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false); KerberosDelegationTokenManager delegationTokenManager = @@ -101,7 +102,7 @@ public class KerberosDelegationTokenManagerITCase { assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs")); assertTrue(delegationTokenManager.isProviderLoaded("hbase")); assertTrue(delegationTokenManager.isProviderLoaded("test")); - assertTrue(ExceptionThrowingHadoopDelegationTokenProvider.constructed); + assertTrue(ExceptionThrowingDelegationTokenProvider.constructed); assertFalse(delegationTokenManager.isProviderLoaded("throw")); } @@ -116,11 +117,11 @@ public class KerberosDelegationTokenManagerITCase { UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.addToken = true; Configuration configuration = new Configuration(); configuration.setBoolean("security.kerberos.token.provider.throw.enabled", true); AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0); - KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(configuration); KerberosDelegationTokenManager delegationTokenManager = new KerberosDelegationTokenManager( configuration, scheduledExecutor, scheduler) { @@ -132,10 +133,10 @@ public class KerberosDelegationTokenManagerITCase { }; delegationTokenManager.startTokensUpdate(); - ExceptionThrowingHadoopDelegationTokenProvider.throwInUsage = true; + ExceptionThrowingDelegationTokenProvider.throwInUsage = true; scheduledExecutor.triggerScheduledTasks(); scheduler.triggerAll(); - ExceptionThrowingHadoopDelegationTokenProvider.throwInUsage = false; + ExceptionThrowingDelegationTokenProvider.throwInUsage = false; scheduledExecutor.triggerScheduledTasks(); scheduler.triggerAll(); delegationTokenManager.stopTokensUpdate(); @@ -146,7 +147,7 @@ public class KerberosDelegationTokenManagerITCase { @Test public void calculateRenewalDelayShouldConsiderRenewalRatio() { - ExceptionThrowingHadoopDelegationTokenProvider.reset(); + ExceptionThrowingDelegationTokenProvider.reset(); Configuration configuration = new Configuration(); configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false); configuration.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5); diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider similarity index 83% rename from flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider rename to flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider index 74e3982f7bc..bd3ed1d6482 100644 --- a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider +++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider @@ -13,5 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenProvider -org.apache.flink.runtime.security.token.hadoop.ExceptionThrowingHadoopDelegationTokenProvider +org.apache.flink.runtime.security.token.TestDelegationTokenProvider +org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 235400fe7ce..7cd065cf206 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; +import org.apache.flink.runtime.security.token.DelegationTokenContainer; import org.apache.flink.runtime.security.token.DelegationTokenManager; import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter; import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager; @@ -1294,12 +1295,15 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws Exception { LOG.info("Adding delegation tokens to the AM container."); - Credentials credentials = new Credentials(); - DelegationTokenManager delegationTokenManager = new KerberosDelegationTokenManager(flinkConfiguration, null, null); - delegationTokenManager.obtainDelegationTokens(credentials); + DelegationTokenContainer container = new DelegationTokenContainer(); + delegationTokenManager.obtainDelegationTokens(container); + Credentials credentials = new Credentials(); + for (byte[] v : container.getTokens().values()) { + credentials.addAll(HadoopDelegationTokenConverter.deserialize(v)); + } ByteBuffer tokens = ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials)); containerLaunchContext.setTokens(tokens);
