This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 220ef999d2a [FLINK-26043][runtime][security] Add periodic kerberos
relogin to KerberosDelegationTokenManager [FLINK-27605][tests] Updated Mockito
version to 3.4.6 in order to use static method mocking
220ef999d2a is described below
commit 220ef999d2a353fd52cc0aa1a93c26d9b696c1ce
Author: gabor.g.somogyi <[email protected]>
AuthorDate: Mon Feb 14 14:23:17 2022 +0100
[FLINK-26043][runtime][security] Add periodic kerberos relogin to
KerberosDelegationTokenManager
[FLINK-27605][tests] Updated Mockito version to 3.4.6 in order to use
static method mocking
---
.../generated/security_auth_kerberos_section.html | 6 +
.../generated/security_configuration.html | 6 +
.../kinesis/FlinkKinesisConsumerTest.java | 215 +++++++++++----------
.../flink/configuration/SecurityOptions.java | 9 +
.../runtime/entrypoint/ClusterEntrypoint.java | 15 +-
.../flink/runtime/minicluster/MiniCluster.java | 15 +-
.../security/token/DelegationTokenManager.java | 2 +-
.../token/KerberosDelegationTokenManager.java | 87 ++++++++-
.../KerberosDelegationTokenManagerFactory.java | 58 ++++++
.../token/KerberosRenewalPossibleProvider.java | 66 +++++++
.../runtime/rest/FileUploadHandlerITCase.java | 14 +-
.../token/KerberosDelegationTokenManagerTest.java | 69 +++++--
.../token/KerberosRenewalPossibleProviderTest.java | 78 ++++++++
.../apache/flink/yarn/YarnClusterDescriptor.java | 2 +-
pom.xml | 14 +-
15 files changed, 508 insertions(+), 148 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index 580b51dae73..638ee4c9ec0 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -38,5 +38,11 @@
<td>Boolean</td>
<td>Indicates whether to read from your Kerberos ticket cache.</td>
</tr>
+ <tr>
+ <td><h5>security.kerberos.relogin.period</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>The time period when keytab login happens automatically in
order to always have a valid TGT.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html
b/docs/layouts/shortcodes/generated/security_configuration.html
index 2afa767a44f..87a3aab06d5 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -50,6 +50,12 @@
<td>Boolean</td>
<td>Indicates whether to read from your Kerberos ticket cache.</td>
</tr>
+ <tr>
+ <td><h5>security.kerberos.relogin.period</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>The time period when keytab login happens automatically in
order to always have a valid TGT.</td>
+ </tr>
<tr>
<td><h5>security.module.factory.classes</h5></td>
<td style="word-wrap:
break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..91473a8ca2c 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -68,6 +68,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -97,13 +98,14 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/** Suite of FlinkKinesisConsumer tests for the methods called throughout the
source life cycle. */
@RunWith(PowerMockRunner.class)
-@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+@PrepareForTest(FlinkKinesisConsumer.class)
public class FlinkKinesisConsumerTest extends TestLogger {
// ----------------------------------------------------------------------
@@ -324,13 +326,12 @@ public class FlinkKinesisConsumerTest extends TestLogger {
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// assume the given config is correct
- PowerMockito.mockStatic(KinesisConfigUtil.class);
- PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
- TestableFlinkKinesisConsumer consumer =
- new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
- consumer.open(new Configuration());
- consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+ try (MockedStatic<KinesisConfigUtil> kcu =
mockStatic(KinesisConfigUtil.class)) {
+ TestableFlinkKinesisConsumer consumer =
+ new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+ }
}
@Test
@@ -374,28 +375,28 @@ public class FlinkKinesisConsumerTest extends TestLogger {
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
- PowerMockito.mockStatic(KinesisConfigUtil.class);
- PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
- //
----------------------------------------------------------------------
- // start to test fetcher's initial state seeding
- //
----------------------------------------------------------------------
-
- TestableFlinkKinesisConsumer consumer =
- new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
- consumer.initializeState(initializationContext);
- consumer.open(new Configuration());
- consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
- for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
- fakeRestoredState.entrySet()) {
- Mockito.verify(mockedFetcher)
- .registerNewSubscribedShardState(
- new KinesisStreamShardState(
-
KinesisDataFetcher.convertToStreamShardMetadata(
- restoredShard.getKey()),
- restoredShard.getKey(),
- restoredShard.getValue()));
+ try (MockedStatic<KinesisConfigUtil> kcu =
mockStatic(KinesisConfigUtil.class)) {
+
+ //
----------------------------------------------------------------------
+ // start to test fetcher's initial state seeding
+ //
----------------------------------------------------------------------
+
+ TestableFlinkKinesisConsumer consumer =
+ new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
+ consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+ fakeRestoredState.entrySet()) {
+ Mockito.verify(mockedFetcher)
+ .registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(
+ restoredShard.getKey()),
+ restoredShard.getKey(),
+ restoredShard.getValue()));
+ }
}
}
@@ -451,40 +452,40 @@ public class FlinkKinesisConsumerTest extends TestLogger {
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
- PowerMockito.mockStatic(KinesisConfigUtil.class);
- PowerMockito.doNothing().when(KinesisConfigUtil.class);
-
- //
----------------------------------------------------------------------
- // start to test fetcher's initial state seeding
- //
----------------------------------------------------------------------
-
- TestableFlinkKinesisConsumer consumer =
- new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
- consumer.initializeState(initializationContext);
- consumer.open(new Configuration());
- consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
- for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
- fakeRestoredStateForOthers.entrySet()) {
- // should never get restored state not belonging to itself
- Mockito.verify(mockedFetcher, never())
- .registerNewSubscribedShardState(
- new KinesisStreamShardState(
-
KinesisDataFetcher.convertToStreamShardMetadata(
- restoredShard.getKey()),
- restoredShard.getKey(),
- restoredShard.getValue()));
- }
- for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
- fakeRestoredState.entrySet()) {
- // should get restored state belonging to itself
- Mockito.verify(mockedFetcher)
- .registerNewSubscribedShardState(
- new KinesisStreamShardState(
-
KinesisDataFetcher.convertToStreamShardMetadata(
- restoredShard.getKey()),
- restoredShard.getKey(),
- restoredShard.getValue()));
+ try (MockedStatic<KinesisConfigUtil> kcu =
mockStatic(KinesisConfigUtil.class)) {
+
+ //
----------------------------------------------------------------------
+ // start to test fetcher's initial state seeding
+ //
----------------------------------------------------------------------
+
+ TestableFlinkKinesisConsumer consumer =
+ new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
+ consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+ fakeRestoredStateForOthers.entrySet()) {
+ // should never get restored state not belonging to itself
+ Mockito.verify(mockedFetcher, never())
+ .registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(
+ restoredShard.getKey()),
+ restoredShard.getKey(),
+ restoredShard.getValue()));
+ }
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+ fakeRestoredState.entrySet()) {
+ // should get restored state belonging to itself
+ Mockito.verify(mockedFetcher)
+ .registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(
+ restoredShard.getKey()),
+ restoredShard.getKey(),
+ restoredShard.getValue()));
+ }
}
}
@@ -564,33 +565,35 @@ public class FlinkKinesisConsumerTest extends TestLogger {
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
- PowerMockito.mockStatic(KinesisConfigUtil.class);
- PowerMockito.doNothing().when(KinesisConfigUtil.class);
+ try (MockedStatic<KinesisConfigUtil> kcu =
mockStatic(KinesisConfigUtil.class)) {
- //
----------------------------------------------------------------------
- // start to test fetcher's initial state seeding
- //
----------------------------------------------------------------------
+ //
----------------------------------------------------------------------
+ // start to test fetcher's initial state seeding
+ //
----------------------------------------------------------------------
- TestableFlinkKinesisConsumer consumer =
- new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
- consumer.initializeState(initializationContext);
- consumer.open(new Configuration());
- consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+ TestableFlinkKinesisConsumer consumer =
+ new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
+ consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
- fakeRestoredState.put(
- new StreamShardHandle(
- "fakeStream2",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
- SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
- for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
- fakeRestoredState.entrySet()) {
- Mockito.verify(mockedFetcher)
- .registerNewSubscribedShardState(
- new KinesisStreamShardState(
-
KinesisDataFetcher.convertToStreamShardMetadata(
- restoredShard.getKey()),
- restoredShard.getKey(),
- restoredShard.getValue()));
+ fakeRestoredState.put(
+ new StreamShardHandle(
+ "fakeStream2",
+ new Shard()
+ .withShardId(
+
KinesisShardIdGenerator.generateFromShardOrder(2))),
+
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard :
+ fakeRestoredState.entrySet()) {
+ Mockito.verify(mockedFetcher)
+ .registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(
+ restoredShard.getKey()),
+ restoredShard.getKey(),
+ restoredShard.getValue()));
+ }
}
}
@@ -709,26 +712,26 @@ public class FlinkKinesisConsumerTest extends TestLogger {
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
- PowerMockito.mockStatic(KinesisConfigUtil.class);
- PowerMockito.doNothing().when(KinesisConfigUtil.class);
+ try (MockedStatic<KinesisConfigUtil> kcu =
mockStatic(KinesisConfigUtil.class)) {
- //
----------------------------------------------------------------------
- // start to test fetcher's initial state seeding
- //
----------------------------------------------------------------------
+ //
----------------------------------------------------------------------
+ // start to test fetcher's initial state seeding
+ //
----------------------------------------------------------------------
- TestableFlinkKinesisConsumer consumer =
- new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
- consumer.initializeState(initializationContext);
- consumer.open(new Configuration());
- consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
-
- Mockito.verify(mockedFetcher)
- .registerNewSubscribedShardState(
- new KinesisStreamShardState(
-
KinesisDataFetcher.convertToStreamShardMetadata(
- closedStreamShardHandle),
- closedStreamShardHandle,
-
fakeRestoredState.get(closedStreamShardHandle)));
+ TestableFlinkKinesisConsumer consumer =
+ new TestableFlinkKinesisConsumer("fakeStream", new
Properties(), 10, 2);
+ consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+ Mockito.verify(mockedFetcher)
+ .registerNewSubscribedShardState(
+ new KinesisStreamShardState(
+
KinesisDataFetcher.convertToStreamShardMetadata(
+ closedStreamShardHandle),
+ closedStreamShardHandle,
+
fakeRestoredState.get(closedStreamShardHandle)));
+ }
}
private static final class TestingListState<T> implements ListState<T> {
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 34c71f5e3af..9c125959c84 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
+import java.time.Duration;
import java.util.List;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -123,6 +124,14 @@ public class SecurityOptions {
+ "You may need to disable this option, if
you rely on submission mechanisms, e.g. Apache Oozie, "
+ "to handle delegation tokens.");
+ @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+ public static final ConfigOption<Duration> KERBEROS_RELOGIN_PERIOD =
+ key("security.kerberos.relogin.period")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription(
+ "The time period when keytab login happens
automatically in order to always have a valid TGT.");
+
// ------------------------------------------------------------------------
// ZooKeeper Security Options
// ------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 01c5f5f1c1c..4242d241ff8 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,7 +32,6 @@ import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
@@ -48,7 +47,6 @@ import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.hadoop.HadoopDependency;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -68,8 +66,7 @@ import
org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
+import
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import
org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
@@ -392,11 +389,11 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
configuration.setString(BlobServerOptions.PORT,
String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
delegationTokenManager =
-
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
- &&
HadoopDependency.isHadoopCommonOnClasspath(
- getClass().getClassLoader())
- ? new KerberosDelegationTokenManager(configuration)
- : new NoOpDelegationTokenManager();
+ KerberosDelegationTokenManagerFactory.create(
+ getClass().getClassLoader(),
+ configuration,
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
metricRegistry = createMetricRegistry(configuration,
pluginManager, rpcSystem);
final RpcService metricQueryServiceRpcService =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 695507e8414..e07980aa5c0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
@@ -54,7 +53,6 @@ import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerCo
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
-import org.apache.flink.runtime.hadoop.HadoopDependency;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
@@ -90,8 +88,7 @@ import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
+import
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
@@ -425,11 +422,11 @@ public class MiniCluster implements AutoCloseableAsync {
heartbeatServices =
HeartbeatServices.fromConfiguration(configuration);
delegationTokenManager =
-
configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
- &&
HadoopDependency.isHadoopCommonOnClasspath(
- getClass().getClassLoader())
- ? new
KerberosDelegationTokenManager(configuration)
- : new NoOpDelegationTokenManager();
+ KerberosDelegationTokenManagerFactory.create(
+ getClass().getClassLoader(),
+ configuration,
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
blobCacheService =
BlobUtils.createBlobCacheService(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index 30594b7883f..a4efe308c4e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -38,7 +38,7 @@ public interface DelegationTokenManager {
* Creates a re-occurring task which obtains new tokens and automatically
distributes them to
* task managers.
*/
- void start();
+ void start() throws Exception;
/** Stops re-occurring token obtain task. */
void stop();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
index 6cf56de35d9..20fd44d6fc2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
@@ -20,16 +20,27 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* Manager for delegation tokens in a Flink cluster.
@@ -45,10 +56,25 @@ public class KerberosDelegationTokenManager implements
DelegationTokenManager {
private final Configuration configuration;
+ private final KerberosRenewalPossibleProvider
kerberosRenewalPossibleProvider;
+
@VisibleForTesting final Map<String, DelegationTokenProvider>
delegationTokenProviders;
- public KerberosDelegationTokenManager(Configuration configuration) {
+ @Nullable private final ScheduledExecutor scheduledExecutor;
+
+ @Nullable private final ExecutorService ioExecutor;
+
+ @Nullable private ScheduledFuture<?> tgtRenewalFuture;
+
+ public KerberosDelegationTokenManager(
+ Configuration configuration,
+ @Nullable ScheduledExecutor scheduledExecutor,
+ @Nullable ExecutorService ioExecutor) {
this.configuration = checkNotNull(configuration, "Flink configuration
must not be null");
+ this.scheduledExecutor = scheduledExecutor;
+ this.ioExecutor = ioExecutor;
+ this.kerberosRenewalPossibleProvider =
+ new KerberosRenewalPossibleProvider(new
SecurityConfiguration(configuration));
this.delegationTokenProviders = loadProviders();
}
@@ -110,13 +136,66 @@ public class KerberosDelegationTokenManager implements
DelegationTokenManager {
* task managers.
*/
@Override
- public void start() {
- LOG.info("Starting renewal task");
+ public void start() throws Exception {
+ checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+ checkNotNull(ioExecutor, "IO executor must not be null");
+ checkState(tgtRenewalFuture == null, "Manager is already started");
+
+ if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+ LOG.info("Renewal is NOT possible, skipping to start renewal
task");
+ return;
+ }
+
+ startTGTRenewal();
+ }
+
+ void startTGTRenewal() throws IOException {
+ LOG.debug("Starting TGT renewal task");
+
+ UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
+ if (currentUser.isFromKeytab()) {
+ // In Hadoop 2.x, renewal of the keytab-based login seems to be
automatic, but in Hadoop
+ // 3.x, it is configurable (see
hadoop.kerberos.keytab.login.autorenewal.enabled, added
+ // in HADOOP-9567). This task will make sure that the user stays
logged in regardless of
+ // that configuration's value. Note that
checkTGTAndReloginFromKeytab() is a no-op if
+ // the TGT does not need to be renewed yet.
+ long tgtRenewalPeriod =
configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+ tgtRenewalFuture =
+ scheduledExecutor.scheduleAtFixedRate(
+ () ->
+ ioExecutor.execute(
+ () -> {
+ try {
+ LOG.debug("Renewing TGT");
+
currentUser.checkTGTAndReloginFromKeytab();
+ LOG.debug("TGT renewed
successfully");
+ } catch (Exception e) {
+ LOG.warn("Error while
renewing TGT", e);
+ }
+ }),
+ tgtRenewalPeriod,
+ tgtRenewalPeriod,
+ TimeUnit.MILLISECONDS);
+ LOG.debug("TGT renewal task started and reoccur in {} ms",
tgtRenewalPeriod);
+ } else {
+ LOG.debug("TGT renewal task not started");
+ }
+ }
+
+ void stopTGTRenewal() {
+ if (tgtRenewalFuture != null) {
+ tgtRenewalFuture.cancel(true);
+ tgtRenewalFuture = null;
+ }
}
/** Stops re-occurring token obtain task. */
@Override
public void stop() {
- LOG.info("Stopping renewal task");
+ LOG.info("Stopping credential renewal");
+
+ stopTGTRenewal();
+
+ LOG.info("Stopped credential renewal");
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
new file mode 100644
index 00000000000..60494b105d7
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.hadoop.HadoopDependency;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ExecutorService;
+
+/** A factory for {@link KerberosDelegationTokenManager}. */
+public class KerberosDelegationTokenManagerFactory {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(KerberosDelegationTokenManagerFactory.class);
+
+ public static DelegationTokenManager create(
+ ClassLoader classLoader,
+ Configuration configuration,
+ @Nullable ScheduledExecutor scheduledExecutor,
+ @Nullable ExecutorService ioExecutor) {
+
+ if
(configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+ if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) {
+ return new KerberosDelegationTokenManager(
+ configuration, scheduledExecutor, ioExecutor);
+ } else {
+ LOG.info(
+ "Cannot use kerberos delegation token manager because
Hadoop cannot be found in the Classpath.");
+ return new NoOpDelegationTokenManager();
+ }
+ } else {
+ return new NoOpDelegationTokenManager();
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
new file mode 100644
index 00000000000..05a11806be8
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** It checks whether kerberos credentials can be renewed. */
+public class KerberosRenewalPossibleProvider {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(KerberosRenewalPossibleProvider.class);
+
+ private final SecurityConfiguration securityConfiguration;
+
+ public KerberosRenewalPossibleProvider(SecurityConfiguration
securityConfiguration) {
+ this.securityConfiguration =
+ checkNotNull(
+ securityConfiguration, "Flink security configuration
must not be null");
+ }
+
+ public boolean isRenewalPossible() throws IOException {
+ if (!StringUtils.isBlank(securityConfiguration.getKeytab())
+ && !StringUtils.isBlank(securityConfiguration.getPrincipal()))
{
+ LOG.debug("Login from keytab is possible");
+ return true;
+ }
+ LOG.debug("Login from keytab is NOT possible");
+
+ if (securityConfiguration.useTicketCache() &&
hasCurrentUserCredentials()) {
+ LOG.debug("Login from ticket cache is possible");
+ return true;
+ }
+ LOG.debug("Login from ticket cache is NOT possible");
+
+ return false;
+ }
+
+ protected boolean hasCurrentUserCredentials() throws IOException {
+ return UserGroupInformation.getCurrentUser().hasKerberosCredentials();
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
index 632f57ce878..34d17955922 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -471,8 +472,17 @@ public class FileUploadHandlerITCase extends TestLogger {
Class<?> clazz = Class.forName("java.io.DeleteOnExitHook");
Field field = clazz.getDeclaredField("files");
field.setAccessible(true);
- LinkedHashSet files = (LinkedHashSet) field.get(null);
- assertTrue(files.isEmpty());
+ LinkedHashSet<String> files = (LinkedHashSet<String>)
field.get(null);
+ boolean fileFound = false;
+ // Mockito automatically registers mockitoboot*.jar for on-exit
removal. Verify that
+ // there are no other files registered.
+ for (String file : files) {
+ if (!file.contains("mockitoboot")) {
+ fileFound = true;
+ break;
+ }
+ }
+ assertFalse(fileFound);
} catch (ClassNotFoundException | IllegalAccessException |
NoSuchFieldException e) {
fail("This should never happen.");
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
index 71454cd5501..3e6a5bd8846 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerTest.java
@@ -19,12 +19,24 @@
package org.apache.flink.runtime.security.token;
import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.junit.Test;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/** Test for {@link DelegationTokenManager}. */
public class KerberosDelegationTokenManagerTest {
@@ -34,7 +46,7 @@ public class KerberosDelegationTokenManagerTest {
ExceptionThrowingDelegationTokenProvider.enabled = false;
Configuration configuration = new Configuration();
KerberosDelegationTokenManager delegationTokenManager =
- new KerberosDelegationTokenManager(configuration);
+ new KerberosDelegationTokenManager(configuration, null, null);
assertTrue(delegationTokenManager.isProviderEnabled("test"));
}
@@ -45,24 +57,28 @@ public class KerberosDelegationTokenManagerTest {
Configuration configuration = new Configuration();
configuration.setBoolean("security.kerberos.token.provider.test.enabled",
false);
KerberosDelegationTokenManager delegationTokenManager =
- new KerberosDelegationTokenManager(configuration);
+ new KerberosDelegationTokenManager(configuration, null, null);
assertFalse(delegationTokenManager.isProviderEnabled("test"));
}
- @Test(expected = Exception.class)
+ @Test
public void configurationIsNullMustFailFast() {
- new KerberosDelegationTokenManager(null);
+ assertThrows(Exception.class, () -> new
KerberosDelegationTokenManager(null, null, null));
}
- @Test(expected = Exception.class)
+ @Test
public void oneProviderThrowsExceptionMustFailFast() {
- try {
- ExceptionThrowingDelegationTokenProvider.enabled = true;
- new KerberosDelegationTokenManager(new Configuration());
- } finally {
- ExceptionThrowingDelegationTokenProvider.enabled = false;
- }
+ assertThrows(
+ Exception.class,
+ () -> {
+ try {
+ ExceptionThrowingDelegationTokenProvider.enabled =
true;
+ new KerberosDelegationTokenManager(new
Configuration(), null, null);
+ } finally {
+ ExceptionThrowingDelegationTokenProvider.enabled =
false;
+ }
+ });
}
@Test
@@ -72,11 +88,38 @@ public class KerberosDelegationTokenManagerTest {
Configuration configuration = new Configuration();
configuration.setBoolean("security.kerberos.token.provider.throw.enabled",
false);
KerberosDelegationTokenManager delegationTokenManager =
- new KerberosDelegationTokenManager(configuration);
+ new KerberosDelegationTokenManager(configuration, null, null);
assertEquals(1,
delegationTokenManager.delegationTokenProviders.size());
assertTrue(delegationTokenManager.isProviderLoaded("test"));
assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
assertFalse(delegationTokenManager.isProviderLoaded("throw"));
}
+
    /**
     * Verifies that starting TGT renewal schedules a periodic task which performs a keytab
     * relogin on the (statically mocked) current user exactly once per trigger.
     */
    @Test
    public void testStartTGTRenewalShouldScheduleRenewal() throws IOException {
        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final ManuallyTriggeredScheduledExecutorService scheduler =
                new ManuallyTriggeredScheduledExecutorService();
        // Static mock so that UserGroupInformation.getCurrentUser() returns our mock;
        // automatically un-registered when the try block exits.
        try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
            UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
            // A keytab-based login is required for TGT renewal to be scheduled at all.
            when(userGroupInformation.isFromKeytab()).thenReturn(true);
            ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);

            // Reset provider flags so leftover state from other tests cannot interfere.
            ExceptionThrowingDelegationTokenProvider.enabled = false;
            ExceptionThrowingDelegationTokenProvider.constructed = false;
            Configuration configuration = new Configuration();
            configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false);
            KerberosDelegationTokenManager delegationTokenManager =
                    new KerberosDelegationTokenManager(configuration, scheduledExecutor, scheduler);

            // Fire the periodic task once, then let the io executor run the relogin.
            delegationTokenManager.startTGTRenewal();
            scheduledExecutor.triggerPeriodicScheduledTasks();
            scheduler.triggerAll();
            delegationTokenManager.stopTGTRenewal();

            verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab();
        }
    }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
new file mode 100644
index 00000000000..a52ad68f79f
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosRenewalPossibleProviderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class KerberosRenewalPossibleProviderTest {
+ @Test
+ public void isRenewalPossibleMustGiveBackFalseByDefault() throws
IOException {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, false);
+ SecurityConfiguration securityConfiguration = new
SecurityConfiguration(configuration);
+ KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+ new KerberosRenewalPossibleProvider(securityConfiguration);
+
+ assertFalse(kerberosRenewalPossibleProvider.isRenewalPossible());
+ }
+
+ @Test
+ public void isRenewalPossibleMustGiveBackTrueWhenKeytab(@TempDir Path
tmpDir)
+ throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setString(KERBEROS_LOGIN_PRINCIPAL, "principal");
+ final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+ configuration.setString(KERBEROS_LOGIN_KEYTAB,
keyTab.toAbsolutePath().toString());
+ SecurityConfiguration securityConfiguration = new
SecurityConfiguration(configuration);
+ KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+ new KerberosRenewalPossibleProvider(securityConfiguration);
+
+ assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
+ }
+
+ @Test
+ public void isRenewalPossibleMustGiveBackTrueWhenTGT() throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(KERBEROS_LOGIN_USETICKETCACHE, true);
+ SecurityConfiguration securityConfiguration = new
SecurityConfiguration(configuration);
+ KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider =
+ new KerberosRenewalPossibleProvider(securityConfiguration) {
+ @Override
+ protected boolean hasCurrentUserCredentials() {
+ return true;
+ }
+ };
+
+ assertTrue(kerberosRenewalPossibleProvider.isRenewalPossible());
+ }
+}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 482e38f93ce..39c59a3b4f1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1294,7 +1294,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
DelegationTokenManager delegationTokenManager =
- new KerberosDelegationTokenManager(flinkConfiguration);
+ new KerberosDelegationTokenManager(flinkConfiguration, null,
null);
delegationTokenManager.obtainDelegationTokens(credentials);
ByteBuffer tokens =
ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
diff --git a/pom.xml b/pom.xml
index 29675294538..addd4ad8649 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@ under the License.
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.8.1</junit5.version>
<archunit.version>0.22.0</archunit.version>
- <mockito.version>2.21.0</mockito.version>
+ <mockito.version>3.4.6</mockito.version>
<powermock.version>2.0.9</powermock.version>
<hamcrest.version>1.3</hamcrest.version>
<assertj.version>3.21.0</assertj.version>
@@ -227,6 +227,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -537,14 +545,14 @@ under the License.
<!-- mockito/powermock mismatch -->
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
- <version>1.8.22</version>
+ <version>1.10.14</version>
</dependency>
<dependency>
<!-- mockito/powermock mismatch -->
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
- <version>1.8.22</version>
+ <version>1.10.14</version>
</dependency>
<!-- For dependency convergence -->