This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21a1bf53ad5a5ed1842341624dcfe36c3a385f45 Author: Gabor Somogyi <[email protected]> AuthorDate: Wed Dec 14 13:48:10 2022 +0100 [FLINK-30421][runtime][security] Move TGT renewal to hadoop module --- .../runtime/security/SecurityConfiguration.java | 9 ++++ .../runtime/security/modules/HadoopModule.java | 54 +++++++++++++++++++++- .../hadoop/KerberosDelegationTokenManager.java | 53 +-------------------- .../runtime/security/modules/HadoopModuleTest.java | 54 ++++++++++++++++++++++ .../KerberosDelegationTokenManagerITCase.java | 32 +------------ 5 files changed, 118 insertions(+), 84 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java index 4db3e3a6139..ac032b02862 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java @@ -25,10 +25,12 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.commons.lang3.StringUtils; import java.io.File; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD; import static org.apache.flink.configuration.SecurityOptions.SECURITY_CONTEXT_FACTORY_CLASSES; import static org.apache.flink.configuration.SecurityOptions.SECURITY_MODULE_FACTORY_CLASSES; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -53,6 +55,8 @@ public class SecurityConfiguration { private final boolean useTicketCache; + private final Duration tgtRenewalPeriod; + private final String keytab; private final String principal; @@ -90,6 +94,7 @@ public class SecurityConfiguration { this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE); + this.tgtRenewalPeriod = flinkConf.get(KERBEROS_RELOGIN_PERIOD); this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS)); this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME); @@ -116,6 +121,10 @@ public class SecurityConfiguration { return useTicketCache; } + public Duration getTgtRenewalPeriod() { + return tgtRenewalPeriod; + } + public Configuration getFlinkConfig() { return flinkConfig; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 2ebcdb6f1bf..9f0bba6ea90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.hadoop.HadoopUserUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; @@ -29,7 +30,12 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.File; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -42,6 +48,8 @@ public class HadoopModule implements SecurityModule { private final Configuration hadoopConfiguration; + @Nullable private ScheduledExecutorService tgtRenewalExecutorService; + public HadoopModule( SecurityConfiguration securityConfiguration, Configuration hadoopConfiguration) { this.securityConfig = checkNotNull(securityConfiguration); @@ -75,6 +83,10 @@ public class HadoopModule implements SecurityModule { new File(fileLocation), hadoopConfiguration); loginUser.addCredentials(credentials); } + tgtRenewalExecutorService = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("TGTRenewalExecutorService")); + startTGTRenewal(tgtRenewalExecutorService, loginUser); } } else { loginUser = UserGroupInformation.getLoginUser(); @@ -95,8 +107,48 @@ public class HadoopModule implements SecurityModule { } } + @VisibleForTesting + void startTGTRenewal( + ScheduledExecutorService tgtRenewalExecutorService, UserGroupInformation loginUser) { + LOG.info("Starting TGT renewal task"); + + long tgtRenewalPeriod = securityConfig.getTgtRenewalPeriod().toMillis(); + tgtRenewalExecutorService.scheduleAtFixedRate( + () -> { + // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but + // in Hadoop + // 3.x, it is configurable (see + // hadoop.kerberos.keytab.login.autorenewal.enabled, added + // in HADOOP-9567). This task will make sure that the user stays logged in + // regardless of + // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a + // no-op if + // the TGT does not need to be renewed yet. + try { + LOG.debug("Renewing TGT"); + loginUser.checkTGTAndReloginFromKeytab(); + LOG.debug("TGT renewed successfully"); + } catch (Exception e) { + LOG.warn("Error while renewing TGT", e); + } + }, + tgtRenewalPeriod, + tgtRenewalPeriod, + TimeUnit.MILLISECONDS); + + LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod); + } + + @VisibleForTesting + void stopTGTRenewal() { + if (tgtRenewalExecutorService != null) { + tgtRenewalExecutorService.shutdown(); + tgtRenewalExecutorService = null; + } + } + @Override public void uninstall() { - throw new UnsupportedOperationException(); + stopTGTRenewal(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java index ff4c95ca4af..60b754bd450 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java @@ -27,14 +27,12 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.concurrent.ScheduledExecutor; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; -import java.io.IOException; import java.time.Clock; import java.util.HashMap; import java.util.Map; @@ -45,7 +43,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD; import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF; import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -78,8 +75,6 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { @Nullable private final ExecutorService ioExecutor; - @Nullable private ScheduledFuture<?> tgtRenewalFuture; - private final Object tokensUpdateFutureLock = new Object(); @GuardedBy("tokensUpdateFutureLock") @@ -207,9 +202,7 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { this.delegationTokenListener = checkNotNull(delegationTokenListener, "Delegation token listener must not be null"); synchronized (tokensUpdateFutureLock) { - checkState( - tgtRenewalFuture == null && tokensUpdateFuture == null, - "Manager is already started"); + checkState(tokensUpdateFuture == null, "Manager is already started"); } if (!kerberosLoginProvider.isLoginPossible()) { @@ -217,52 +210,9 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { return; } - startTGTRenewal(); startTokensUpdate(); } - @VisibleForTesting - void startTGTRenewal() throws IOException { - LOG.info("Starting TGT renewal task"); - - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - if (currentUser.isFromKeytab()) { - // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop - // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added - // in HADOOP-9567). This task will make sure that the user stays logged in regardless of - // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if - // the TGT does not need to be renewed yet. - long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis(); - tgtRenewalFuture = - scheduledExecutor.scheduleAtFixedRate( - () -> - ioExecutor.execute( - () -> { - try { - LOG.debug("Renewing TGT"); - currentUser.checkTGTAndReloginFromKeytab(); - LOG.debug("TGT renewed successfully"); - } catch (Exception e) { - LOG.warn("Error while renewing TGT", e); - } - }), - 0, - tgtRenewalPeriod, - TimeUnit.MILLISECONDS); - LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod); - } else { - LOG.info("TGT renewal task not started"); - } - } - - @VisibleForTesting - void stopTGTRenewal() { - if (tgtRenewalFuture != null) { - tgtRenewalFuture.cancel(true); - tgtRenewalFuture = null; - } - } - @VisibleForTesting void startTokensUpdate() { try { @@ -345,7 +295,6 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { LOG.info("Stopping credential renewal"); stopTokensUpdate(); - stopTGTRenewal(); LOG.info("Stopped credential renewal"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java new file mode 100644 index 00000000000..959da264496 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/modules/HadoopModuleTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.modules; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.runtime.security.SecurityConfiguration; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** Test for {@link HadoopModule}. */ +class HadoopModuleTest { + @Test + public void startTGTRenewalShouldScheduleRenewalWithKeytab() throws IOException { + final ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); + UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); + + Configuration flinkConf = new Configuration(); + SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf); + org.apache.hadoop.conf.Configuration hadoopConf = + new org.apache.hadoop.conf.Configuration(); + HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf); + + hadoopModule.startTGTRenewal(executorService, userGroupInformation); + executorService.triggerPeriodicScheduledTasks(); + hadoopModule.stopTGTRenewal(); + + verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java index f40baab5c38..fd329dce3e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java @@ -27,7 +27,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; -import java.io.IOException; import java.time.Clock; import java.time.ZoneId; import java.util.concurrent.atomic.AtomicInteger; @@ -40,9 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Test for {@link DelegationTokenManager}. @@ -110,33 +106,7 @@ public class KerberosDelegationTokenManagerITCase { } @Test - public void startTGTRenewalShouldScheduleRenewal() throws IOException { - final ManuallyTriggeredScheduledExecutor scheduledExecutor = - new ManuallyTriggeredScheduledExecutor(); - final ManuallyTriggeredScheduledExecutorService scheduler = - new ManuallyTriggeredScheduledExecutorService(); - try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { - UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); - when(userGroupInformation.isFromKeytab()).thenReturn(true); - ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - - ExceptionThrowingHadoopDelegationTokenProvider.reset(); - Configuration configuration = new Configuration(); - configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false); - KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(configuration, scheduledExecutor, scheduler); - - delegationTokenManager.startTGTRenewal(); - scheduledExecutor.triggerPeriodicScheduledTasks(); - scheduler.triggerAll(); - delegationTokenManager.stopTGTRenewal(); - - verify(userGroupInformation, times(1)).checkTGTAndReloginFromKeytab(); - } - } - - @Test - public void startTokensUpdateShouldScheduleRenewal() throws IOException { + public void startTokensUpdateShouldScheduleRenewal() { final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor(); final ManuallyTriggeredScheduledExecutorService scheduler =
