This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 74b100bcba3 [FLINK-35525][yarn] Add a token services configuration to
allow obtained token to be passed to Yarn AM
74b100bcba3 is described below
commit 74b100bcba33ce18b21a44b26db7e162507f7830
Author: Zhen Wang <[email protected]>
AuthorDate: Mon Jun 10 20:50:14 2024 +0800
[FLINK-35525][yarn] Add a token services configuration to allow obtained
token to be passed to Yarn AM
---
.../generated/yarn_config_configuration.html | 6 +++
.../apache/flink/yarn/YarnClusterDescriptor.java | 6 ++-
.../yarn/configuration/YarnConfigOptions.java | 10 ++++
.../flink/yarn/YarnClusterDescriptorTest.java | 31 +++++++++++
.../token/TestYarnAMDelegationTokenProvider.java | 63 ++++++++++++++++++++++
.../token/TestYarnAMDelegationTokenReceiver.java | 36 +++++++++++++
...ink.core.security.token.DelegationTokenProvider | 16 ++++++
...ink.core.security.token.DelegationTokenReceiver | 16 ++++++
8 files changed, 182 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index 41c32c77c94..cb0b0f935ba 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -152,6 +152,12 @@
<td>String</td>
<td>The provided usrlib directory in remote. It should be
pre-uploaded and world-readable. Flink will use it to exclude the local usrlib
directory(i.e. usrlib/ under the parent directory of FLINK_LIB_DIR). Unlike
yarn.provided.lib.dirs, YARN will not cache it on the nodes as it is for each
application. An example could be
hdfs://$namenode_address/path/of/flink/usrlib</td>
</tr>
+ <tr>
+ <td><h5>yarn.security.appmaster.delegation.token.services</h5></td>
+ <td style="word-wrap: break-word;">"hadoopfs"</td>
+ <td>List<String></td>
+ <td>The delegation token provider services are allowed to pass
obtained tokens to YARN application master. For backward compatibility to make
log aggregation to work, we add tokens obtained by `hadoopfs` provider to AM by
default.</td>
+ </tr>
<tr>
<td><h5>yarn.security.kerberos.localized-keytab-path</h5></td>
<td style="word-wrap: break-word;">"krb5.keytab"</td>
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 31bd574b024..bc65a5ee93d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -147,6 +147,7 @@ import static
org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr;
import static org.apache.flink.yarn.Utils.getStartCommand;
import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
import static
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
+import static
org.apache.flink.yarn.configuration.YarnConfigOptions.APP_MASTER_TOKEN_SERVICES;
import static
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
/** The descriptor with deployment information for deploying a Flink cluster
on Yarn. */
@@ -1348,7 +1349,8 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
});
}
- private void setTokensFor(ContainerLaunchContext containerLaunchContext,
boolean fetchToken)
+ @VisibleForTesting
+ void setTokensFor(ContainerLaunchContext containerLaunchContext, boolean
fetchToken)
throws Exception {
Credentials credentials = new Credentials();
@@ -1372,7 +1374,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
// This is here for backward compatibility to make log aggregation
work
for (Map.Entry<String, byte[]> e :
container.getTokens().entrySet()) {
- if (e.getKey().equals("hadoopfs")) {
+ if
(flinkConfiguration.get(APP_MASTER_TOKEN_SERVICES).contains(e.getKey())) {
credentials.addAll(HadoopDelegationTokenConverter.deserialize(e.getValue()));
}
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 06ab7e7437d..5f45c1f86b4 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -364,6 +364,16 @@ public class YarnConfigOptions {
+ "resource directory. If set to false,
Flink"
+ " will try to directly locate the keytab
from the path itself.");
+ public static final ConfigOption<List<String>> APP_MASTER_TOKEN_SERVICES =
+ key("yarn.security.appmaster.delegation.token.services")
+ .stringType()
+ .asList()
+ .defaultValues("hadoopfs")
+ .withDescription(
+ "The delegation token provider services are
allowed to pass obtained tokens to YARN application master."
+ + " For backward compatibility to make log
aggregation to work, we add tokens obtained"
+ + " by `hadoopfs` provider to AM by
default.");
+
public static final ConfigOption<List<String>> PROVIDED_LIB_DIRS =
key("yarn.provided.lib.dirs")
.stringType()
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index e0e581f75eb..f87313cf21b 100644
---
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -35,23 +35,29 @@ import
org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
+import org.apache.flink.yarn.token.TestYarnAMDelegationTokenProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -69,6 +75,7 @@ import java.util.UUID;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
+import static
org.apache.flink.yarn.configuration.YarnConfigOptions.APP_MASTER_TOKEN_SERVICES;
import static
org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
import static
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -974,4 +981,28 @@ class YarnClusterDescriptorTest {
appId.toString());
}
}
+
+ @Test
+ public void testSetTokensForYarnAppMaster() {
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.set(
+ APP_MASTER_TOKEN_SERVICES,
+ Arrays.asList(TestYarnAMDelegationTokenProvider.SERVICE_NAME));
+ YarnClusterDescriptor yarnClusterDescriptor =
createYarnClusterDescriptor(flinkConfig);
+ ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class);
+ try {
+ yarnClusterDescriptor.setTokensFor(amContainer, true);
+ Credentials credentials = new Credentials();
+ try (DataInputStream dis =
+ new DataInputStream(
+ new
ByteArrayInputStream(amContainer.getTokens().array()))) {
+ credentials.readTokenStorageStream(dis);
+ }
+ assertThat(credentials.getAllTokens())
+ .hasSize(1)
+
.contains(TestYarnAMDelegationTokenProvider.TEST_YARN_AM_TOKEN);
+ } catch (Exception e) {
+ fail("Should not throw exception when setting tokens for AM
container.");
+ }
+ }
}
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java
new file mode 100644
index 00000000000..cc7d839515c
--- /dev/null
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenProvider;
+import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class TestYarnAMDelegationTokenProvider implements
DelegationTokenProvider {
+
+ public static final String SERVICE_NAME = "yarn-am";
+ public static final Token<? extends TokenIdentifier> TEST_YARN_AM_TOKEN =
+ new Token<>(
+ new byte[4],
+ new byte[4],
+ new Text("TEST_YARN_AM_TOKEN_KIND"),
+ new Text("TEST_YARN_AM_TOKEN_SERVICE"));
+
+ @Override
+ public String serviceName() {
+ return SERVICE_NAME;
+ }
+
+ @Override
+ public void init(Configuration configuration) {}
+
+ @Override
+ public boolean delegationTokensRequired() {
+ return true;
+ }
+
+ @Override
+ public ObtainedDelegationTokens obtainDelegationTokens() throws
IOException {
+ Credentials credentials = new Credentials();
+ credentials.addToken(TEST_YARN_AM_TOKEN.getService(),
TEST_YARN_AM_TOKEN);
+ return new ObtainedDelegationTokens(
+ HadoopDelegationTokenConverter.serialize(credentials),
Optional.empty());
+ }
+}
diff --git
a/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java
new file mode 100644
index 00000000000..38d3a7abfd6
--- /dev/null
+++
b/flink-yarn/src/test/java/org/apache/flink/yarn/token/TestYarnAMDelegationTokenReceiver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.token;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.security.token.DelegationTokenReceiver;
+
+public class TestYarnAMDelegationTokenReceiver implements
DelegationTokenReceiver {
+
+ @Override
+ public String serviceName() {
+ return "yarn-am";
+ }
+
+ @Override
+ public void init(Configuration configuration) throws Exception {}
+
+ @Override
+ public void onNewTokensObtained(byte[] tokens) throws Exception {}
+}
diff --git
a/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
new file mode 100644
index 00000000000..5b8223ff1d3
--- /dev/null
+++
b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.token.TestYarnAMDelegationTokenProvider
diff --git
a/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
new file mode 100644
index 00000000000..e4a9611e32c
--- /dev/null
+++
b/flink-yarn/src/test/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.token.TestYarnAMDelegationTokenReceiver