This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 220ef999d2a [FLINK-26043][runtime][security] Add periodic kerberos 
relogin to KerberosDelegationTokenManager [FLINK-27605][tests] Updated Mockito 
version to 3.4.6 in order to use static method mocking
220ef999d2a is described below

commit 220ef999d2a353fd52cc0aa1a93c26d9b696c1ce
Author: gabor.g.somogyi <[email protected]>
AuthorDate: Mon Feb 14 14:23:17 2022 +0100

    [FLINK-26043][runtime][security] Add periodic kerberos relogin to 
KerberosDelegationTokenManager
    [FLINK-27605][tests] Updated Mockito version to 3.4.6 in order to use 
static method mocking
---
 .../generated/security_auth_kerberos_section.html  |   6 +
 .../generated/security_configuration.html          |   6 +
 .../kinesis/FlinkKinesisConsumerTest.java          | 215 +++++++++++----------
 .../flink/configuration/SecurityOptions.java       |   9 +
 .../runtime/entrypoint/ClusterEntrypoint.java      |  15 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  15 +-
 .../security/token/DelegationTokenManager.java     |   2 +-
 .../token/KerberosDelegationTokenManager.java      |  87 ++++++++-
 .../KerberosDelegationTokenManagerFactory.java     |  58 ++++++
 .../token/KerberosRenewalPossibleProvider.java     |  66 +++++++
 .../runtime/rest/FileUploadHandlerITCase.java      |  14 +-
 .../token/KerberosDelegationTokenManagerTest.java  |  69 +++++--
 .../token/KerberosRenewalPossibleProviderTest.java |  78 ++++++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   2 +-
 pom.xml                                            |  14 +-
 15 files changed, 508 insertions(+), 148 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html 
b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index 580b51dae73..638ee4c9ec0 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -38,5 +38,11 @@
             <td>Boolean</td>
             <td>Indicates whether to read from your Kerberos ticket cache.</td>
         </tr>
+        <tr>
+            <td><h5>security.kerberos.relogin.period</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The time period when keytab login happens automatically in 
order to always have a valid TGT.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html 
b/docs/layouts/shortcodes/generated/security_configuration.html
index 2afa767a44f..87a3aab06d5 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -50,6 +50,12 @@
             <td>Boolean</td>
             <td>Indicates whether to read from your Kerberos ticket cache.</td>
         </tr>
+        <tr>
+            <td><h5>security.kerberos.relogin.period</h5></td>
+            <td style="word-wrap: break-word;">1 min</td>
+            <td>Duration</td>
+            <td>The time period when keytab login happens automatically in 
order to always have a valid TGT.</td>
+        </tr>
         <tr>
             <td><h5>security.module.factory.classes</h5></td>
             <td style="word-wrap: 
break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..91473a8ca2c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -68,6 +68,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Matchers;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -97,13 +98,14 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /** Suite of FlinkKinesisConsumer tests for the methods called throughout the 
source life cycle. */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+@PrepareForTest(FlinkKinesisConsumer.class)
 public class FlinkKinesisConsumerTest extends TestLogger {
 
     // ----------------------------------------------------------------------
@@ -324,13 +326,12 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
 
         // assume the given config is correct
-        PowerMockito.mockStatic(KinesisConfigUtil.class);
-        PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
-        TestableFlinkKinesisConsumer consumer =
-                new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
-        consumer.open(new Configuration());
-        consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+        try (MockedStatic<KinesisConfigUtil> kcu = 
mockStatic(KinesisConfigUtil.class)) {
+            TestableFlinkKinesisConsumer consumer =
+                    new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
+            consumer.open(new Configuration());
+            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+        }
     }
 
     @Test
@@ -374,28 +375,28 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 
         // assume the given config is correct
