dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849229800
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +126,84 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkState(renewalExecutor == null, "Manager is already started");
+
+        if (!isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setDaemon(true)
+                        .setNameFormat("Credential Renewal Thread")
+                        .build();
+        renewalExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
Review Comment:
🤔 After a little bit of digging, there is actually one more (scheduled)
executor, shared by the RPC services, that we could leverage. It should still
be used in combination with the `ioExecutor`, because blocking on the shared
executor could potentially delay other critical things (e.g. the TM
registration process).
Both `ioExecutor` and `rpcService.getScheduledExecutor()` are already
available in the `ClusterEntrypoint`, so it should be straightforward to pass
them via the constructor, without having to create additional factories.
Any thoughts?
```diff
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 01c5f5f1c1c..8ee77e80d9c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -395,7 +395,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                     configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
                                     && HadoopDependency.isHadoopCommonOnClasspath(
                                             getClass().getClassLoader())
-                            ? new KerberosDelegationTokenManager(configuration)
+                            ? new KerberosDelegationTokenManager(
+                                    configuration,
+                                    commonRpcService.getScheduledExecutor(),
+                                    ioExecutor)
                             : new NoOpDelegationTokenManager();
             metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
```
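For illustration, the combination of the two executors could look roughly like
the sketch below. It is not the actual Flink code: `RenewalSketch` and
`blockingRenewal` are hypothetical names, and the plain JDK
`ScheduledExecutorService` and `Executor` types are assumed stand-ins for
whatever `rpcService.getScheduledExecutor()` and `ioExecutor` actually provide.
The point is only that the shared scheduled thread hands the work off, while
the blocking renewal itself runs on the I/O executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch only; not the real KerberosDelegationTokenManager. */
class RenewalSketch {
    // Assumed stand-in for the shared scheduled executor: must never block.
    private final ScheduledExecutorService scheduledExecutor;
    // Assumed stand-in for the I/O executor: blocking calls are acceptable here.
    private final Executor ioExecutor;
    // Hypothetical placeholder for the (potentially blocking) token renewal.
    private final Runnable blockingRenewal;
    private ScheduledFuture<?> renewalFuture;

    RenewalSketch(
            ScheduledExecutorService scheduledExecutor,
            Executor ioExecutor,
            Runnable blockingRenewal) {
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
        this.blockingRenewal = blockingRenewal;
    }

    synchronized void start(long initialDelayMs, long periodMs) {
        if (renewalFuture != null) {
            throw new IllegalStateException("Manager is already started");
        }
        // The scheduled thread only submits the task; the blocking work runs
        // on ioExecutor. Slow renewals may queue up there with a fixed rate.
        renewalFuture =
                scheduledExecutor.scheduleAtFixedRate(
                        () -> ioExecutor.execute(blockingRenewal),
                        initialDelayMs,
                        periodMs,
                        TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (renewalFuture != null) {
            renewalFuture.cancel(false);
            renewalFuture = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService io = Executors.newSingleThreadExecutor();
        CountDownLatch twoRuns = new CountDownLatch(2);
        RenewalSketch sketch =
                new RenewalSketch(
                        scheduler,
                        io,
                        () -> {
                            System.out.println(
                                    "renewing on " + Thread.currentThread().getName());
                            twoRuns.countDown();
                        });
        sketch.start(0, 50);
        twoRuns.await(5, TimeUnit.SECONDS);
        sketch.stop();
        scheduler.shutdown();
        io.shutdown();
    }
}
```

With this shape the manager would not need its own `ThreadFactory` or
`ScheduledThreadPoolExecutor`, and it would not own the lifecycle of either
executor; it would only cancel its own scheduled future on stop.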