gaborgsomogyi commented on code in PR #21604:
URL: https://github.com/apache/flink/pull/21604#discussion_r1062460423


##########
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import static 
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Repository for delegation token receivers. */
+@Internal
+public class DelegationTokenReceiverRepository {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DelegationTokenReceiverRepository.class);
+
+    private final Configuration configuration;
+
+    @VisibleForTesting final Map<String, DelegationTokenReceiver> 
delegationTokenReceivers;
+
+    public DelegationTokenReceiverRepository(Configuration configuration) {
+        this.configuration = checkNotNull(configuration, "Flink configuration 
must not be null");
+        this.delegationTokenReceivers = loadReceivers();
+    }
+
+    private Map<String, DelegationTokenReceiver> loadReceivers() {
+        LOG.info("Loading delegation token receivers");
+
+        ServiceLoader<DelegationTokenReceiver> serviceLoader =
+                ServiceLoader.load(DelegationTokenReceiver.class);
+
+        Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+        for (DelegationTokenReceiver receiver : serviceLoader) {
+            try {
+                if (isProviderEnabled(configuration, receiver.serviceName())) {
+                    receiver.init(configuration);
+                    LOG.info(
+                            "Delegation token receiver {} loaded and 
initialized",
+                            receiver.serviceName());
+                    checkState(
+                            !receivers.containsKey(receiver.serviceName()),
+                            "Delegation token receiver with service name {} 
has multiple implementations",
+                            receiver.serviceName());
+                    receivers.put(receiver.serviceName(), receiver);
+                } else {
+                    LOG.info(
+                            "Delegation token receiver {} is disabled so not 
loaded",
+                            receiver.serviceName());
+                }
+            } catch (Exception | NoClassDefFoundError e) {
+                // The intentional general rule is that if a receiver's init 
method throws exception
+                // then stop the workload
+                LOG.error(
+                        "Failed to initialize delegation token receiver {}",
+                        receiver.serviceName(),
+                        e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        LOG.info("Delegation token receivers loaded successfully");
+
+        return receivers;
+    }
+
+    @VisibleForTesting
+    boolean isReceiverLoaded(String serviceName) {
+        return delegationTokenReceivers.containsKey(serviceName);
+    }
+
+    /**
+     * Callback function when new delegation tokens obtained.
+     *
+     * @param containerBytes Serialized form of a DelegationTokenContainer. 
All the available tokens
+     *     will be forwarded to the appropriate {@link 
DelegationTokenReceiver} based on service
+     *     name.
+     */
+    public void onNewTokensObtained(byte[] containerBytes) throws Exception {
+        if (containerBytes == null || containerBytes.length == 0) {
+            throw new IllegalArgumentException("Illegal container tried to be 
processed");
+        }
+        DelegationTokenContainer container =
+                InstantiationUtil.deserializeObject(
+                        containerBytes, 
DelegationTokenContainer.class.getClassLoader());
+        onNewTokensObtained(container);
+    }
+
+    /**
+     * Callback function when new delegation tokens obtained.
+     *
+     * @param container Serialized form of delegation tokens stored in 
DelegationTokenContainer. All
+     *     the available tokens will be forwarded to the appropriate {@link 
DelegationTokenReceiver}
+     *     based on service name.
+     */
+    public void onNewTokensObtained(DelegationTokenContainer container) throws 
Exception {
+        LOG.info("New delegation tokens arrived, sending them to receivers");
+        for (Map.Entry<String, byte[]> entry : 
container.getTokens().entrySet()) {
+            String serviceName = entry.getKey();
+            byte[] tokens = entry.getValue();
+            if (!delegationTokenReceivers.containsKey(serviceName)) {
+                throw new IllegalStateException(
+                        "Tokens arrived for service but no receiver found for 
it: " + serviceName);
+            }
+            try {
+                
delegationTokenReceivers.get(serviceName).onNewTokensObtained(tokens);
+            } catch (Exception e) {
+                LOG.warn("Failed to send tokens to delegation token receiver 
{}", serviceName, e);

Review Comment:
   We treat receiver failures as temporary issues and continue processing to 
increase resiliency. This way, not all of the tokens may be stored, but we 
still increase the availability of the overall system.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to