gaborgsomogyi commented on code in PR #21511:
URL: https://github.com/apache/flink/pull/21511#discussion_r1049390777
##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java:
##########
@@ -45,91 +42,86 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_RELOGIN_PERIOD;
-import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
-import static
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
+import static
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
+import static
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Manager for delegation tokens in a Flink cluster.
*
* <p>When delegation token renewal is enabled, this manager will make sure
long-running apps can
- * run without interruption while accessing secured services. It periodically
logs in to the KDC
- * with user-provided credentials, and contacts all the configured secure
services to obtain
- * delegation tokens to be distributed to the rest of the application.
+ * run without interruption while accessing secured services. It periodically
contacts all the
+ * configured secure services to obtain delegation tokens to be distributed to
the rest of the
+ * application.
*/
@Internal
-public class KerberosDelegationTokenManager implements DelegationTokenManager {
+public class DefaultDelegationTokenManager implements DelegationTokenManager {
- private static final Logger LOG =
LoggerFactory.getLogger(KerberosDelegationTokenManager.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
private final Configuration configuration;
private final double tokensRenewalTimeRatio;
private final long renewalRetryBackoffPeriod;
- private final KerberosLoginProvider kerberosLoginProvider;
-
- @VisibleForTesting final Map<String, HadoopDelegationTokenProvider>
delegationTokenProviders;
+ @VisibleForTesting final Map<String, DelegationTokenProvider>
delegationTokenProviders;
@Nullable private final ScheduledExecutor scheduledExecutor;
@Nullable private final ExecutorService ioExecutor;
- @Nullable private ScheduledFuture<?> tgtRenewalFuture;
-
private final Object tokensUpdateFutureLock = new Object();
@GuardedBy("tokensUpdateFutureLock")
@Nullable
private ScheduledFuture<?> tokensUpdateFuture;
- @Nullable private DelegationTokenListener delegationTokenListener;
+ @Nullable private Listener listener;
- public KerberosDelegationTokenManager(
+ public DefaultDelegationTokenManager(
Configuration configuration,
@Nullable ScheduledExecutor scheduledExecutor,
@Nullable ExecutorService ioExecutor) {
this.configuration = checkNotNull(configuration, "Flink configuration
must not be null");
- this.tokensRenewalTimeRatio =
configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
+ this.tokensRenewalTimeRatio =
configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
this.renewalRetryBackoffPeriod =
-
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
- this.kerberosLoginProvider = new KerberosLoginProvider(configuration);
+
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
this.delegationTokenProviders = loadProviders();
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
}
- private Map<String, HadoopDelegationTokenProvider> loadProviders() {
+ private Map<String, DelegationTokenProvider> loadProviders() {
LOG.info("Loading delegation token providers");
- ServiceLoader<HadoopDelegationTokenProvider> serviceLoader =
- ServiceLoader.load(HadoopDelegationTokenProvider.class);
+ ServiceLoader<DelegationTokenProvider> serviceLoader =
+ ServiceLoader.load(DelegationTokenProvider.class);
- Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>();
- for (HadoopDelegationTokenProvider provider : serviceLoader) {
+ Map<String, DelegationTokenProvider> providers = new HashMap<>();
+ for (DelegationTokenProvider provider : serviceLoader) {
try {
if (isProviderEnabled(provider.serviceName())) {
provider.init(configuration);
LOG.info(
"Delegation token provider {} loaded and
initialized",
provider.serviceName());
+ checkState(
+ !providers.containsKey(provider.serviceName()),
+ "Delegation token provider with service name {}
has multiple implementations",
+ provider.serviceName());
providers.put(provider.serviceName(), provider);
} else {
LOG.info(
"Delegation token provider {} is disabled so not
loaded",
provider.serviceName());
}
} catch (Exception | NoClassDefFoundError e) {
- LOG.error(
+ LOG.warn(
Review Comment:
The actual 2 providers are so in a bad shape (inherited from the old code)
that I see this the best as TMP solution. My plan is to extract HDFS and HBase
token providers to an external repo. When that happens we must put back fail
fast here. Feel free to suggest something if you have in mind which is not
horror complex.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]