-        PowerMockito.mockStatic(KinesisConfigUtil.class);
-        PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
-        // 
----------------------------------------------------------------------
-        // start to test fetcher's initial state seeding
-        // 
----------------------------------------------------------------------
-
-        TestableFlinkKinesisConsumer consumer =
-                new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
-        consumer.initializeState(initializationContext);
-        consumer.open(new Configuration());
-        consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
-        for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
-                fakeRestoredState.entrySet()) {
-            Mockito.verify(mockedFetcher)
-                    .registerNewSubscribedShardState(
-                            new KinesisStreamShardState(
-                                    
KinesisDataFetcher.convertToStreamShardMetadata(
-                                            restoredShard.getKey()),
-                                    restoredShard.getKey(),
-                                    restoredShard.getValue()));
+        try (MockedStatic<KinesisConfigUtil> kcu = 
mockStatic(KinesisConfigUtil.class)) {
+
+            // 
----------------------------------------------------------------------
+            // start to test fetcher's initial state seeding
+            // 
----------------------------------------------------------------------
+
+            TestableFlinkKinesisConsumer consumer =
+                    new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
+            consumer.initializeState(initializationContext);
+            consumer.open(new Configuration());
+            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+            for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+                    fakeRestoredState.entrySet()) {
+                Mockito.verify(mockedFetcher)
+                        .registerNewSubscribedShardState(
+                                new KinesisStreamShardState(
+                                        
KinesisDataFetcher.convertToStreamShardMetadata(
+                                                restoredShard.getKey()),
+                                        restoredShard.getKey(),
+                                        restoredShard.getValue()));
+            }
         }
     }
 
@@ -451,40 +452,40 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 
         // assume the given config is correct
-        PowerMockito.mockStatic(KinesisConfigUtil.class);
-        PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
-        // 
----------------------------------------------------------------------
-        // start to test fetcher's initial state seeding
-        // 
----------------------------------------------------------------------
-
-        TestableFlinkKinesisConsumer consumer =
-                new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
-        consumer.initializeState(initializationContext);
-        consumer.open(new Configuration());
-        consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
-        for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
-                fakeRestoredStateForOthers.entrySet()) {
-            // should never get restored state not belonging to itself
-            Mockito.verify(mockedFetcher, never())
-                    .registerNewSubscribedShardState(
-                            new KinesisStreamShardState(
-                                    
KinesisDataFetcher.convertToStreamShardMetadata(
-                                            restoredShard.getKey()),
-                                    restoredShard.getKey(),
-                                    restoredShard.getValue()));
-        }
-        for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
-                fakeRestoredState.entrySet()) {
-            // should get restored state belonging to itself
-            Mockito.verify(mockedFetcher)
-                    .registerNewSubscribedShardState(
-                            new KinesisStreamShardState(
-                                    
KinesisDataFetcher.convertToStreamShardMetadata(
-                                            restoredShard.getKey()),
-                                    restoredShard.getKey(),
-                                    restoredShard.getValue()));
+        try (MockedStatic<KinesisConfigUtil> kcu = 
mockStatic(KinesisConfigUtil.class)) {
+
+            // 
----------------------------------------------------------------------
+            // start to test fetcher's initial state seeding
+            // 
----------------------------------------------------------------------
+
+            TestableFlinkKinesisConsumer consumer =
+                    new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
+            consumer.initializeState(initializationContext);
+            consumer.open(new Configuration());
+            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+            for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+                    fakeRestoredStateForOthers.entrySet()) {
+                // should never get restored state not belonging to itself
+                Mockito.verify(mockedFetcher, never())
+                        .registerNewSubscribedShardState(
+                                new KinesisStreamShardState(
+                                        
KinesisDataFetcher.convertToStreamShardMetadata(
+                                                restoredShard.getKey()),
+                                        restoredShard.getKey(),
+                                        restoredShard.getValue()));
+            }
+            for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+                    fakeRestoredState.entrySet()) {
+                // should get restored state belonging to itself
+                Mockito.verify(mockedFetcher)
+                        .registerNewSubscribedShardState(
+                                new KinesisStreamShardState(
+                                        
KinesisDataFetcher.convertToStreamShardMetadata(
+                                                restoredShard.getKey()),
+                                        restoredShard.getKey(),
+                                        restoredShard.getValue()));
+            }
         }
     }
 
@@ -564,33 +565,35 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 
         // assume the given config is correct
