dmvk commented on a change in pull request #18664: URL: https://github.com/apache/flink/pull/18664#discussion_r802784990
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/hadoop/HadoopDependency.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.hadoop; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Responsible telling if specific Hadoop dependencies are on classpath. */ +public class HadoopDependency { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopDependency.class); + + public static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) { Review comment: nit: javadoc I really like this, that class not found exception was confusing users for ages :) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Manager for delegation tokens in a Flink cluster. + * + * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. + */ +public class DelegationTokenManager { + + private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class); + + private final Configuration configuration; + + final Map<String, DelegationTokenProvider> delegationTokenProviders; Review comment: ```suggestion @VisibleForTesting final Map<String, DelegationTokenProvider> delegationTokenProviders; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Manager for delegation tokens in a Flink cluster. + * + * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. 
+ */ +public class DelegationTokenManager { + + private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class); + + private final Configuration configuration; + + final Map<String, DelegationTokenProvider> delegationTokenProviders; + + public DelegationTokenManager(Configuration configuration) { + this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); + this.delegationTokenProviders = loadProviders(); + } + + private Map<String, DelegationTokenProvider> loadProviders() { + LOG.info("Loading service providers"); + + ServiceLoader<DelegationTokenProvider> serviceLoader = + ServiceLoader.load(DelegationTokenProvider.class); + + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + Iterator<DelegationTokenProvider> providerIterator = serviceLoader.iterator(); + providerIterator.forEachRemaining( Review comment: We should probably get less fancy here :) ```suggestion for (DelegationTokenProvider provider : serviceLoader) { ``` ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1245,6 +1258,21 @@ private ApplicationReport startAppMaster( return report; } + private void setTokensFor(ContainerLaunchContext containerLaunchContext) throws IOException { + LOG.info("Adding delegation tokens to the AM container."); + + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + + DelegationTokenManager delegationTokenManager = + new DelegationTokenManager(flinkConfiguration); + delegationTokenManager.obtainDelegationTokens(credentials); Review comment: nit: This makes me think whether DTM should be `Closeable` instead of having `stop()` method. 
I know that `start / stop` methods are intended for the renewal, but it feels like the `obtainDelegationTokens` will also need to create a resource for pulling the token out of KDC that needs to be closed 🤔 ```suggestion try (final DelegationTokenManager delegationTokenManager = new DelegationTokenManager(flinkConfiguration)) { delegationTokenManager.obtainDelegationTokens(credentials); } ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ########## @@ -205,6 +212,14 @@ public ResourceManager( this.ioExecutor = ioExecutor; this.startedFuture = new CompletableFuture<>(); + + checkNotNull(configuration, "Flink configuration must not be null"); + this.delegationTokenManager = + configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN) + && HadoopDependency.isHadoopCommonOnClasspath( + getClass().getClassLoader()) + ? Optional.of(new DelegationTokenManager(configuration)) + : Optional.empty(); Review comment: Another option, instead of `NoOpDelegationTokenManager`, would be for the KerberosDTM to be basically a "no-op" when no providers are enabled. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Manager for delegation tokens in a Flink cluster. + * + * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. + */ +public class DelegationTokenManager { Review comment: This should be an interface. The implementation should suggest it has something to do with Kerberos -> `KerberosDelegationTokenManager` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ########## @@ -285,6 +302,8 @@ private void stopResourceManagerServices() throws Exception { new ResourceManagerException("Error while shutting down resource manager", e); } + delegationTokenManager.ifPresent(DelegationTokenManager::stop); Review comment: Do we need to handle exceptions here as with the other services? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; + +import java.util.Optional; + +/** + * An example implementation of {@link DelegationTokenProvider} which throws exception when enabled. + */ +public class ExceptionThrowingDelegationTokenProvider implements DelegationTokenProvider { + + public static volatile boolean enabled; + public static volatile boolean constructed; + + static { + enabled = false; + constructed = false; + } Review comment: ```suggestion public static volatile boolean enabled = false; public static volatile boolean constructed = false; ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Manager for delegation tokens in a Flink cluster. + * + * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. + */ +public class DelegationTokenManager { + + private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class); + + private final Configuration configuration; + + final Map<String, DelegationTokenProvider> delegationTokenProviders; + + public DelegationTokenManager(Configuration configuration) { + this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); Review comment: nit: We're implicitly treating all fields as non null, unless they're annotated with `Nullable`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Manager for delegation tokens in a Flink cluster. + * + * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can + * run without interruption while accessing secured services. It periodically logs in to the KDC + * with user-provided credentials, and contacts all the configured secure services to obtain + * delegation tokens to be distributed to the rest of the application. 
+ */ +public class DelegationTokenManager { + + private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenManager.class); + + private final Configuration configuration; + + final Map<String, DelegationTokenProvider> delegationTokenProviders; + + public DelegationTokenManager(Configuration configuration) { + this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); + this.delegationTokenProviders = loadProviders(); + } + + private Map<String, DelegationTokenProvider> loadProviders() { + LOG.info("Loading service providers"); + + ServiceLoader<DelegationTokenProvider> serviceLoader = + ServiceLoader.load(DelegationTokenProvider.class); + + Map<String, DelegationTokenProvider> providers = new HashMap<>(); + Iterator<DelegationTokenProvider> providerIterator = serviceLoader.iterator(); + providerIterator.forEachRemaining( + provider -> { + try { + if (isProviderEnabled(provider.serviceName())) { + provider.init(configuration); + LOG.info( + "Delegation provider {} loaded and initialized", + provider.serviceName()); + providers.put(provider.serviceName(), provider); + } else { + LOG.info( + "Delegation provider {} is disabled so not loaded", + provider.serviceName()); + } + } catch (Exception e) { + LOG.error( + "Failed to initialize delegation provider {}, exception: {}", Review comment: ```suggestion "Failed to initialize delegation provider {}.", ``` I'm not really sure this would print a stacktrace. Anyway slf4j is designed to accept throwable as a last argument. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1090,9 +1094,18 @@ private ApplicationReport startAppMaster( final ContainerLaunchContext amContainer = setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec); - // setup security tokens + /* + * Here we have the new and the old delegation token framework together temporarily. 
+ * The reason is simple, adding a horror complicated several thousand lines PR is not a good + * idea so in the upcoming commits will remove the old DT framework and only the + * new will be in place. + */ Review comment: ```suggestion // Here we have the new and the old delegation token framework together temporarily. // The reason is simple, adding a horror complicated several thousand lines PR is not a good // idea so in the upcoming commits will remove the old DT framework and only the // new will be in place. ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; + +import java.util.Optional; + +/** + * Delegation token provider API. Instances of token providers are loaded by {@link + * DelegationTokenManager} through service loader. 
+ */ +@PublicEvolving Review comment: ```suggestion @Experimental ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ########## @@ -205,6 +212,14 @@ public ResourceManager( this.ioExecutor = ioExecutor; this.startedFuture = new CompletableFuture<>(); + + checkNotNull(configuration, "Flink configuration must not be null"); + this.delegationTokenManager = + configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN) + && HadoopDependency.isHadoopCommonOnClasspath( + getClass().getClassLoader()) + ? Optional.of(new DelegationTokenManager(configuration)) + : Optional.empty(); Review comment: It doesn't feel right to construct the DTM here; it should be constructed in the `ClusterEntrypoint` like the rest of the services. I know it's quite tricky to propagate it all the way down to the RM, but it allows easier testing. This approach basically doesn't give us any chance to override the implementation for tests. Also it would be really nice to turn DTM into an interface, so you don't have to deal with nullables -> you'd simply provide a `NoOpDelegationTokenManager` implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
