This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21a1bf53ad5a5ed1842341624dcfe36c3a385f45
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Dec 14 13:48:10 2022 +0100

    [FLINK-30421][runtime][security] Move TGT renewal to hadoop module
---
 .../runtime/security/SecurityConfiguration.java    |  9 ++++
 .../runtime/security/modules/HadoopModule.java     | 54 +++++++++++++++++++++-
 .../hadoop/KerberosDelegationTokenManager.java     | 53 +--------------------
 .../runtime/security/modules/HadoopModuleTest.java | 54 ++++++++++++++++++++++
 .../KerberosDelegationTokenManagerITCase.java      | 32 +------------
 5 files changed, 118 insertions(+), 84 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
index 4db3e3a6139..ac032b02862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
@@ -25,10 +25,12 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
 import static 
org.apache.flink.configuration.SecurityOptions.SECURITY_CONTEXT_FACTORY_CLASSES;
 import static 
org.apache.flink.configuration.SecurityOptions.SECURITY_MODULE_FACTORY_CLASSES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,6 +55,8 @@ public class SecurityConfiguration {
 
     private final boolean useTicketCache;
 
+    private final Duration tgtRenewalPeriod;
+
     private final String keytab;
 
     private final String principal;
@@ -90,6 +94,7 @@ public class SecurityConfiguration {
         this.keytab = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
         this.principal = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
         this.useTicketCache = 
flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+        this.tgtRenewalPeriod = flinkConf.get(KERBEROS_RELOGIN_PERIOD);
         this.loginContextNames =
                 
parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
         this.zkServiceName = 
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
@@ -116,6 +121,10 @@ public class SecurityConfiguration {
         return useTicketCache;
     }
 
+    public Duration getTgtRenewalPeriod() {
+        return tgtRenewalPeriod;
+    }
+
     public Configuration getFlinkConfig() {
         return flinkConfig;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 2ebcdb6f1bf..9f0bba6ea90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.hadoop.HadoopUserUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
@@ -29,7 +30,12 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -42,6 +48,8 @@ public class HadoopModule implements SecurityModule {
 
     private final Configuration hadoopConfiguration;
 
+    @Nullable private ScheduledExecutorService tgtRenewalExecutorService;
+
     public HadoopModule(
             SecurityConfiguration securityConfiguration, Configuration 
hadoopConfiguration) {
         this.securityConfig = checkNotNull(securityConfiguration);
@@ -75,6 +83,10 @@ public class HadoopModule implements SecurityModule {
                                         new File(fileLocation), 
hadoopConfiguration);
                         loginUser.addCredentials(credentials);
                     }
+                    tgtRenewalExecutorService =
+                            Executors.newSingleThreadScheduledExecutor(
+                                    new 
ExecutorThreadFactory("TGTRenewalExecutorService"));
+                    startTGTRenewal(tgtRenewalExecutorService, loginUser);
                 }
             } else {
                 loginUser = UserGroupInformation.getLoginUser();
@@ -95,8 +107,48 @@ public class HadoopModule implements SecurityModule {
         }
     }
 
+    @VisibleForTesting
+    void startTGTRenewal(
+            ScheduledExecutorService tgtRenewalExecutorService, 
UserGroupInformation loginUser) {
+        LOG.info("Starting TGT renewal task");
+
+        long tgtRenewalPeriod = 
securityConfig.getTgtRenewalPeriod().toMillis();
+        tgtRenewalExecutorService.scheduleAtFixedRate(
+                () -> {
+                    // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but
+                    // in Hadoop
+                    // 3.x, it is configurable (see
+                    // hadoop.kerberos.keytab.login.autorenewal.enabled, added
+                    // in HADOOP-9567). This task will make sure that the user stays logged in
+                    // regardless of
+                    // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a
+                    // no-op if
+                    // the TGT does not need to be renewed yet.
+                    try {
+                        LOG.debug("Renewing TGT");
+                        loginUser.checkTGTAndReloginFromKeytab();
+                        LOG.debug("TGT renewed successfully");
+                    } catch (Exception e) {
+                        LOG.warn("Error while renewing TGT", e);
+                    }
+                },
+                tgtRenewalPeriod,
+                tgtRenewalPeriod,
+                TimeUnit.MILLISECONDS);
+
+        LOG.info("TGT renewal task started and reoccur in {} ms", 
tgtRenewalPeriod);
+    }
+
+    @VisibleForTesting
+    void stopTGTRenewal() {
+        if (tgtRenewalExecutorService != null) {
+            tgtRenewalExecutorService.shutdown();
+            tgtRenewalExecutorService = null;
+        }
+    }
+
     @Override
     public void uninstall() {
-        throw new UnsupportedOperationException();
+        stopTGTRenewal();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
index ff4c95ca4af..60b754bd450 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
@@ -27,14 +27,12 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
-import java.io.IOException;
 import java.time.Clock;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,7 +43,6 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
 import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
 import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,8 +75,6 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
 
     @Nullable private final ExecutorService ioExecutor;
 
-    @Nullable private ScheduledFuture<?> tgtRenewalFuture;
-
     private final Object tokensUpdateFutureLock = new Object();
 
     @GuardedBy("tokensUpdateFutureLock")
@@ -207,9 +202,7 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
         this.delegationTokenListener =
                 checkNotNull(delegationTokenListener, "Delegation token 
listener must not be null");
         synchronized (tokensUpdateFutureLock) {
-            checkState(
-                    tgtRenewalFuture == null && tokensUpdateFuture == null,
-                    "Manager is already started");
+            checkState(tokensUpdateFuture == null, "Manager is already 
started");
         }
 
         if (!kerberosLoginProvider.isLoginPossible()) {
@@ -217,52 +210,9 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
             return;
         }
 
-        startTGTRenewal();
         startTokensUpdate();
     }
 
-    @VisibleForTesting
-    void startTGTRenewal() throws IOException {
-        LOG.info("Starting TGT renewal task");
-
-        UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
-        if (currentUser.isFromKeytab()) {
-            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
-            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
-            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
-            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
-            // the TGT does not need to be renewed yet.
-            long tgtRenewalPeriod = 
configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
-            tgtRenewalFuture =
-                    scheduledExecutor.scheduleAtFixedRate(
-                            () ->
-                                    ioExecutor.execute(
-                                            () -> {
-                                                try {
-                                                    LOG.debug("Renewing TGT");
-                                                    
currentUser.checkTGTAndReloginFromKeytab();
-                                                    LOG.debug("TGT renewed 
successfully");
-                                                } catch (Exception e) {
-                                                    LOG.warn("Error while 
renewing TGT", e);
-                                                }
-                                            }),
-                            0,
-                            tgtRenewalPeriod,
-                            TimeUnit.MILLISECONDS);
-            LOG.info("TGT renewal task started and reoccur in {} ms", 
tgtRenewalPeriod);
-        } else {
-            LOG.info("TGT renewal task not started");
-        }
-    }
-
-    @VisibleForTesting
-    void stopTGTRenewal() {
-        if (tgtRenewalFuture != null) {
-            tgtRenewalFuture.cancel(true);
-            tgtRenewalFuture = null;
-        }
-    }
-
     @VisibleForTesting
     void startTokensUpdate() {
         try {
@@ -345,7 +295,6 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
         LOG.info("Stopping credential renewal");
 
         stopTokensUpdate();
-        stopTGTRenewal();
 
         LOG.info("Stopped credential renewal");
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
new file mode 100644
index 00000000000..959da264496
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.modules;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Test for {@link HadoopModule}. */
+class HadoopModuleTest {
+    @Test
+    public void startTGTRenewalShouldScheduleRenewalWithKeytab() throws 
IOException {
+        final ManuallyTriggeredScheduledExecutorService executorService =
+                new ManuallyTriggeredScheduledExecutorService();
+        UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+
+        Configuration flinkConf = new Configuration();
+        SecurityConfiguration securityConf = new 
SecurityConfiguration(flinkConf);
+        org.apache.hadoop.conf.Configuration hadoopConf =
+                new org.apache.hadoop.conf.Configuration();
+        HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf);
+
+        hadoopModule.startTGTRenewal(executorService, userGroupInformation);
+        executorService.triggerPeriodicScheduledTasks();
+        hadoopModule.stopTGTRenewal();
+
+        verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
index f40baab5c38..fd329dce3e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
-import java.io.IOException;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,9 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Test for {@link DelegationTokenManager}.
@@ -110,33 +106,7 @@ public class KerberosDelegationTokenManagerITCase {
     }
 
     @Test
-    public void startTGTRenewalShouldScheduleRenewal() throws IOException {
-        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
-                new ManuallyTriggeredScheduledExecutor();
-        final ManuallyTriggeredScheduledExecutorService scheduler =
-                new ManuallyTriggeredScheduledExecutorService();
-        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
-            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
-            when(userGroupInformation.isFromKeytab()).thenReturn(true);
-            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
-
-            ExceptionThrowingHadoopDelegationTokenProvider.reset();
-            Configuration configuration = new Configuration();
-            
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
-            KerberosDelegationTokenManager delegationTokenManager =
-                    new KerberosDelegationTokenManager(configuration, 
scheduledExecutor, scheduler);
-
-            delegationTokenManager.startTGTRenewal();
-            scheduledExecutor.triggerPeriodicScheduledTasks();
-            scheduler.triggerAll();
-            delegationTokenManager.stopTGTRenewal();
-
-            verify(userGroupInformation, 
times(1)).checkTGTAndReloginFromKeytab();
-        }
-    }
-
-    @Test
-    public void startTokensUpdateShouldScheduleRenewal() throws IOException {
+    public void startTokensUpdateShouldScheduleRenewal() {
         final ManuallyTriggeredScheduledExecutor scheduledExecutor =
                 new ManuallyTriggeredScheduledExecutor();
         final ManuallyTriggeredScheduledExecutorService scheduler =

Reply via email to