-        PowerMockito.mockStatic(KinesisConfigUtil.class);
-        PowerMockito.doNothing().when(KinesisConfigUtil.class);
+        try (MockedStatic<KinesisConfigUtil> kcu = 
mockStatic(KinesisConfigUtil.class)) {
 
-        // 
----------------------------------------------------------------------
-        // start to test fetcher's initial state seeding
-        // 
----------------------------------------------------------------------
+            // 
----------------------------------------------------------------------
+            // start to test fetcher's initial state seeding
+            // 
----------------------------------------------------------------------
 
-        TestableFlinkKinesisConsumer consumer =
-                new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
-        consumer.initializeState(initializationContext);
-        consumer.open(new Configuration());
-        consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+            TestableFlinkKinesisConsumer consumer =
+                    new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
+            consumer.initializeState(initializationContext);
+            consumer.open(new Configuration());
+            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-        fakeRestoredState.put(
-                new StreamShardHandle(
-                        "fakeStream2",
-                        new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
-                SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-        for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
-                fakeRestoredState.entrySet()) {
-            Mockito.verify(mockedFetcher)
-                    .registerNewSubscribedShardState(
-                            new KinesisStreamShardState(
-                                    
KinesisDataFetcher.convertToStreamShardMetadata(
-                                            restoredShard.getKey()),
-                                    restoredShard.getKey(),
-                                    restoredShard.getValue()));
+            fakeRestoredState.put(
+                    new StreamShardHandle(
+                            "fakeStream2",
+                            new Shard()
+                                    .withShardId(
+                                            
KinesisShardIdGenerator.generateFromShardOrder(2))),
+                    
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+            for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+                    fakeRestoredState.entrySet()) {
+                Mockito.verify(mockedFetcher)
+                        .registerNewSubscribedShardState(
+                                new KinesisStreamShardState(
+                                        
KinesisDataFetcher.convertToStreamShardMetadata(
+                                                restoredShard.getKey()),
+                                        restoredShard.getKey(),
+                                        restoredShard.getValue()));
+            }
         }
     }
 
@@ -709,26 +712,26 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 
         // assume the given config is correct
-        PowerMockito.mockStatic(KinesisConfigUtil.class);
-        PowerMockito.doNothing().when(KinesisConfigUtil.class);
+        try (MockedStatic<KinesisConfigUtil> kcu = 
mockStatic(KinesisConfigUtil.class)) {
 
-        // 
----------------------------------------------------------------------
-        // start to test fetcher's initial state seeding
-        // 
----------------------------------------------------------------------
+            // 
----------------------------------------------------------------------
+            // start to test fetcher's initial state seeding
+            // 
----------------------------------------------------------------------
 
-        TestableFlinkKinesisConsumer consumer =
-                new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
-        consumer.initializeState(initializationContext);
-        consumer.open(new Configuration());
-        consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
-        Mockito.verify(mockedFetcher)
-                .registerNewSubscribedShardState(
-                        new KinesisStreamShardState(
-                                
KinesisDataFetcher.convertToStreamShardMetadata(
-                                        closedStreamShardHandle),
-                                closedStreamShardHandle,
-                                
fakeRestoredState.get(closedStreamShardHandle)));
+            TestableFlinkKinesisConsumer consumer =
+                    new TestableFlinkKinesisConsumer("fakeStream", new 
Properties(), 10, 2);
+            consumer.initializeState(initializationContext);
+            consumer.open(new Configuration());
+            consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+            Mockito.verify(mockedFetcher)
+                    .registerNewSubscribedShardState(
+                            new KinesisStreamShardState(
+                                    
KinesisDataFetcher.convertToStreamShardMetadata(
+                                            closedStreamShardHandle),
+                                    closedStreamShardHandle,
+                                    
fakeRestoredState.get(closedStreamShardHandle)));
+        }
     }
 
     private static final class TestingListState<T> implements ListState<T> {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 34c71f5e3af..9c125959c84 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 
+import java.time.Duration;
 import java.util.List;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -123,6 +124,14 @@ public class SecurityOptions {
                                     + "You may need to disable this option, if 
you rely on submission mechanisms, e.g. Apache Oozie, "
                                     + "to handle delegation tokens.");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Duration> KERBEROS_RELOGIN_PERIOD =
+            key("security.kerberos.relogin.period")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDescription(
+                            "The time period when keytab login happens 
automatically in order to always have a valid TGT.");
+
     // ------------------------------------------------------------------------
     //  ZooKeeper Security Options
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 01c5f5f1c1c..4242d241ff8 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,7 +32,6 @@ import org.apache.flink.configuration.JMXServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
@@ -48,7 +47,6 @@ import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.hadoop.HadoopDependency;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -68,8 +66,7 @@ import 
org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.security.contexts.SecurityContext;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
+import 
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
@@ -392,11 +389,11 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
             configuration.setString(BlobServerOptions.PORT, 
String.valueOf(blobServer.getPort()));
             heartbeatServices = createHeartbeatServices(configuration);
             delegationTokenManager =
-                    
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                    && 
HadoopDependency.isHadoopCommonOnClasspath(
-                                            getClass().getClassLoader())
-                            ? new KerberosDelegationTokenManager(configuration)
-                            : new NoOpDelegationTokenManager();
+                    KerberosDelegationTokenManagerFactory.create(
+                            getClass().getClassLoader(),
+                            configuration,
+                            commonRpcService.getScheduledExecutor(),
+                            ioExecutor);
             metricRegistry = createMetricRegistry(configuration, 
pluginManager, rpcSystem);
 
             final RpcService metricQueryServiceRpcService =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 695507e8414..e07980aa5c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -54,7 +53,6 @@ import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerCo
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
-import org.apache.flink.runtime.hadoop.HadoopDependency;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
@@ -90,8 +88,7 @@ import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
+import 
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
@@ -425,11 +422,11 @@ public class MiniCluster implements AutoCloseableAsync {
                 heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
                 delegationTokenManager =
-                        
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                        && 
HadoopDependency.isHadoopCommonOnClasspath(
-                                                getClass().getClassLoader())
-                                ? new 
KerberosDelegationTokenManager(configuration)
-                                : new NoOpDelegationTokenManager();
+                        KerberosDelegationTokenManagerFactory.create(
+                                getClass().getClassLoader(),
+                                configuration,
+                                commonRpcService.getScheduledExecutor(),
+                                ioExecutor);
 
                 blobCacheService =
                         BlobUtils.createBlobCacheService(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index 30594b7883f..a4efe308c4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -38,7 +38,7 @@ public interface DelegationTokenManager {
      * Creates a re-occurring task which obtains new tokens and automatically 
distributes them to
      * task managers.
      */
-    void start();
+    void start() throws Exception;
 
     /** Stops re-occurring token obtain task. */
     void stop();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 6cf56de35d9..20fd44d6fc2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -20,16 +20,27 @@ package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Manager for delegation tokens in a Flink cluster.
@@ -45,10 +56,25 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final KerberosRenewalPossibleProvider 
kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> 
delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    @Nullable private final ScheduledExecutor scheduledExecutor;
+
+    @Nullable private final ExecutorService ioExecutor;
+
+    @Nullable private ScheduledFuture<?> tgtRenewalFuture;
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService ioExecutor) {
         this.configuration = checkNotNull(configuration, "Flink configuration 
must not be null");
+        this.scheduledExecutor = scheduledExecutor;
+        this.ioExecutor = ioExecutor;
+        this.kerberosRenewalPossibleProvider =
+                new KerberosRenewalPossibleProvider(new 
SecurityConfiguration(configuration));
         this.delegationTokenProviders = loadProviders();
     }
 
@@ -110,13 +136,66 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(ioExecutor, "IO executor must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal 
task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    void startTGTRenewal() throws IOException {
+        LOG.debug("Starting TGT renewal task");
+
+        UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be 
automatic, but in Hadoop
+            // 3.x, it is configurable (see 
hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays 
logged in regardless of
+            // that configuration's value. Note that 
checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = 
configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    ioExecutor.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    
currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed 
successfully");
+                                                } catch (Exception e) {
+                                                    LOG.warn("Error while 
renewing TGT", e);
+                                                }
+                                            }),
+                            tgtRenewalPeriod,
+                            tgtRenewalPeriod,
+                            TimeUnit.MILLISECONDS);
+            LOG.debug("TGT renewal task started and reoccur in {} ms", 
tgtRenewalPeriod);
+        } else {
+            LOG.debug("TGT renewal task not started");
+        }
+    }
+
+    void stopTGTRenewal() {
+        if (tgtRenewalFuture != null) {
+            tgtRenewalFuture.cancel(true);
+            tgtRenewalFuture = null;
+        }
     }
 
     /** Stops re-occurring token obtain task. */
     @Override
     public void stop() {
-        LOG.info("Stopping renewal task");
+        LOG.info("Stopping credential renewal");
+
+        stopTGTRenewal();
+
+        LOG.info("Stopped credential renewal");
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
new file mode 100644
index 00000000000..60494b105d7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.hadoop.HadoopDependency;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ExecutorService;
+
+/** A factory for {@link KerberosDelegationTokenManager}. */
+public class KerberosDelegationTokenManagerFactory {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(KerberosDelegationTokenManagerFactory.class);
+
+    public static DelegationTokenManager create(
+            ClassLoader classLoader,
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService ioExecutor) {
+
+        if 
(configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+            if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) {
+                return new KerberosDelegationTokenManager(
+                        configuration, scheduledExecutor, ioExecutor);
+            } else {
+                LOG.info(
+                        "Cannot use kerberos delegation token manager because 
Hadoop cannot be found in the Classpath.");
+                return new NoOpDelegationTokenManager();
+            }
+        } else {
+            return new NoOpDelegationTokenManager();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
new file mode 100644
index 00000000000..05a11806be8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** It checks whether kerberos credentials can be renewed. */
+public class KerberosRenewalPossibleProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KerberosRenewalPossibleProvider.class);
+
+    private final SecurityConfiguration securityConfiguration;
+
+    public KerberosRenewalPossibleProvider(SecurityConfiguration 
securityConfiguration) {
+        this.securityConfiguration =
+                checkNotNull(
+                        securityConfiguration, "Flink security configuration 
must not be null");
+    }
+
+    public boolean isRenewalPossible() throws IOException {
+        if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+                && !StringUtils.isBlank(securityConfiguration.getPrincipal())) 
{
+            LOG.debug("Login from keytab is possible");
+            return true;
+        }
+        LOG.debug("Login from keytab is NOT possible");
+
+        if (securityConfiguration.useTicketCache() && 
hasCurrentUserCredentials()) {
+            LOG.debug("Login from ticket cache is possible");
+            return true;
+        }
+        LOG.debug("Login from ticket cache is NOT possible");
+
+        return false;
+    }
+
+    protected boolean hasCurrentUserCredentials() throws IOException {
+        return UserGroupInformation.getCurrentUser().hasKerberosCredentials();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
index 632f57ce878..34d17955922 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -471,8 +472,17 @@ public class FileUploadHandlerITCase extends TestLogger {
             Class<?> clazz = Class.forName("java.io.DeleteOnExitHook");
             Field field = clazz.getDeclaredField("files");
             field.setAccessible(true);
-            LinkedHashSet files = (LinkedHashSet) field.get(null);
-            assertTrue(files.isEmpty());
+            LinkedHashSet<String> files = (LinkedHashSet<String>) 
field.get(null);
+            boolean fileFound = false;
+            // Mockito automatically registers mockitoboot*.jar for on-exit 
removal. Verify that
+            // there are no other files registered.
+            for (String file : files) {
+                if (!file.contains("mockitoboot")) {
+                    fileFound = true;
+                    break;
+                }
+            }
+            assertFalse(fileFound);
         } catch (ClassNotFoundException | IllegalAccessException | 
NoSuchFieldException e) {
             fail("This should never happen.");
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
index 71454cd5501..3e6a5bd8846 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
@@ -19,12 +19,24 @@
 package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.Test;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /** Test for {@link DelegationTokenManager}. */
 public class KerberosDelegationTokenManagerTest {
@@ -34,7 +46,7 @@ public class KerberosDelegationTokenManagerTest {
         ExceptionThrowingDelegationTokenProvider.enabled = false;
         Configuration configuration = new Configuration();
         KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration);
+                new KerberosDelegationTokenManager(configuration, null, null);
 
         assertTrue(delegationTokenManager.isProviderEnabled("test"));
     }
@@ -45,24 +57,28 @@ public class KerberosDelegationTokenManagerTest {
         Configuration configuration = new Configuration();
         
configuration.setBoolean("security.kerberos.token.provider.test.enabled", 
false);
         KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration);
+                new KerberosDelegationTokenManager(configuration, null, null);
 
         assertFalse(delegationTokenManager.isProviderEnabled("test"));
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void configurationIsNullMustFailFast() {
-        new KerberosDelegationTokenManager(null);
+        assertThrows(Exception.class, () -> new 
KerberosDelegationTokenManager(null, null, null));
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void oneProviderThrowsExceptionMustFailFast() {
-        try {
-            ExceptionThrowingDelegationTokenProvider.enabled = true;
-            new KerberosDelegationTokenManager(new Configuration());
-        } finally {
-            ExceptionThrowingDelegationTokenProvider.enabled = false;
-        }
+        assertThrows(
+                Exception.class,
+                () -> {
+                    try {
+                        ExceptionThrowingDelegationTokenProvider.enabled = 
true;
+                        new KerberosDelegationTokenManager(new 
Configuration(), null, null);
+                    } finally {
+                        ExceptionThrowingDelegationTokenProvider.enabled = 
false;
+                    }
+                });
     }
 
     @Test
@@ -72,11 +88,38 @@ public class KerberosDelegationTokenManagerTest {
         Configuration configuration = new Configuration();
         
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
         KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration);
+                new KerberosDelegationTokenManager(configuration, null, null);
 
         assertEquals(1, 
delegationTokenManager.delegationTokenProviders.size());
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
         assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
+
+    @Test
+    public void testStartTGTRenewalShouldScheduleRenewal() throws IOException {
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final ManuallyTriggeredScheduledExecutorService scheduler =
+                new ManuallyTriggeredScheduledExecutorService();
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            when(userGroupInformation.isFromKeytab()).thenReturn(true);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            ExceptionThrowingDelegationTokenProvider.enabled = false;
+            ExceptionThrowingDelegationTokenProvider.constructed = false;
+            Configuration configuration = new Configuration();
+            
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
+            KerberosDelegationTokenManager delegationTokenManager =
+                    new KerberosDelegationTokenManager(configuration, 
scheduledExecutor, scheduler);
+
+            delegationTokenManager.startTGTRenewal();
+            scheduledExecutor.triggerPeriodicScheduledTasks();
+            scheduler.triggerAll();
+            delegationTokenManager.stopTGTRenewal();
+
+            verify(userGroupInformation, 
times(1)).checkTGTAndReloginFromKeytab();
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
new file mode 100644
index 00000000000..a52ad68f79f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class KerberosRenewalPossibleProviderTest {
+    @Test
+    public void isRenewalPossibleMustGiveBackFalseByDefault() throws 
IOException {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, false);
+        SecurityConfiguration securityConfiguration = new 
SecurityConfiguration(configuration);
+        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+                new KerberosRenewalPossibleProvider(securityConfiguration);
+
+        assertFalse(kerberosRenewalPossibleProvider.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path 
tmpDir)
+            throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        configuration.setString(KERBEROS_LOGIN_KEYTAB, 
keyTab.toAbsolutePath().toString());
+        SecurityConfiguration securityConfiguration = new 
SecurityConfiguration(configuration);
+        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+                new KerberosRenewalPossibleProvider(securityConfiguration);
+
+        assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
+    }
+
+    @Test
+    public void isRenewalPossibleMustGiveBackTrueWhenTGT() throws IOException {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+        SecurityConfiguration securityConfiguration = new 
SecurityConfiguration(configuration);
+        KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+                new KerberosRenewalPossibleProvider(securityConfiguration) {
+                    @Override
+                    protected boolean hasCurrentUserCredentials() {
+                        return true;
+                    }
+                };
+
+        assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
+    }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 482e38f93ce..39c59a3b4f1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1294,7 +1294,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
 
         DelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(flinkConfiguration);
+                new KerberosDelegationTokenManager(flinkConfiguration, null, 
null);
         delegationTokenManager.obtainDelegationTokens(credentials);
 
         ByteBuffer tokens = 
ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
diff --git a/pom.xml b/pom.xml
index 29675294538..addd4ad8649 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@ under the License.
                <junit4.version>4.13.2</junit4.version>
                <junit5.version>5.8.1</junit5.version>
                <archunit.version>0.22.0</archunit.version>
-               <mockito.version>2.21.0</mockito.version>
+               <mockito.version>3.4.6</mockito.version>
                <powermock.version>2.0.9</powermock.version>
                <hamcrest.version>1.3</hamcrest.version>
                <assertj.version>3.21.0</assertj.version>
@@ -227,6 +227,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.mockito</groupId>
+                       <artifactId>mockito-inline</artifactId>
+                       <version>${mockito.version}</version>
+                       <type>jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.mockito</groupId>
                        <artifactId>mockito-core</artifactId>
@@ -537,14 +545,14 @@ under the License.
                                <!-- mockito/powermock mismatch -->
                                <groupId>net.bytebuddy</groupId>
                                <artifactId>byte-buddy</artifactId>
-                               <version>1.8.22</version>
+                               <version>1.10.14</version>
                        </dependency>
 
                        <dependency>
                                <!-- mockito/powermock mismatch -->
                                <groupId>net.bytebuddy</groupId>
                                <artifactId>byte-buddy-agent</artifactId>
-                               <version>1.8.22</version>
+                               <version>1.10.14</version>
                        </dependency>
 
                        <!-- For dependency convergence -->

Reply via email to