This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 324b54cf413 [FLINK-30425][runtime][security] Generalize token receive
side
324b54cf413 is described below
commit 324b54cf413229f78e6e709aaaa2e16a03d45df1
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Jan 2 15:30:45 2023 +0100
[FLINK-30425][runtime][security] Generalize token receive side
---
flink-end-to-end-tests/test-scripts/common.sh | 1 +
.../flink/runtime/minicluster/MiniCluster.java | 10 +-
.../token/DefaultDelegationTokenManager.java | 64 ++++++++--
.../security/token/DelegationTokenProvider.java | 13 +-
.../security/token/DelegationTokenReceiver.java | 61 +++++++++
.../token/DelegationTokenReceiverRepository.java | 140 +++++++++++++++++++++
.../token/hadoop/HBaseDelegationTokenProvider.java | 27 +++-
.../token/hadoop/HBaseDelegationTokenReceiver.java | 35 ++++++
...ter.java => HadoopDelegationTokenReceiver.java} | 39 +++---
.../hadoop/HadoopFSDelegationTokenProvider.java | 31 ++++-
.../hadoop/HadoopFSDelegationTokenReceiver.java | 31 +++++
.../flink/runtime/taskexecutor/TaskExecutor.java | 12 +-
.../runtime/taskexecutor/TaskManagerRunner.java | 22 +++-
....runtime.security.token.DelegationTokenReceiver | 17 +++
.../token/DefaultDelegationTokenManagerTest.java | 96 ++++++++++++--
.../DelegationTokenReceiverRepositoryTest.java | 75 +++++++++++
.../ExceptionThrowingDelegationTokenReceiver.java | 60 +++++++++
.../token/TestDelegationTokenReceiver.java | 36 ++++++
...va => HadoopDelegationTokenReceiverITCase.java} | 55 ++++----
.../runtime/taskexecutor/TaskExecutorBuilder.java | 7 +-
...cutorExecutionDeploymentReconciliationTest.java | 4 +-
.../TaskExecutorPartitionLifecycleTest.java | 4 +-
.../taskexecutor/TaskExecutorSlotLifetimeTest.java | 4 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 7 +-
.../taskexecutor/TaskManagerRunnerStartupTest.java | 4 +-
.../taskexecutor/TaskManagerRunnerTest.java | 7 +-
.../TaskSubmissionTestEnvironment.java | 4 +-
.../runtime/taskexecutor/TestingTaskExecutor.java | 7 +-
....runtime.security.token.DelegationTokenReceiver | 17 +++
.../apache/flink/yarn/YarnClusterDescriptor.java | 7 +-
30 files changed, 792 insertions(+), 105 deletions(-)
diff --git a/flink-end-to-end-tests/test-scripts/common.sh
b/flink-end-to-end-tests/test-scripts/common.sh
index 31741710b3a..047a05a7bb4 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -380,6 +380,7 @@ function check_logs_for_errors {
| grep -v "Error sending fetch request" \
| grep -v "WARN akka.remote.ReliableDeliverySupervisor" \
| grep -v "Options.*error_*" \
+ | grep -v "not packaged with this application" \
| grep -ic "error" || true)
if [[ ${error_count} -gt 0 ]]; then
echo "Found error in log files; printing first 500 lines; see full logs
for details:"
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 59240a14c03..1d48126e141 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -91,6 +91,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
@@ -199,6 +200,9 @@ public class MiniCluster implements AutoCloseableAsync {
@GuardedBy("lock")
private DelegationTokenManager delegationTokenManager;
+ @GuardedBy("lock")
+ private DelegationTokenReceiverRepository
delegationTokenReceiverRepository;
+
@GuardedBy("lock")
private BlobCacheService blobCacheService;
@@ -431,6 +435,9 @@ public class MiniCluster implements AutoCloseableAsync {
DefaultDelegationTokenManagerFactory.create(
configuration,
commonRpcService.getScheduledExecutor(), ioExecutor);
+ delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration);
+
blobCacheService =
BlobUtils.createBlobCacheService(
configuration,
@@ -749,7 +756,8 @@ public class MiniCluster implements AutoCloseableAsync {
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
workingDirectory.createSubWorkingDirectory("tm_" +
taskManagers.size()),
taskManagerTerminatingFatalErrorHandlerFactory.create(
- taskManagers.size()));
+ taskManagers.size()),
+ delegationTokenReceiverRepository);
taskExecutor.start();
taskManagers.add(taskExecutor);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index 1703d796dec..fb5a195d2cc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.security.token;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -34,9 +33,11 @@ import javax.annotation.concurrent.GuardedBy;
import java.time.Clock;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -58,6 +59,13 @@ import static org.apache.flink.util.Preconditions.checkState;
@Internal
public class DefaultDelegationTokenManager implements DelegationTokenManager {
+ private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR =
+ "There is an inconsistency between loaded delegation token
providers and receivers. "
+ + "One must implement a DelegationTokenProvider and a
DelegationTokenReceiver "
+ + "with the same service name and add them together to the
classpath to make "
+ + "the system consistent. The mentioned classes are loaded
with Java's service "
+ + "loader so the appropriate META-INF registration also
needs to be created.";
+
private static final Logger LOG =
LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
private final Configuration configuration;
@@ -68,6 +76,8 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
@VisibleForTesting final Map<String, DelegationTokenProvider>
delegationTokenProviders;
+ private final DelegationTokenReceiverRepository
delegationTokenReceiverRepository;
+
@Nullable private final ScheduledExecutor scheduledExecutor;
@Nullable private final ExecutorService ioExecutor;
@@ -89,8 +99,13 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
this.renewalRetryBackoffPeriod =
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
this.delegationTokenProviders = loadProviders();
+ this.delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration);
this.scheduledExecutor = scheduledExecutor;
this.ioExecutor = ioExecutor;
+ checkProviderAndReceiverConsistency(
+ delegationTokenProviders,
+ delegationTokenReceiverRepository.delegationTokenReceivers);
}
private Map<String, DelegationTokenProvider> loadProviders() {
@@ -102,7 +117,7 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
Map<String, DelegationTokenProvider> providers = new HashMap<>();
for (DelegationTokenProvider provider : serviceLoader) {
try {
- if (isProviderEnabled(provider.serviceName())) {
+ if (isProviderEnabled(configuration, provider.serviceName())) {
provider.init(configuration);
LOG.info(
"Delegation token provider {} loaded and
initialized",
@@ -118,10 +133,13 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
provider.serviceName());
}
} catch (Exception | NoClassDefFoundError e) {
- LOG.warn(
+ // The intentional general rule is that if a provider's init
method throws exception
+ // then stop the workload
+ LOG.error(
"Failed to initialize delegation token provider {}",
provider.serviceName(),
e);
+ throw new FlinkRuntimeException(e);
}
}
@@ -130,8 +148,7 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
return providers;
}
- @VisibleForTesting
- boolean isProviderEnabled(String serviceName) {
+ static boolean isProviderEnabled(Configuration configuration, String
serviceName) {
return configuration.getBoolean(
String.format("security.delegation.token.provider.%s.enabled",
serviceName), true);
}
@@ -141,6 +158,38 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
return delegationTokenProviders.containsKey(serviceName);
}
+ @VisibleForTesting
+ boolean isReceiverLoaded(String serviceName) {
+ return delegationTokenReceiverRepository.isReceiverLoaded(serviceName);
+ }
+
+ @VisibleForTesting
+ static void checkProviderAndReceiverConsistency(
+ Map<String, DelegationTokenProvider> providers,
+ Map<String, DelegationTokenReceiver> receivers) {
+ LOG.info("Checking provider and receiver instances consistency");
+ if (providers.size() != receivers.size()) {
+ Set<String> missingReceiverServiceNames = new
HashSet<>(providers.keySet());
+ missingReceiverServiceNames.removeAll(receivers.keySet());
+ if (!missingReceiverServiceNames.isEmpty()) {
+ throw new IllegalStateException(
+ PROVIDER_RECEIVER_INCONSISTENCY_ERROR
+ + " Missing receivers: "
+ + String.join(",",
missingReceiverServiceNames));
+ }
+
+ Set<String> missingProviderServiceNames = new
HashSet<>(receivers.keySet());
+ missingProviderServiceNames.removeAll(providers.keySet());
+ if (!missingProviderServiceNames.isEmpty()) {
+ throw new IllegalStateException(
+ PROVIDER_RECEIVER_INCONSISTENCY_ERROR
+ + " Missing providers: "
+ + String.join(",",
missingProviderServiceNames));
+ }
+ }
+ LOG.info("Provider and receiver instances are consistent");
+ }
+
/**
* Obtains new tokens in a one-time fashion and leaves it up to the caller
to distribute them.
*/
@@ -212,12 +261,11 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
Optional<Long> nextRenewal =
obtainDelegationTokensAndGetNextRenewal(container);
if (container.hasTokens()) {
- byte[] containerBytes =
InstantiationUtil.serializeObject(container);
-
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
+
delegationTokenReceiverRepository.onNewTokensObtained(container);
LOG.info("Notifying listener about new tokens");
checkNotNull(listener, "Listener must not be null");
- listener.onNewTokensObtained(containerBytes);
+
listener.onNewTokensObtained(InstantiationUtil.serializeObject(container));
LOG.info("Listener notified successfully");
} else {
LOG.warn("No tokens obtained so skipping listener
notification");
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
index d98e5816fa9..c6d7dcea413 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
@@ -25,10 +25,16 @@ import java.util.Optional;
/**
* Delegation token provider API. Instances of {@link
DelegationTokenProvider}s are loaded by {@link
- * DelegationTokenManager} through service loader.
+ * DelegationTokenManager} through service loader. Basically the
implementation of this interface is
+ * responsible to produce the serialized form of tokens which will be handled
by {@link
+ * DelegationTokenReceiver} instances both on JobManager and TaskManager side.
*/
@Experimental
public interface DelegationTokenProvider {
+
+ /** Config prefix of providers. */
+ String CONFIG_PREFIX = "security.delegation.token.provider";
+
/** Container for obtained delegation tokens. */
class ObtainedDelegationTokens {
/** Serialized form of delegation tokens. */
@@ -57,6 +63,11 @@ public interface DelegationTokenProvider {
/** Name of the service to provide delegation tokens. This name should be
unique. */
String serviceName();
+ /** Config prefix of the service. */
+ default String serviceConfigPrefix() {
+ return String.format("%s.%s", CONFIG_PREFIX, serviceName());
+ }
+
/**
* Called by {@link DelegationTokenManager} to initialize provider after
construction.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
new file mode 100644
index 00000000000..b296186edc3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiver.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Delegation token receiver API. Instances of {@link
DelegationTokenReceiver}s are loaded both on
+ * JobManager and TaskManager side through service loader. Basically the
implementation of this
+ * interface is responsible to receive the serialized form of tokens produced
by {@link
+ * DelegationTokenProvider}.
+ */
+@Experimental
+public interface DelegationTokenReceiver {
+
+ /** Config prefix of receivers. */
+ String CONFIG_PREFIX = "security.delegation.token.receiver";
+
+ /**
+ * Name of the service to receive delegation tokens for. This name should
be unique and the same
+ * as the one provided in the corresponding {@link
DelegationTokenProvider}.
+ */
+ String serviceName();
+
+ /** Config prefix of the service. */
+ default String serviceConfigPrefix() {
+ return String.format("%s.%s", CONFIG_PREFIX, serviceName());
+ }
+
+ /**
+ * Called to initialize receiver after construction.
+ *
+ * @param configuration Configuration to initialize the receiver.
+ */
+ void init(Configuration configuration) throws Exception;
+
+ /**
+ * Callback function when new delegation tokens obtained.
+ *
+ * @param tokens Serialized form of delegation tokens. Must be
deserialized the reverse way
+ * which is implemented in {@link DelegationTokenProvider}.
+ */
+ void onNewTokensObtained(byte[] tokens) throws Exception;
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
new file mode 100644
index 00000000000..8e63c3b4390
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepository.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import static
org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.isProviderEnabled;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Repository for delegation token receivers. */
+@Internal
+public class DelegationTokenReceiverRepository {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DelegationTokenReceiverRepository.class);
+
+ private final Configuration configuration;
+
+ @VisibleForTesting final Map<String, DelegationTokenReceiver>
delegationTokenReceivers;
+
+ public DelegationTokenReceiverRepository(Configuration configuration) {
+ this.configuration = checkNotNull(configuration, "Flink configuration
must not be null");
+ this.delegationTokenReceivers = loadReceivers();
+ }
+
+ private Map<String, DelegationTokenReceiver> loadReceivers() {
+ LOG.info("Loading delegation token receivers");
+
+ ServiceLoader<DelegationTokenReceiver> serviceLoader =
+ ServiceLoader.load(DelegationTokenReceiver.class);
+
+ Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+ for (DelegationTokenReceiver receiver : serviceLoader) {
+ try {
+ if (isProviderEnabled(configuration, receiver.serviceName())) {
+ receiver.init(configuration);
+ LOG.info(
+ "Delegation token receiver {} loaded and
initialized",
+ receiver.serviceName());
+ checkState(
+ !receivers.containsKey(receiver.serviceName()),
+ "Delegation token receiver with service name {}
has multiple implementations",
+ receiver.serviceName());
+ receivers.put(receiver.serviceName(), receiver);
+ } else {
+ LOG.info(
+ "Delegation token receiver {} is disabled so not
loaded",
+ receiver.serviceName());
+ }
+ } catch (Exception | NoClassDefFoundError e) {
+ // The intentional general rule is that if a receiver's init
method throws exception
+ // then stop the workload
+ LOG.error(
+ "Failed to initialize delegation token receiver {}",
+ receiver.serviceName(),
+ e);
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ LOG.info("Delegation token receivers loaded successfully");
+
+ return receivers;
+ }
+
+ @VisibleForTesting
+ boolean isReceiverLoaded(String serviceName) {
+ return delegationTokenReceivers.containsKey(serviceName);
+ }
+
+ /**
+ * Callback function when new delegation tokens obtained.
+ *
+ * @param containerBytes Serialized form of a DelegationTokenContainer.
All the available tokens
+ * will be forwarded to the appropriate {@link
DelegationTokenReceiver} based on service
+ * name.
+ */
+ public void onNewTokensObtained(byte[] containerBytes) throws Exception {
+ if (containerBytes == null || containerBytes.length == 0) {
+ throw new IllegalArgumentException("Illegal container tried to be
processed");
+ }
+ DelegationTokenContainer container =
+ InstantiationUtil.deserializeObject(
+ containerBytes,
DelegationTokenContainer.class.getClassLoader());
+ onNewTokensObtained(container);
+ }
+
+ /**
+ * Callback function when new delegation tokens obtained.
+ *
+ * @param container Serialized form of delegation tokens stored in
DelegationTokenContainer. All
+ * the available tokens will be forwarded to the appropriate {@link
DelegationTokenReceiver}
+ * based on service name.
+ */
+ public void onNewTokensObtained(DelegationTokenContainer container) throws
Exception {
+ LOG.info("New delegation tokens arrived, sending them to receivers");
+ for (Map.Entry<String, byte[]> entry :
container.getTokens().entrySet()) {
+ String serviceName = entry.getKey();
+ byte[] tokens = entry.getValue();
+ if (!delegationTokenReceivers.containsKey(serviceName)) {
+ throw new IllegalStateException(
+ "Tokens arrived for service but no receiver found for
it: " + serviceName);
+ }
+ try {
+
delegationTokenReceivers.get(serviceName).onNewTokensObtained(tokens);
+ } catch (Exception e) {
+ LOG.warn("Failed to send tokens to delegation token receiver
{}", serviceName, e);
+ }
+ }
+ LOG.info("Delegation tokens sent to receivers");
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 572a49eb8c3..7e8dd66ca54 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.security.token.hadoop;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.util.HadoopUtils;
@@ -34,7 +34,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
-import java.util.Objects;
import java.util.Optional;
/**
@@ -42,7 +41,7 @@ import java.util.Optional;
* flink-connector-hbase-base but HBase connection can be made without the
connector. All in all I
* tend to move this but that would be a breaking change.
*/
-@Experimental
+@Internal
public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
private static final Logger LOG =
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
@@ -80,7 +79,8 @@ public class HBaseDelegationTokenProvider implements
DelegationTokenProvider {
} catch (InvocationTargetException
| NoSuchMethodException
| IllegalAccessException
- | ClassNotFoundException e) {
+ | ClassNotFoundException
+ | NoClassDefFoundError e) {
LOG.info(
"HBase is not available (not packaged with this
application): {} : \"{}\".",
e.getClass().getSimpleName(),
@@ -91,6 +91,22 @@ public class HBaseDelegationTokenProvider implements
DelegationTokenProvider {
@Override
public boolean delegationTokensRequired() throws Exception {
+ /**
+ * The general rule how a provider/receiver must behave is the
following: The provider and
+ * the receiver must be added to the classpath together with all the
additionally required
+ * dependencies.
+ *
+ * <p>This null check is required because the HBase provider is always
on classpath but
+ * HBase jars are optional. Such case configuration is not able to be
loaded. This construct
+ * is intended to be removed when HBase provider/receiver pair can be
externalized (namely
+ * if a provider/receiver throws an exception then workload must be
stopped).
+ */
+ if (hbaseConf == null) {
+ LOG.debug(
+ "HBase is not available (not packaged with this
application), hence no "
+ + "tokens will be acquired.");
+ return false;
+ }
try {
if
(!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()))
{
return false;
@@ -99,8 +115,7 @@ public class HBaseDelegationTokenProvider implements
DelegationTokenProvider {
LOG.debug("Hadoop Kerberos is not enabled.");
return false;
}
- return Objects.nonNull(hbaseConf)
- &&
hbaseConf.get("hbase.security.authentication").equals("kerberos")
+ return
hbaseConf.get("hbase.security.authentication").equals("kerberos")
&& kerberosLoginProvider.isLoginPossible();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
new file mode 100644
index 00000000000..9fba0cc5604
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenReceiver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token.hadoop;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Delegation token receiver implementation for HBase. Basically it would be
good to move this to
+ * flink-connector-hbase-base but HBase connection can be made without the
connector. All in all I
+ * tend to move this but that would be a breaking change.
+ */
+@Internal
+public class HBaseDelegationTokenReceiver extends
HadoopDelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "hbase";
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
similarity index 54%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
index d6ec75797a9..0b1cba0f738 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiver.java
@@ -19,46 +19,43 @@
package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.token.DelegationTokenContainer;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenReceiver;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Delegation token updater functionality. */
+/** Hadoop delegation token receiver base class. */
@Internal
-public final class HadoopDelegationTokenUpdater {
+public abstract class HadoopDelegationTokenReceiver implements
DelegationTokenReceiver {
- private static final Logger LOG =
LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class);
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private HadoopDelegationTokenUpdater() {}
+ public abstract String serviceName();
- /** Updates delegation tokens for the current user. */
- public static void addCurrentUserCredentials(byte[] containerBytes) throws
Exception {
- if (containerBytes == null || containerBytes.length == 0) {
- throw new IllegalArgumentException("Illegal container tried to be
processed");
- }
- DelegationTokenContainer container =
- InstantiationUtil.deserializeObject(
- containerBytes,
HadoopDelegationTokenUpdater.class.getClassLoader());
- Credentials credentials = new Credentials();
- for (byte[] v : container.getTokens().values()) {
- credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
+ public void init(Configuration configuration) throws Exception {}
+
+ @Override
+ public void onNewTokensObtained(byte[] tokens) throws Exception {
+ if (tokens == null || tokens.length == 0) {
+ throw new IllegalArgumentException("Illegal tokens tried to be
processed");
}
- LOG.info("Updating delegation tokens for current user");
+ Credentials credentials =
HadoopDelegationTokenConverter.deserialize(tokens);
+
+ log.info("Updating delegation tokens for current user");
dumpAllTokens(credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
- LOG.info("Updated delegation tokens for current user successfully");
+ log.info("Updated delegation tokens for current user successfully");
}
- public static void dumpAllTokens(Credentials credentials) {
+ private void dumpAllTokens(Credentials credentials) {
credentials
.getAllTokens()
.forEach(
token ->
- LOG.info(
+ log.info(
"Token Service:{} Identifier:{}",
token.getService(),
token.getIdentifier()));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index a6a13f3a6cb..23202d27df9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.security.token.hadoop;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
@@ -47,7 +47,7 @@ import java.util.Optional;
import java.util.Set;
/** Delegation token provider for Hadoop filesystems. */
-@Experimental
+@Internal
public class HadoopFSDelegationTokenProvider implements
DelegationTokenProvider {
private static final Logger LOG =
@@ -69,12 +69,35 @@ public class HadoopFSDelegationTokenProvider implements
DelegationTokenProvider
@Override
public void init(Configuration configuration) throws Exception {
flinkConfiguration = configuration;
- hadoopConfiguration =
HadoopUtils.getHadoopConfiguration(configuration);
- kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ try {
+ hadoopConfiguration =
HadoopUtils.getHadoopConfiguration(configuration);
+ kerberosLoginProvider = new KerberosLoginProvider(configuration);
+ } catch (NoClassDefFoundError e) {
+ LOG.info(
+ "Hadoop FS is not available (not packaged with this
application): {} : \"{}\".",
+ e.getClass().getSimpleName(),
+ e.getMessage());
+ }
}
@Override
public boolean delegationTokensRequired() throws Exception {
+ /**
+ * The general rule how a provider/receiver must behave is the
following: The provider and
+ * the receiver must be added to the classpath together with all the
additionally required
+ * dependencies.
+ *
+ * <p>This null check is required because the Hadoop FS provider is
always on classpath but
+ * Hadoop FS jars are optional. Such case configuration is not able to
be loaded. This
+ * construct is intended to be removed when HBase provider/receiver
pair can be externalized
+ * (namely if a provider/receiver throws an exception then workload
must be stopped).
+ */
+ if (hadoopConfiguration == null) {
+ LOG.debug(
+ "Hadoop FS is not available (not packaged with this
application), hence no "
+ + "tokens will be acquired.");
+ return false;
+ }
return
HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())
&& kerberosLoginProvider.isLoginPossible();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
new file mode 100644
index 00000000000..113914a8651
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token.hadoop;
+
+import org.apache.flink.annotation.Internal;
+
+/** Delegation token receiver for Hadoop filesystems. */
+@Internal
+public class HadoopFSDelegationTokenReceiver extends
HadoopDelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "hadoopfs";
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 218ecdaab46..9c3dd3744af 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
-import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -263,6 +263,8 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
private final TaskExecutorPartitionTracker partitionTracker;
+ private final DelegationTokenReceiverRepository
delegationTokenReceiverRepository;
+
// --------- resource manager --------
@Nullable private ResourceManagerAddress resourceManagerAddress;
@@ -289,7 +291,8 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
@Nullable String metricQueryServiceAddress,
TaskExecutorBlobService taskExecutorBlobService,
FatalErrorHandler fatalErrorHandler,
- TaskExecutorPartitionTracker partitionTracker) {
+ TaskExecutorPartitionTracker partitionTracker,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository) {
super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
@@ -302,6 +305,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
this.haServices = checkNotNull(haServices);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.partitionTracker = partitionTracker;
+ this.delegationTokenReceiverRepository =
checkNotNull(delegationTokenReceiverRepository);
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
this.taskExecutorBlobService = checkNotNull(taskExecutorBlobService);
this.metricQueryServiceAddress = metricQueryServiceAddress;
@@ -1346,7 +1350,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
}
try {
- HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
+ delegationTokenReceiverRepository.onNewTokensObtained(tokens);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
@@ -2386,7 +2390,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
if (tokens != null) {
try {
log.info("Receive initial delegation tokens from resource
manager");
-
HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
+
delegationTokenReceiverRepository.onNewTokensObtained(tokens);
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
ExceptionUtils.rethrowIfFatalError(t);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0f08ccdaf7a..a7207e1fd8c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -241,6 +242,9 @@ public class TaskManagerRunner implements FatalErrorHandler
{
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
configuration, pluginManager);
+ final DelegationTokenReceiverRepository
delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration);
+
taskExecutorService =
taskExecutorServiceFactory.createTaskExecutor(
this.configuration,
@@ -253,7 +257,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
false,
externalResourceInfoProvider,
workingDirectory.unwrap(),
- this);
+ this,
+ delegationTokenReceiverRepository);
handleUnexpectedTaskExecutorServiceTermination();
@@ -552,7 +557,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
- FatalErrorHandler fatalErrorHandler)
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository)
throws Exception {
final TaskExecutor taskExecutor =
@@ -567,7 +573,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
localCommunicationOnly,
externalResourceInfoProvider,
workingDirectory,
- fatalErrorHandler);
+ fatalErrorHandler,
+ delegationTokenReceiverRepository);
return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
@@ -583,7 +590,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
- FatalErrorHandler fatalErrorHandler)
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository)
throws Exception {
checkNotNull(configuration);
@@ -653,7 +661,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
metricQueryServiceAddress,
taskExecutorBlobService,
fatalErrorHandler,
- new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+ new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+ delegationTokenReceiverRepository);
}
/**
@@ -776,7 +785,8 @@ public class TaskManagerRunner implements FatalErrorHandler
{
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
- FatalErrorHandler fatalErrorHandler)
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository)
throws Exception;
}
diff --git
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..fc1f34c11b8
--- /dev/null
+++
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenReceiver
+org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenReceiver
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index 8092f330a99..ed34ea978de 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -22,16 +22,20 @@ import org.apache.flink.configuration.Configuration;
import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Clock;
import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static java.time.Instant.ofEpochMilli;
import static
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
+import static
org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -43,30 +47,28 @@ public class DefaultDelegationTokenManagerTest {
@BeforeEach
public void beforeEach() {
ExceptionThrowingDelegationTokenProvider.reset();
+ ExceptionThrowingDelegationTokenReceiver.reset();
}
- @AfterAll
- public static void afterAll() {
+ @AfterEach
+ public void afterEach() {
ExceptionThrowingDelegationTokenProvider.reset();
+ ExceptionThrowingDelegationTokenReceiver.reset();
}
@Test
public void isProviderEnabledMustGiveBackTrueByDefault() {
Configuration configuration = new Configuration();
- DefaultDelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(configuration, null, null);
- assertTrue(delegationTokenManager.isProviderEnabled("test"));
+
assertTrue(DefaultDelegationTokenManager.isProviderEnabled(configuration,
"test"));
}
@Test
public void isProviderEnabledMustGiveBackFalseWhenDisabled() {
Configuration configuration = new Configuration();
-
configuration.setBoolean("security.delegation.token.provider.test.enabled",
false);
- DefaultDelegationTokenManager delegationTokenManager =
- new DefaultDelegationTokenManager(configuration, null, null);
+ configuration.setBoolean(CONFIG_PREFIX + ".test.enabled", false);
- assertFalse(delegationTokenManager.isProviderEnabled("test"));
+
assertFalse(DefaultDelegationTokenManager.isProviderEnabled(configuration,
"test"));
}
@Test
@@ -74,19 +76,87 @@ public class DefaultDelegationTokenManagerTest {
assertThrows(Exception.class, () -> new
DefaultDelegationTokenManager(null, null, null));
}
+ @Test
+ public void oneProviderThrowsExceptionMustFailFast() {
+ assertThrows(
+ Exception.class,
+ () -> {
+ ExceptionThrowingDelegationTokenProvider.throwInInit =
true;
+ new DefaultDelegationTokenManager(new Configuration(),
null, null);
+ });
+ }
+
@Test
public void testAllProvidersLoaded() {
Configuration configuration = new Configuration();
-
configuration.setBoolean("security.delegation.token.provider.throw.enabled",
false);
+ configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
DefaultDelegationTokenManager delegationTokenManager =
new DefaultDelegationTokenManager(configuration, null, null);
assertEquals(3,
delegationTokenManager.delegationTokenProviders.size());
+
assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
+ assertTrue(delegationTokenManager.isReceiverLoaded("hadoopfs"));
+
assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
+ assertTrue(delegationTokenManager.isReceiverLoaded("hbase"));
+
assertTrue(delegationTokenManager.isProviderLoaded("test"));
+ assertTrue(delegationTokenManager.isReceiverLoaded("test"));
+
assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
+ assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
assertFalse(delegationTokenManager.isProviderLoaded("throw"));
+ assertFalse(delegationTokenManager.isReceiverLoaded("throw"));
+ }
+
+ @Test
+ public void
checkProviderAndReceiverConsistencyShouldNotThrowWhenNothingLoaded() {
+ DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+ Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ @Test
+ public void
checkProviderAndReceiverConsistencyShouldThrowWhenMissingReceiver() {
+ Map<String, DelegationTokenProvider> providers = new HashMap<>();
+ providers.put("test", new TestDelegationTokenProvider());
+
+ IllegalStateException e =
+ assertThrows(
+ IllegalStateException.class,
+ () ->
+
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+ providers, Collections.emptyMap()));
+ assertTrue(e.getMessage().contains("Missing receivers: test"));
+ }
+
+ @Test
+ public void
checkProviderAndReceiverConsistencyShouldThrowWhenMissingProvider() {
+ Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+ receivers.put("test", new TestDelegationTokenReceiver());
+
+ IllegalStateException e =
+ assertThrows(
+ IllegalStateException.class,
+ () ->
+
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(
+ Collections.emptyMap(), receivers));
+ assertTrue(e.getMessage().contains("Missing providers: test"));
+ }
+
+ @Test
+ public void
checkProviderAndReceiverConsistencyShouldNotThrowWhenBothLoaded() {
+ Map<String, DelegationTokenProvider> providers = new HashMap<>();
+ providers.put("test", new TestDelegationTokenProvider());
+ Map<String, DelegationTokenReceiver> receivers = new HashMap<>();
+ receivers.put("test", new TestDelegationTokenReceiver());
+
+
DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(providers,
receivers);
+
+ assertEquals(1, providers.size());
+ assertTrue(providers.containsKey("test"));
+ assertEquals(1, receivers.size());
+ assertTrue(receivers.containsKey("test"));
}
@Test
@@ -98,7 +168,7 @@ public class DefaultDelegationTokenManagerTest {
ExceptionThrowingDelegationTokenProvider.addToken = true;
Configuration configuration = new Configuration();
-
configuration.setBoolean("security.delegation.token.provider.throw.enabled",
true);
+ configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true);
AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
DefaultDelegationTokenManager delegationTokenManager =
new DefaultDelegationTokenManager(configuration,
scheduledExecutor, scheduler) {
@@ -124,7 +194,7 @@ public class DefaultDelegationTokenManagerTest {
@Test
public void calculateRenewalDelayShouldConsiderRenewalRatio() {
Configuration configuration = new Configuration();
-
configuration.setBoolean("security.delegation.token.provider.throw.enabled",
false);
+ configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5);
DefaultDelegationTokenManager delegationTokenManager =
new DefaultDelegationTokenManager(configuration, null, null);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
new file mode 100644
index 00000000000..db93128d7ab
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.flink.runtime.security.token.DelegationTokenProvider.CONFIG_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link DelegationTokenReceiverRepository}. */
+class DelegationTokenReceiverRepositoryTest {
+
+ @BeforeEach
+ public void beforeEach() {
+ ExceptionThrowingDelegationTokenReceiver.reset();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ ExceptionThrowingDelegationTokenReceiver.reset();
+ }
+
+ @Test
+ public void configurationIsNullMustFailFast() {
+ assertThrows(Exception.class, () -> new
DelegationTokenReceiverRepository(null));
+ }
+
+ @Test
+ public void oneReceiverThrowsExceptionMustFailFast() {
+ assertThrows(
+ Exception.class,
+ () -> {
+ ExceptionThrowingDelegationTokenReceiver.throwInInit =
true;
+ new DelegationTokenReceiverRepository(new Configuration());
+ });
+ }
+
+ @Test
+ public void testAllReceiversLoaded() {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false);
+ DelegationTokenReceiverRepository delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration);
+
+ assertEquals(3,
delegationTokenReceiverRepository.delegationTokenReceivers.size());
+
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hadoopfs"));
+
assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("hbase"));
+ assertTrue(delegationTokenReceiverRepository.isReceiverLoaded("test"));
+ assertTrue(ExceptionThrowingDelegationTokenReceiver.constructed);
+
assertFalse(delegationTokenReceiverRepository.isReceiverLoaded("throw"));
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
new file mode 100644
index 00000000000..41e72c11cce
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An example implementation of {@link DelegationTokenReceiver} which throws
exception when enabled.
+ */
+public class ExceptionThrowingDelegationTokenReceiver implements
DelegationTokenReceiver {
+
+ public static volatile boolean throwInInit = false;
+ public static volatile boolean throwInUsage = false;
+ public static volatile boolean constructed = false;
+
+ public static void reset() {
+ throwInInit = false;
+ throwInUsage = false;
+ constructed = false;
+ }
+
+ public ExceptionThrowingDelegationTokenReceiver() {
+ constructed = true;
+ }
+
+ @Override
+ public String serviceName() {
+ return "throw";
+ }
+
+ @Override
+ public void init(Configuration configuration) {
+ if (throwInInit) {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ @Override
+ public void onNewTokensObtained(byte[] tokens) throws Exception {
+ if (throwInUsage) {
+ throw new IllegalArgumentException();
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
new file mode 100644
index 00000000000..405d026a3a1
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenReceiver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+
+/** An example implementation of {@link DelegationTokenReceiver} which does
nothing. */
+public class TestDelegationTokenReceiver implements DelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "test";
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {}
+
+ @Override
+ public void onNewTokensObtained(byte[] tokens) throws Exception {}
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
similarity index 60%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
index 7da8411dc96..9f98b4b2304 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenReceiverITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.security.token.hadoop;
-import org.apache.flink.runtime.security.token.DelegationTokenContainer;
-import org.apache.flink.util.InstantiationUtil;
-
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@@ -37,51 +34,55 @@ import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-/** Test for {@link HadoopDelegationTokenConverter}. */
-public class HadoopDelegationTokenUpdaterITCase {
+/** Test for {@link HadoopDelegationTokenReceiver}. */
+public class HadoopDelegationTokenReceiverITCase {
@Test
- public void
addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() {
- addCurrentUserCredentialsShouldThrowException(null);
+ public void onNewTokensObtainedShouldThrowExceptionWhenNullCredentials() {
+ onNewTokensObtainedShouldThrowException(null);
}
@Test
- public void
addCurrentUserCredentialsShouldThrowExceptionWhenEmptyCredentials() {
- addCurrentUserCredentialsShouldThrowException(new byte[0]);
+ public void onNewTokensObtainedShouldThrowExceptionWhenEmptyCredentials() {
+ onNewTokensObtainedShouldThrowException(new byte[0]);
}
- private void addCurrentUserCredentialsShouldThrowException(byte[]
credentialsBytes) {
- try (MockedStatic<UserGroupInformation> ugi =
mockStatic(UserGroupInformation.class)) {
- UserGroupInformation userGroupInformation =
mock(UserGroupInformation.class);
-
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
-
- IllegalArgumentException e =
- assertThrows(
- IllegalArgumentException.class,
- () ->
-
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
- credentialsBytes));
- assertTrue(e.getMessage().contains("Illegal container"));
- }
+ private void onNewTokensObtainedShouldThrowException(byte[]
credentialsBytes) {
+ HadoopDelegationTokenReceiver receiver =
+ new HadoopDelegationTokenReceiver() {
+ @Override
+ public String serviceName() {
+ return "test";
+ }
+ };
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> receiver.onNewTokensObtained(credentialsBytes));
+ assertTrue(e.getMessage().contains("Illegal tokens"));
}
@Test
- public void addCurrentUserCredentialsShouldOverwriteCredentials() throws
Exception {
+ public void onNewTokensObtainedShouldOverwriteCredentials() throws
Exception {
final Text tokenKind = new Text("TEST_TOKEN_KIND");
final Text tokenService = new Text("TEST_TOKEN_SERVICE");
Credentials credentials = new Credentials();
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind,
tokenService));
byte[] credentialsBytes =
HadoopDelegationTokenConverter.serialize(credentials);
- DelegationTokenContainer container = new DelegationTokenContainer();
- container.addToken("TEST_TOKEN_KEY", credentialsBytes);
- byte[] containerBytes = InstantiationUtil.serializeObject(container);
try (MockedStatic<UserGroupInformation> ugi =
mockStatic(UserGroupInformation.class)) {
UserGroupInformation userGroupInformation =
mock(UserGroupInformation.class);
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
-
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
+ HadoopDelegationTokenReceiver receiver =
+ new HadoopDelegationTokenReceiver() {
+ @Override
+ public String serviceName() {
+ return "test";
+ }
+ };
+ receiver.onNewTokensObtained(credentialsBytes);
ArgumentCaptor<Credentials> argumentCaptor =
ArgumentCaptor.forClass(Credentials.class);
verify(userGroupInformation,
times(1)).addCredentials(argumentCaptor.capture());
assertTrue(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
index 8d3367ac28e..362b4de5957 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorBuilder.java
@@ -35,6 +35,7 @@ import
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -143,6 +144,9 @@ public class TaskExecutorBuilder {
resolvedTaskManagerServices = taskManagerServices;
}
+ final DelegationTokenReceiverRepository
delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration);
+
return new TaskExecutor(
rpcService,
resolvedTaskManagerConfiguration,
@@ -154,7 +158,8 @@ public class TaskExecutorBuilder {
metricQueryServiceAddress,
resolvedTaskExecutorBlobService,
fatalErrorHandler,
- partitionTracker);
+ partitionTracker,
+ delegationTokenReceiverRepository);
}
public static TaskExecutorBuilder newBuilder(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 43f9b75ee92..6933d29b551 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -52,6 +52,7 @@ import
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
@@ -251,7 +252,8 @@ public class
TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
null,
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
- new TestingTaskExecutorPartitionTracker());
+ new TestingTaskExecutorPartitionTracker(),
+ new DelegationTokenReceiverRepository(configuration));
}
private static TaskDeploymentDescriptor
createTaskDeploymentDescriptor(JobID jobId)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 6136e12f9d3..e2afdfb703a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -58,6 +58,7 @@ import
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -599,7 +600,8 @@ public class TaskExecutorPartitionLifecycleTest extends
TestLogger {
null,
NoOpTaskExecutorBlobService.INSTANCE,
new TestingFatalErrorHandler(),
- partitionTracker);
+ partitionTracker,
+ new DelegationTokenReceiverRepository(configuration));
}
private static TaskSlotTable<Task> createTaskSlotTable() {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index 35f099b4215..55cb4f7c6b5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -44,6 +44,7 @@ import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
@@ -236,7 +237,8 @@ public class TaskExecutorSlotLifetimeTest extends
TestLogger {
null,
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
- new TestingTaskExecutorPartitionTracker());
+ new TestingTaskExecutorPartitionTracker(),
+ new DelegationTokenReceiverRepository(configuration));
}
private TaskExecutorLocalStateStoresManager
createTaskExecutorLocalStateStoresManager()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e13efdcec55..cc857ba0e1f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -79,6 +79,7 @@ import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -2818,7 +2819,8 @@ public class TaskExecutorTest extends TestLogger {
null,
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandler,
- taskExecutorPartitionTracker);
+ taskExecutorPartitionTracker,
+ new DelegationTokenReceiverRepository(configuration));
}
private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices
taskManagerServices)
@@ -2853,7 +2855,8 @@ public class TaskExecutorTest extends TestLogger {
null,
NoOpTaskExecutorBlobService.INSTANCE,
testingFatalErrorHandler,
- new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+ new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+ new DelegationTokenReceiverRepository(configuration));
}
private TaskExecutorTestingContext createTaskExecutorTestingContext(int
numberOfSlots)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index 2ea9bacaa8a..79ff1dc541d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
@@ -292,6 +293,7 @@ public class TaskManagerRunnerStartupTest extends
TestLogger {
false,
ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
workingDirectory,
- error -> {});
+ error -> {},
+ new DelegationTokenReceiverRepository(configuration));
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 6406c1d06de..1cde71b77ab 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -35,6 +35,7 @@ import
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
@@ -276,7 +277,8 @@ public class TaskManagerRunnerTest extends TestLogger {
localCommunicationOnly,
externalResourceInfoProvider,
workingDirectory,
- fatalErrorHandler) -> taskExecutorService;
+ fatalErrorHandler,
+ delegationTokenReceiverRepository) -> taskExecutorService;
}
private static Configuration createConfiguration() {
@@ -317,7 +319,8 @@ public class TaskManagerRunnerTest extends TestLogger {
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
WorkingDirectory workingDirectory,
- FatalErrorHandler fatalErrorHandler) {
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository) {
return TestingTaskExecutorService.newBuilder()
.setStartRunnable(
() ->
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 24ac627fa9b..70f42962e51 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -44,6 +44,7 @@ import
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
@@ -268,7 +269,8 @@ class TaskSubmissionTestEnvironment implements
AutoCloseable {
metricQueryServiceAddress,
taskExecutorBlobService,
testingFatalErrorHandler,
- new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
+ new
TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+ new DelegationTokenReceiverRepository(configuration));
}
private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 871784caf1e..9bf4b5be894 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
+import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import javax.annotation.Nullable;
@@ -47,7 +48,8 @@ class TestingTaskExecutor extends TaskExecutor {
@Nullable String metricQueryServiceAddress,
TaskExecutorBlobService taskExecutorBlobService,
FatalErrorHandler fatalErrorHandler,
- TaskExecutorPartitionTracker partitionTracker) {
+ TaskExecutorPartitionTracker partitionTracker,
+ DelegationTokenReceiverRepository
delegationTokenReceiverRepository) {
super(
rpcService,
taskManagerConfiguration,
@@ -59,7 +61,8 @@ class TestingTaskExecutor extends TaskExecutor {
metricQueryServiceAddress,
taskExecutorBlobService,
fatalErrorHandler,
- partitionTracker);
+ partitionTracker,
+ delegationTokenReceiverRepository);
}
@Override
diff --git
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..b186e9e09b9
--- /dev/null
+++
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenReceiver
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.security.token.TestDelegationTokenReceiver
+org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f692a501d41..2653fc46d9f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1300,9 +1300,12 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
DelegationTokenContainer container = new DelegationTokenContainer();
delegationTokenManager.obtainDelegationTokens(container);
+ // This is here for backward compatibility to make log aggregation work
Credentials credentials = new Credentials();
- for (byte[] v : container.getTokens().values()) {
- credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
+ for (Map.Entry<String, byte[]> e : container.getTokens().entrySet()) {
+ if (e.getKey().equals("hadoopfs")) {
+
credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
+ }
}
ByteBuffer tokens =
ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
containerLaunchContext.setTokens(tokens);