link3280 closed pull request #7294: [FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials
URL: https://github.com/apache/flink/pull/7294
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork-based) pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 20e02e12fba..60d75183815 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -565,7 +565,20 @@ static ContainerLaunchContext createTaskExecutorContext(
                                                new File(fileLocation),
                                                
HadoopUtils.getHadoopConfiguration(flinkConfig));
 
-                               cred.writeTokenStorageToStream(dob);
+                               // Filter out AMRMToken before setting the tokens to the TaskManager container context.
+                               Method getAllTokensMethod = 
Credentials.class.getMethod("getAllTokens");
+                               Credentials taskManagerCred = new Credentials();
+                               final Text amRmTokenKind = new 
Text("YARN_AM_RM_TOKEN");
+                               Collection<Token<? extends TokenIdentifier>> 
userTokens =
+                                       (Collection<Token<? extends 
TokenIdentifier>>) getAllTokensMethod.invoke(cred);
+                               for (Token<? extends TokenIdentifier> token : 
userTokens) {
+                                       if 
(!token.getKind().equals(amRmTokenKind)) {
+                                               final Text id = new 
Text(token.getIdentifier());
+                                               taskManagerCred.addToken(id, 
token);
+                                       }
+                               }
+
+                               taskManagerCred.writeTokenStorageToStream(dob);
                                ByteBuffer securityTokens = 
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
                                ctx.setTokens(securityTokens);
                        } catch (Throwable t) {
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index b15374b2a4b..b44a0e84ee6 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -19,9 +19,15 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -35,7 +41,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +56,7 @@
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
@@ -100,6 +110,19 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                env.put(FLINK_JAR_PATH, root.toURI().toString());
                env = Collections.unmodifiableMap(env);
 
+               File credentialFile = folder.newFile("container_tokens");
+               final Text amRmTokenKind = new Text("YARN_AM_RM_TOKEN");
+               final Text hdfsDelegationTokenKind = new 
Text("HDFS_DELEGATION_TOKEN");
+               final Text service = new Text("test-service");
+               Credentials amCredentials = new Credentials();
+               amCredentials.addToken(amRmTokenKind, new Token<>(new byte[4], 
new byte[4], amRmTokenKind, service));
+               amCredentials.addToken(hdfsDelegationTokenKind, new Token<>(new 
byte[4], new byte[4],
+                       hdfsDelegationTokenKind, service));
+               amCredentials.writeTokenStorageFile(new 
Path(credentialFile.getAbsolutePath()), yarnConf);
+               Map<String, String> systemEnv = new HashMap<>();
+               systemEnv.put("HADOOP_TOKEN_FILE_LOCATION", 
credentialFile.getAbsolutePath());
+               CommonTestUtils.setEnv(systemEnv);
+
                ContaineredTaskManagerParameters tmParams = 
mock(ContaineredTaskManagerParameters.class);
                Configuration taskManagerConf = new Configuration();
 
@@ -108,5 +131,22 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                ContainerLaunchContext ctx = 
Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams,
                        taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
                assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
+
+               Credentials credentials = new Credentials();
+               try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(ctx.getTokens().array()))) {
+                       credentials.readTokenStorageStream(dis);
+               }
+               Collection<Token<? extends TokenIdentifier>> tokens = 
credentials.getAllTokens();
+               boolean hasHdfsDelegationToken = false;
+               boolean hasAmRmToken = false;
+               for (Token<? extends TokenIdentifier> token : tokens) {
+                       if (token.getKind().equals(amRmTokenKind)) {
+                               hasAmRmToken = true;
+                       } else if 
(token.getKind().equals(hdfsDelegationTokenKind)) {
+                               hasHdfsDelegationToken = true;
+                       }
+               }
+               assertTrue(hasHdfsDelegationToken);
+               assertFalse(hasAmRmToken);
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to