Repository: tez
Updated Branches:
  refs/heads/master 25f0247ee -> d336ebdbc
TEZ-2886. Ability to merge AM credentials with DAG credentials. Contributed by Jason Lowe. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d336ebdb Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d336ebdb Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d336ebdb Branch: refs/heads/master Commit: d336ebdbc4ea86dce81a44b16d36fa2846cc4401 Parents: 25f0247 Author: Siddharth Seth <[email protected]> Authored: Fri Oct 16 17:34:56 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Oct 16 17:34:56 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 10 + .../org/apache/tez/dag/app/DAGAppMaster.java | 6 + .../apache/tez/dag/app/TestDAGAppMaster.java | 212 +++++++++++++++++++ 4 files changed, 230 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d90402..6f94f0a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2886. Ability to merge AM credentials with DAG credentials. TEZ-2896. Fix thread names used during Input/Output initialization. TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables TEZ-2887. Tez build failure due to missing dependency in pom files. @@ -215,6 +216,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2886. Ability to merge AM credentials with DAG credentials. TEZ-2896. Fix thread names used during Input/Output initialization. TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables TEZ-2885. Remove counter logs from AMWebController. 
http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 12435ca..80b9860 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -149,6 +149,16 @@ public class TezConfiguration extends Configuration { @ConfigurationProperty public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path"; + /** + * Boolean value. If true then Tez will add the ApplicationMaster credentials + * to all task credentials. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_CREDENTIALS_MERGE = TEZ_AM_PREFIX + + "credentials-merge"; + public static boolean TEZ_AM_CREDENTIALS_MERGE_DEFAULT = true; + @Private @ConfigurationScope(Scope.AM) public static final String TEZ_AM_USE_CONCURRENT_DISPATCHER = TEZ_AM_PREFIX http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index d774b9a..f4c57e3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -935,6 +935,12 @@ public class DAGAppMaster extends AbstractService { } else { dagCredentials = new Credentials(); } + if (getConfig().getBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, + TezConfiguration.TEZ_AM_CREDENTIALS_MERGE_DEFAULT)) { + LOG.info("Merging AM credentials into DAG credentials"); + 
dagCredentials.mergeAll(amCredentials); + } + // TODO Does this move to the client in case of work-preserving recovery. TokenCache.setSessionToken(sessionToken, dagCredentials); http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index fa5d87c..a81c964 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -15,11 +15,18 @@ package org.apache.tez.dag.app; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -28,15 +35,44 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import 
org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.client.TezApiVersionInfo; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto; +import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.records.TezDAGID; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestDAGAppMaster { @@ -47,6 +83,21 @@ public class TestDAGAppMaster { private static final String CL_NAME = "CL"; private static final String TC_NAME = "TC"; private static final String CLASS_SUFFIX = "_CLASS"; + private static final File TEST_DIR = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestDAGAppMaster.class.getSimpleName()).getAbsoluteFile(); + + @Before + public void setup() { + FileUtil.fullyDelete(TEST_DIR); + TEST_DIR.mkdir(); + } + + @After + public void teardown() { + FileUtil.fullyDelete(TEST_DIR); + } @Test(timeout = 5000) public void testPluginParsing() throws IOException { @@ -297,4 +348,165 @@ public class TestDAGAppMaster { } + 
@Test + public void testDagCredentialsWithoutMerge() throws Exception { + testDagCredentials(false); + } + + @Test + public void testDagCredentialsWithMerge() throws Exception { + testDagCredentials(true); + } + + private void testDagCredentials(boolean doMerge) throws IOException { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, doMerge); + conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString()); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + + // create some sample AM credentials + Credentials amCreds = new Credentials(); + JobTokenSecretManager jtsm = new JobTokenSecretManager(); + JobTokenIdentifier identifier = new JobTokenIdentifier( + new Text(appId.toString())); + Token<JobTokenIdentifier> sessionToken = + new Token<JobTokenIdentifier>(identifier, jtsm); + sessionToken.setService(identifier.getJobId()); + TokenCache.setSessionToken(sessionToken, amCreds); + TestTokenSecretManager ttsm = new TestTokenSecretManager(); + Text tokenAlias1 = new Text("alias1"); + Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("amtoken1")), ttsm); + amCreds.addToken(tokenAlias1, amToken1); + Text tokenAlias2 = new Text("alias2"); + Token<TestTokenIdentifier> amToken2 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("amtoken2")), ttsm); + amCreds.addToken(tokenAlias2, amToken2); + + FileSystem fs = FileSystem.getLocal(conf); + FSDataOutputStream sessionJarsPBOutStream = + TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(), + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); + DAGProtos.PlanLocalResourcesProto.getDefaultInstance() + .writeDelimitedTo(sessionJarsPBOutStream); + sessionJarsPBOutStream.close(); + DAGAppMaster am = new DAGAppMaster(attemptId, + 
ContainerId.newContainerId(attemptId, 1), + "127.0.0.1", 0, 0, new SystemClock(), 1, true, + TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, + new String[] {TEST_DIR.toString()}, + new TezApiVersionInfo().getVersion(), 1, amCreds, + "someuser", null); + am.init(conf); + am.start(); + + // create some sample DAG credentials + Credentials dagCreds = new Credentials(); + Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("dagtoken1")), ttsm); + dagCreds.addToken(tokenAlias2, dagToken1); + Text tokenAlias3 = new Text("alias3"); + Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>( + new TestTokenIdentifier(new Text("dagtoken2")), ttsm); + dagCreds.addToken(tokenAlias3, dagToken2); + + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + DAGPlan dagPlan = DAGPlan.newBuilder() + .setName("somedag") + .setCredentialsBinary( + DagTypeConverters.convertCredentialsToProto(dagCreds)) + .build(); + DAGImpl dag = am.createDAG(dagPlan, dagId); + Credentials fetchedDagCreds = dag.getCredentials(); + am.stop(); + + Token<? extends TokenIdentifier> fetchedToken1 = + fetchedDagCreds.getToken(tokenAlias1); + if (doMerge) { + assertNotNull("AM creds missing from DAG creds", fetchedToken1); + compareTestTokens(amToken1, fetchedDagCreds.getToken(tokenAlias1)); + } else { + assertNull("AM creds leaked to DAG creds", fetchedToken1); + } + compareTestTokens(dagToken1, fetchedDagCreds.getToken(tokenAlias2)); + compareTestTokens(dagToken2, fetchedDagCreds.getToken(tokenAlias3)); + } + + private static void compareTestTokens( + Token<? extends TokenIdentifier> expected, + Token<? 
extends TokenIdentifier> actual) throws IOException { + TestTokenIdentifier expectedId = getTestTokenIdentifier(expected); + TestTokenIdentifier actualId = getTestTokenIdentifier(actual); + assertEquals("Token id not preserved", expectedId.getTestId(), + actualId.getTestId()); + } + + private static TestTokenIdentifier getTestTokenIdentifier( + Token<? extends TokenIdentifier> token) throws IOException { + ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + TestTokenIdentifier tokenId = new TestTokenIdentifier(); + tokenId.readFields(in); + in.close(); + return tokenId; + } + + private static class TestTokenIdentifier extends TokenIdentifier { + private static Text KIND_NAME = new Text("test-token"); + + private Text testId; + + public TestTokenIdentifier() { + this(new Text()); + } + + public TestTokenIdentifier(Text id) { + testId = id; + } + + @Override + public void readFields(DataInput in) throws IOException { + testId.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + testId.write(out); + } + + @Override + public Text getKind() { + return KIND_NAME; + } + + @Override + public UserGroupInformation getUser() { + return UserGroupInformation.createRemoteUser("token-user"); + } + + public Text getTestId() { + return testId; + } + } + + private static class TestTokenSecretManager extends + SecretManager<TestTokenIdentifier> { + @Override + public byte[] createPassword(TestTokenIdentifier id) { + return id.getBytes(); + } + + @Override + public byte[] retrievePassword(TestTokenIdentifier id) + throws InvalidToken { + return id.getBytes(); + } + + @Override + public TestTokenIdentifier createIdentifier() { + return new TestTokenIdentifier(); + } + } }
