dmvk commented on code in PR #19372:
URL: https://github.com/apache/flink/pull/19372#discussion_r849445045
##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -424,12 +424,21 @@ public void start() throws Exception {
             heartbeatServices =
                     HeartbeatServices.fromConfiguration(configuration);
-            delegationTokenManager =
-                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)
-                                    && HadoopDependency.isHadoopCommonOnClasspath(
-                                            getClass().getClassLoader())
-                            ? new KerberosDelegationTokenManager(configuration)
-                            : new NoOpDelegationTokenManager();
+            if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
+                if (HadoopDependency.isHadoopCommonOnClasspath(getClass().getClassLoader())) {
+                    delegationTokenManager =
+                            new KerberosDelegationTokenManager(
+                                    configuration,
+                                    commonRpcService.getScheduledExecutor(),
+                                    ioExecutor);
+                } else {
+                    LOG.info(
+                            "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath.");
+                    delegationTokenManager = new NoOpDelegationTokenManager();
+                }
+            } else {
+                delegationTokenManager = new NoOpDelegationTokenManager();
+            }
Review Comment:
Can you please hide this behind a static factory method (e.g. in
`DelegationTokenManagerUtils`) so it can be reused by both `MiniCluster` and
`ClusterEntrypoint`?
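A minimal sketch of such a factory follows. It assumes the two conditions are passed in as booleans. One stands in for `configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)` and the other for `HadoopDependency.isHadoopCommonOnClasspath(...)`. The nested types are dependency-free stand-ins for the Flink classes, and the method name `createDelegationTokenManager` is hypothetical:

```java
/**
 * Hypothetical sketch of the suggested static factory. The nested types are dependency-free
 * stand-ins for Flink's DelegationTokenManager, NoOpDelegationTokenManager and
 * KerberosDelegationTokenManager.
 */
public final class DelegationTokenManagerUtils {

    /** Stand-in for Flink's DelegationTokenManager interface. */
    public interface DelegationTokenManager {
        void start() throws Exception;

        void stop();
    }

    /** Stand-in for NoOpDelegationTokenManager: intentionally does nothing. */
    public static final class NoOpDelegationTokenManager implements DelegationTokenManager {
        @Override
        public void start() {}

        @Override
        public void stop() {}
    }

    /** Stand-in for KerberosDelegationTokenManager (the real one takes config and executors). */
    public static final class KerberosDelegationTokenManager implements DelegationTokenManager {
        @Override
        public void start() {}

        @Override
        public void stop() {}
    }

    private DelegationTokenManagerUtils() {}

    /**
     * Shared selection logic for MiniCluster and ClusterEntrypoint (hypothetical method name).
     *
     * @param fetchDelegationTokens value of KERBEROS_FETCH_DELEGATION_TOKEN
     * @param hadoopOnClasspath result of HadoopDependency.isHadoopCommonOnClasspath(...)
     */
    public static DelegationTokenManager createDelegationTokenManager(
            boolean fetchDelegationTokens, boolean hadoopOnClasspath) {
        if (!fetchDelegationTokens) {
            return new NoOpDelegationTokenManager();
        }
        if (!hadoopOnClasspath) {
            // The PR logs an INFO message at this point before falling back to the no-op manager.
            return new NoOpDelegationTokenManager();
        }
        return new KerberosDelegationTokenManager();
    }
}
```

With this in place, both call sites reduce to a single call. A real version would more likely take the `Configuration`, the class loader and the executors rather than two booleans.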
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
     private final Configuration configuration;
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;
Review Comment:
```suggestion
@Nullable
private ScheduledFuture<?> tgtRenewalFuture;
```
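The field is nullable because it stays `null` until `start()` schedules the task. It also remains `null` if `start()` returns early. Any `stop()` path therefore has to guard against it. The sketch below uses plain JDK executors. The class name is hypothetical, and the `@Nullable` annotation (from `javax.annotation`, as already used on the constructor parameters) is left out to keep it dependency-free:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of why tgtRenewalFuture is nullable. */
public final class NullableRenewalFutureSketch {
    private final ScheduledExecutorService scheduledExecutor;

    // null until start() schedules the renewal; would carry @Nullable in Flink code
    private ScheduledFuture<?> tgtRenewalFuture;

    public NullableRenewalFutureSketch(ScheduledExecutorService scheduledExecutor) {
        this.scheduledExecutor = scheduledExecutor;
    }

    public void start(boolean renewalPossible, long periodMillis, Runnable renewal) {
        if (tgtRenewalFuture != null) {
            throw new IllegalStateException("Manager is already started");
        }
        if (!renewalPossible) {
            return; // early return: the future stays null
        }
        tgtRenewalFuture =
                scheduledExecutor.scheduleAtFixedRate(
                        renewal, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        // Guard needed: stop() may run without start(), or after start() returned early.
        if (tgtRenewalFuture != null) {
            tgtRenewalFuture.cancel(true);
            tgtRenewalFuture = null;
        }
    }

    public boolean isRenewalScheduled() {
        return tgtRenewalFuture != null;
    }
}
```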
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);
Review Comment:
This should be logged at `warn` rather than `error` level.
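A failed relogin is likely recoverable, since the next scheduled run tries again. That arguably makes `warn` the better fit. The sketch below shows a single renewal attempt that logs at warning level and never lets the exception escape. It assumes `java.util.logging` as a stand-in for SLF4J, and a hypothetical `Relogin` interface in place of `UserGroupInformation#checkTGTAndReloginFromKeytab()`:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of one TGT renewal attempt. */
public final class TgtRenewalTaskSketch {
    private static final Logger LOG = Logger.getLogger(TgtRenewalTaskSketch.class.getName());

    /** Stand-in for UserGroupInformation#checkTGTAndReloginFromKeytab(). */
    @FunctionalInterface
    public interface Relogin {
        void checkTgtAndRelogin() throws Exception;
    }

    private TgtRenewalTaskSketch() {}

    /** Runs one attempt; returns true on success, false if it failed and was logged. */
    public static boolean renewOnce(Relogin relogin) {
        try {
            LOG.fine("Renewing TGT");
            relogin.checkTgtAndRelogin();
            LOG.fine("TGT renewed successfully");
            return true;
        } catch (Exception e) {
            // WARNING instead of SEVERE: the next scheduled run will try again.
            LOG.log(Level.WARNING, "Error while renewing TGT", e);
            return false;
        }
    }
}
```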
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
     private final Configuration configuration;
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
+
+    private ScheduledFuture<?> tgtRenewalFuture;
+
+    public KerberosDelegationTokenManager(
+            Configuration configuration,
+            @Nullable ScheduledExecutor scheduledExecutor,
+            @Nullable ExecutorService executorService) {
Review Comment:
It feels off that these executors could be null just because of a single usage
of `DelegationTokenManager#obtainDelegationTokens` in `YarnClusterDescriptor`.
Could it be that this method doesn't really belong in this interface?
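Here is one possible split, sketched under assumptions. A hypothetical narrow `DelegationTokenObtainer` interface carries `obtainDelegationTokens`, so a client-side caller like `YarnClusterDescriptor` needs no executors at all. The long-running manager then requires non-null executors. `Credentials` is a stand-in for Hadoop's class. All type names other than `DelegationTokenManager` are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical sketch: token obtaining split from the long-running manager. */
public final class DelegationTokenInterfaceSplitSketch {

    /** Stand-in for Hadoop's Credentials: just collects token names. */
    public static final class Credentials {
        public final List<String> tokens = new ArrayList<>();
    }

    /** Narrow interface: all a client-side caller such as YarnClusterDescriptor needs. */
    public interface DelegationTokenObtainer {
        void obtainDelegationTokens(Credentials credentials) throws Exception;
    }

    /** Long-running manager on the JobManager side; executors are mandatory. */
    public interface DelegationTokenManager {
        void start() throws Exception;

        void stop();
    }

    /** Trivial obtainer for illustration; needs no executors. */
    public static final class StaticTokenObtainer implements DelegationTokenObtainer {
        @Override
        public void obtainDelegationTokens(Credentials credentials) {
            credentials.tokens.add("example-token");
        }
    }

    /** The manager composes an obtainer; none of its dependencies is nullable. */
    public static final class RenewingDelegationTokenManager implements DelegationTokenManager {
        private final DelegationTokenObtainer obtainer;
        private final ScheduledExecutorService scheduledExecutor;
        private final ExecutorService ioExecutor;

        public RenewingDelegationTokenManager(
                DelegationTokenObtainer obtainer,
                ScheduledExecutorService scheduledExecutor,
                ExecutorService ioExecutor) {
            this.obtainer = Objects.requireNonNull(obtainer, "obtainer");
            this.scheduledExecutor = Objects.requireNonNull(scheduledExecutor, "scheduledExecutor");
            this.ioExecutor = Objects.requireNonNull(ioExecutor, "ioExecutor");
        }

        @Override
        public void start() {
            // A real implementation would schedule renewal on scheduledExecutor and call
            // obtainer on ioExecutor; omitted to keep the sketch focused on the types.
        }

        @Override
        public void stop() {}
    }
}
```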
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
     private final Configuration configuration;
+    private final SecurityConfiguration securityConfiguration;
Review Comment:
This doesn't seem to need to be an instance variable.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -45,10 +56,28 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager {
     private final Configuration configuration;
+    private final SecurityConfiguration securityConfiguration;
+
+    private final KerberosRenewalPossibleProvider kerberosRenewalPossibleProvider;
+
     @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders;
-    public KerberosDelegationTokenManager(Configuration configuration) {
+    private final ScheduledExecutor scheduledExecutor;
+
+    private final ExecutorService executorService;
Review Comment:
`ioExecutor` would be more explicit as it's commonly used throughout the JM
components.
```suggestion
private final ExecutorService ioExecutor;
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
Review Comment:
Can you please write a test case for the periodic renewal?
(`ManuallyTriggeredScheduledExecutorService` is usually pretty helpful for this.)
The main reason I'm bringing this up is that this still uses Hadoop's UGI
internals, which makes it mostly untestable. My feeling is that
`KerberosRenewalPossibleProvider` should become something that completely hides
the Hadoop internals from the DTM (e.g. a `KerberosClient` with two
implementations, `HadoopKerberosClient` and `TestingKerberosClient`; see e.g.
`TestingDeclarativeSlotPool` for how "testing" implementations are usually
created in Flink).
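Here is a rough sketch of how this could look. It assumes a hypothetical `KerberosClient` interface (the name suggested above). It also uses a minimal hand-rolled `ManualPeriodicScheduler` standing in for `ManuallyTriggeredScheduledExecutorService`, so the example stays self-contained. The renewal logic only sees `KerberosClient`. A test can therefore drive renewals deterministically and count relogins without touching UGI:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical sketch: KerberosClient abstraction plus a manually triggered schedule. */
public final class KerberosRenewalTestSketch {

    /** Hides UGI from the manager; a HadoopKerberosClient would delegate to UGI. */
    public interface KerberosClient {
        boolean isRenewalPossible();

        boolean isFromKeytab();

        void checkTgtAndReloginFromKeytab() throws Exception;
    }

    /** Testing implementation that only counts relogin attempts. */
    public static final class TestingKerberosClient implements KerberosClient {
        private final boolean renewalPossible;
        private final boolean fromKeytab;
        private int reloginCount;

        public TestingKerberosClient(boolean renewalPossible, boolean fromKeytab) {
            this.renewalPossible = renewalPossible;
            this.fromKeytab = fromKeytab;
        }

        @Override
        public boolean isRenewalPossible() {
            return renewalPossible;
        }

        @Override
        public boolean isFromKeytab() {
            return fromKeytab;
        }

        @Override
        public void checkTgtAndReloginFromKeytab() {
            reloginCount++;
        }

        public int getReloginCount() {
            return reloginCount;
        }
    }

    /** Stand-in for a manually triggered scheduler: periodic tasks run only when triggered. */
    public static final class ManualPeriodicScheduler {
        private final List<Runnable> periodicTasks = new ArrayList<>();

        public void scheduleAtFixedRate(Runnable task) {
            periodicTasks.add(task); // delay and period are ignored; the test drives time
        }

        public void triggerPeriodicTasks() {
            for (Runnable task : new ArrayList<>(periodicTasks)) {
                task.run();
            }
        }
    }

    /** Renewal logic that depends only on KerberosClient. */
    public static final class TgtRenewer {
        private final KerberosClient kerberosClient;
        private final ManualPeriodicScheduler scheduler;
        private final Executor ioExecutor;

        public TgtRenewer(
                KerberosClient kerberosClient, ManualPeriodicScheduler scheduler, Executor ioExecutor) {
            this.kerberosClient = kerberosClient;
            this.scheduler = scheduler;
            this.ioExecutor = ioExecutor;
        }

        public void start() {
            if (!kerberosClient.isRenewalPossible() || !kerberosClient.isFromKeytab()) {
                return; // nothing is scheduled
            }
            scheduler.scheduleAtFixedRate(
                    () ->
                            ioExecutor.execute(
                                    () -> {
                                        try {
                                            kerberosClient.checkTgtAndReloginFromKeytab();
                                        } catch (Exception e) {
                                            // Real code would log this at warn level.
                                        }
                                    }));
        }
    }
}
```

A `HadoopKerberosClient` would delegate these three methods to `UserGroupInformation`. In a test, the I/O executor can simply be `Runnable::run`, so each `triggerPeriodicTasks()` call produces exactly one relogin.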
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java:
##########
@@ -110,13 +139,62 @@ public void obtainDelegationTokens(Credentials credentials) {
      * task managers.
      */
     @Override
-    public void start() {
-        LOG.info("Starting renewal task");
+    public void start() throws Exception {
+        checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
+        checkNotNull(executorService, "Executor service must not be null");
+        checkState(tgtRenewalFuture == null, "Manager is already started");
+
+        if (!kerberosRenewalPossibleProvider.isRenewalPossible()) {
+            LOG.info("Renewal is NOT possible, skipping to start renewal task");
+            return;
+        }
+
+        startTGTRenewal();
+    }
+
+    private void startTGTRenewal() throws IOException {
+        LOG.debug("Starting credential renewal task");
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        if (currentUser.isFromKeytab()) {
+            // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but in Hadoop
+            // 3.x, it is configurable (see hadoop.kerberos.keytab.login.autorenewal.enabled, added
+            // in HADOOP-9567). This task will make sure that the user stays logged in regardless of
+            // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a no-op if
+            // the TGT does not need to be renewed yet.
+            long tgtRenewalPeriod = configuration.get(KERBEROS_RELOGIN_PERIOD).toMillis();
+            tgtRenewalFuture =
+                    scheduledExecutor.scheduleAtFixedRate(
+                            () ->
+                                    executorService.execute(
+                                            () -> {
+                                                try {
+                                                    LOG.debug("Renewing TGT");
+                                                    currentUser.checkTGTAndReloginFromKeytab();
+                                                    LOG.debug("TGT renewed successfully");
+                                                } catch (Exception e) {
+                                                    LOG.error("Error while renewing TGT", e);
+                                                }
+                                            }),
+                            tgtRenewalPeriod,
+                            tgtRenewalPeriod,
+                            TimeUnit.MILLISECONDS);
+            LOG.debug("Credential renewal task started and reoccur in {} ms", tgtRenewalPeriod);
Review Comment:
There is some inconsistency in the log messages: they mix references to the TGT
and to credentials, and these should be unified. It would also be great to
explain in the class javadoc what TGT actually stands for.
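Here is a possible shape for both points, as a sketch only. It has a class javadoc that spells out TGT, and log messages that consistently say "TGT". The messages use SLF4J-style `{}` placeholders, as in the PR. The class name and constants are hypothetical:

```java
import java.util.List;

/**
 * Hypothetical class javadoc wording for the delegation token manager.
 *
 * <p>TGT stands for Ticket-Granting Ticket: the Kerberos credential a principal receives from the
 * KDC at login and later presents to obtain service tickets. For keytab-based logins, the renewal
 * task keeps the TGT valid by re-logging in from the keytab before it expires.
 */
public final class TgtLogMessages {
    // Every message says "TGT" instead of mixing "TGT" and "credential".
    public static final String STARTING = "Starting TGT renewal task";
    public static final String STARTED = "TGT renewal task started, repeating every {} ms";
    public static final String SKIPPED = "TGT renewal is not possible, not starting the renewal task";
    public static final String RENEWING = "Renewing TGT";
    public static final String RENEWED = "TGT renewed successfully";
    public static final String FAILED = "Error while renewing TGT";

    public static final List<String> ALL =
            List.of(STARTING, STARTED, SKIPPED, RENEWING, RENEWED, FAILED);

    private TgtLogMessages() {}
}
```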