dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849445045


##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -424,12 +424,21 @@ public void start() throws Exception {
 
                 heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 
-                delegationTokenManager =
-                        configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                        && HadoopDependency.isHadoopCommonOnClasspath(
-                                                getClass().getClassLoader())
-                                ? new KerberosDelegationTokenManager(configuration)
-                                : new NoOpDelegationTokenManager();
+                if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+                    if (HadoopDependency.isHadoopCommonOnClasspath(getClass().getClassLoader())) {
+                        delegationTokenManager =
+                                new KerberosDelegationTokenManager(
+                                        configuration,
+                                        commonRpcService.getScheduledExecutor(),
+                                        ioExecutor);
+                    } else {
+                        LOG.info(
+                                "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath.");
+                        delegationTokenManager = new NoOpDelegationTokenManager();
+                    }
+                } else {
+                    delegationTokenManager = new NoOpDelegationTokenManager();
+                }

Review Comment:
   Can you please hide this behind a static factory method (e.g. in 
`DelegationTokenManagerUtils`) so it can be reused by both `MiniCluster` and 
`ClusterEntrypoint`?
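
A minimal sketch of what such a factory could look like. The types below are simplified, hypothetical stand-ins for the Flink classes in this PR (the real constructor takes a configuration and executors), and the `create` signature is an assumption, not an existing Flink API:

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the Flink types touched by this PR.
interface DelegationTokenManager {}

final class NoOpDelegationTokenManager implements DelegationTokenManager {}

final class KerberosDelegationTokenManager implements DelegationTokenManager {}

// Hypothetical utility class; only the name comes from the review comment.
final class DelegationTokenManagerUtils {

    private DelegationTokenManagerUtils() {}

    // Single place for the decision logic so that several callers can share it.
    static DelegationTokenManager create(
            boolean fetchDelegationToken,
            boolean hadoopOnClasspath,
            Supplier<DelegationTokenManager> kerberosManagerFactory) {
        if (!fetchDelegationToken) {
            return new NoOpDelegationTokenManager();
        }
        if (!hadoopOnClasspath) {
            // The real code would use the class logger instead of stdout.
            System.out.println(
                    "Cannot use kerberos delegation token manager because "
                            + "Hadoop cannot be found in the Classpath.");
            return new NoOpDelegationTokenManager();
        }
        return kerberosManagerFactory.get();
    }
}
```

Both call sites would then reduce to a single `DelegationTokenManagerUtils.create(...)` call, with the flag and classpath check evaluated by the caller or moved into the factory.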



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;

Review Comment:
   ```suggestion
       @Nullable
       private ScheduledFuture<?> tgtRenewalFuture;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);

Review Comment:
   This should be logged at `warn` level rather than `error`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService executorService) {

Review Comment:
   It feels off that this could be null just because of one usage of 
`DelegationTokenManager#obtainDelegationTokens` in the 
`YarnClusterDescriptor`. Could it be that this method doesn't really 
belong in this interface?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;

Review Comment:
   This doesn't seem to need to be an instance variable.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
 
     private final Configuration configuration;
 
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
 
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;

Review Comment:
   `ioExecutor` would be more explicit as it's commonly used throughout the JM 
components.
   ```suggestion
       private final ExecutorService ioExecutor;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();

Review Comment:
   Can you please write a test case for the periodic renewal? 
(`ManuallyTriggeredScheduledExecutorService` is usually pretty helpful for this.)
   
   The main reason I'm bringing this up is that this still uses Hadoop's UGI 
internals, which makes it mostly untestable. My feeling is that 
`KerberosRenewalPossibleProvider` should become something that completely hides 
the Hadoop internals from the DTM (e.g. a `KerberosClient` with two implementations: 
`HadoopKerberosClient` and `TestingKerberosClient`; see e.g. 
`TestingDeclarativeSlotPool` for how the "testing" implementations in Flink are 
usually created).
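
A rough sketch of the idea, under stated assumptions: the `KerberosClient` method names, `ManualScheduler` (a tiny stand-in, not Flink's `ManuallyTriggeredScheduledExecutorService`), and `RenewalManager` (a stand-in for the DTM) are all hypothetical and only illustrate how the renewal path becomes testable once Hadoop's UGI is hidden behind an interface:

```java
// Hypothetical abstraction that hides Hadoop's UGI from the token manager.
interface KerberosClient {
    boolean isRenewalPossible();

    void renewTgt() throws Exception;
}

// Testing implementation that only records how often renewal was requested.
final class TestingKerberosClient implements KerberosClient {
    private final boolean renewalPossible;
    private int renewCount;

    TestingKerberosClient(boolean renewalPossible) {
        this.renewalPossible = renewalPossible;
    }

    @Override
    public boolean isRenewalPossible() {
        return renewalPossible;
    }

    @Override
    public void renewTgt() {
        renewCount++;
    }

    int getRenewCount() {
        return renewCount;
    }
}

// Simplified stand-in for a manually triggered scheduled executor.
final class ManualScheduler {
    private Runnable periodicTask;

    void schedulePeriodically(Runnable task) {
        this.periodicTask = task;
    }

    boolean hasPeriodicTask() {
        return periodicTask != null;
    }

    // Runs the registered task once, as if one period had elapsed.
    void triggerPeriodicTask() {
        if (periodicTask != null) {
            periodicTask.run();
        }
    }
}

// Stand-in for the manager: it only talks to KerberosClient, never to Hadoop.
final class RenewalManager {
    private final KerberosClient client;
    private final ManualScheduler scheduler;

    RenewalManager(KerberosClient client, ManualScheduler scheduler) {
        this.client = client;
        this.scheduler = scheduler;
    }

    void start() {
        if (!client.isRenewalPossible()) {
            return; // nothing is scheduled when renewal is not possible
        }
        scheduler.schedulePeriodically(
                () -> {
                    try {
                        client.renewTgt();
                    } catch (Exception e) {
                        // The real code would log the failure and keep the task alive.
                    }
                });
    }
}
```

A test can then start the manager, trigger the scheduler by hand, and assert on `getRenewCount()` without any Kerberos setup.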



##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);
+                                                }
+                                            }),
+                            tgtRenewalPeriod,
+                            tgtRenewalPeriod,
+                            TimeUnit.MILLISECONDS);
+            LOG.debug("Credential renewal task started and reoccur in {} ms", tgtRenewalPeriod);

Review Comment:
   The log messages are inconsistent: they mix references to "TGT" and 
"credentials". These should be unified. It would also be great to add a note 
to the class Javadoc explaining what TGT stands for.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
