Author: omalley
Date: Fri Mar 4 03:49:54 2011
New Revision: 1077187
URL: http://svn.apache.org/viewvc?rev=1077187&view=rev
Log:
commit a1654131f5aca0bbb7058c36bb8c2806056d4c0c
Author: Boris Shkolnik <[email protected]>
Date: Sun Feb 21 22:10:38 2010 -0800
MAPREDUCE:1430 from
https://issues.apache.org/jira/secure/attachment/12436542/1430-dd4-BP20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1430. JobTracker should be able to renew delegation tokens
+ for the jobs(boryas)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
Fri Mar 4 03:49:54 2011
@@ -28,7 +28,7 @@ import org.apache.hadoop.security.token.
//@InterfaceAudience.Private
public class DelegationTokenIdentifier
extends AbstractDelegationTokenIdentifier {
- static final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN");
+ public static final Text HDFS_DELEGATION_KIND = new
Text("HDFS_DELEGATION_TOKEN");
/**
* Create an empty delegation token identifier for reading into.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
Fri Mar 4 03:49:54 2011
@@ -717,6 +717,14 @@
</property>
<property>
+ <name>mapreduce.job.complete.cancel.delegation.tokens</name>
+ <value>true</value>
+ <description>If false, delegation tokens are not unregistered from renewal
+ or cancelled when the job completes, because spawned jobs may still use them.
+ </description>
+ </property>
+
+ <property>
<name>mapred.task.profile</name>
<value>false</value>
<description>To set whether the system should collect profiler
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar 4 03:49:54 2011
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -38,32 +36,34 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.split.*;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -394,6 +394,10 @@ class JobInProgress {
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
+
+ // register job's tokens for renewal
+ DelegationTokenRenewal.registerDelegationTokensForRenewal(
+ jobInfo.getJobID(), ts, this.conf);
}
/**
@@ -2881,37 +2885,43 @@ class JobInProgress {
* from all tables. Be sure to remove all of this job's tasks
* from the various tables.
*/
- synchronized void garbageCollect() {
- // Cancel task tracker reservation
- cancelReservedSlots();
-
- // Let the JobTracker know that a job is complete
- jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
- jobtracker.getInstrumentation().decWaitingReduces(getJobID(),
pendingReduces());
- jobtracker.storeCompletedJob(this);
- jobtracker.finalizeJob(this);
-
- try {
- // Definitely remove the local-disk copy of the job file
- if (localJobFile != null) {
- localFs.delete(localJobFile, true);
- localJobFile = null;
- }
-
- Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
- new CleanupQueue().addToQueue(new PathDeletionContext(
- jobtracker.getFileSystem(), tempDir.toUri().getPath()));
- } catch (IOException e) {
- LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
- }
-
- cleanUpMetrics();
- // free up the memory used by the data structures
- this.nonRunningMapCache = null;
- this.runningMapCache = null;
- this.nonRunningReduces = null;
- this.runningReduces = null;
+ void garbageCollect() {
+ synchronized(this) {
+ // Cancel task tracker reservation
+ cancelReservedSlots();
+ // Let the JobTracker know that a job is complete
+ jobtracker.getInstrumentation().decWaitingMaps(getJobID(),
pendingMaps());
+ jobtracker.getInstrumentation().decWaitingReduces(getJobID(),
pendingReduces());
+ jobtracker.storeCompletedJob(this);
+ jobtracker.finalizeJob(this);
+
+ try {
+ // Definitely remove the local-disk copy of the job file
+ if (localJobFile != null) {
+ localFs.delete(localJobFile, true);
+ localJobFile = null;
+ }
+
+ Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
+ new CleanupQueue().addToQueue(new PathDeletionContext(
+ jobtracker.getFileSystem(), tempDir.toUri().getPath()));
+ } catch (IOException e) {
+ LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
+ }
+
+ cleanUpMetrics();
+ // free up the memory used by the data structures
+ this.nonRunningMapCache = null;
+ this.runningMapCache = null;
+ this.nonRunningReduces = null;
+ this.runningReduces = null;
+ }
+
+ // remove jobs delegation tokens
+ if(conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+ DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
+ } // else don't remove it.May be used by spawned tasks
}
/**
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
Fri Mar 4 03:49:54 2011
@@ -387,6 +387,15 @@ public class Job extends JobContext {
throw new IOException(attr + " is incompatible with " + msg + " mode.");
}
}
+
+ /**
+ * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
+ * tokens upon job completion. Defaults to true.
+ */
+ public void setCancelDelegationTokenUponJobCompletion(boolean value) {
+ ensureState(JobState.DEFINE);
+ conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
+ }
/**
* Default to the new APIs unless they are explicitly set or the old mapper
or
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
Fri Mar 4 03:49:54 2011
@@ -56,6 +56,9 @@ public class JobContext {
public static final String CACHE_ARCHIVES_VISIBILITIES =
"mapreduce.job.cache.archives.visibilities";
+ public static final String JOB_CANCEL_DELEGATION_TOKEN =
+ "mapreduce.job.complete.cancel.delegation.tokens";
+
public JobContext(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1077187&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
Fri Mar 4 03:49:54 2011
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.net.URI;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+
+//@InterfaceAudience.Private
+public class DelegationTokenRenewal {
+ private static final Log LOG =
LogFactory.getLog(DelegationTokenRenewal.class);
+ public static final String SCHEME = "hdfs";
+
+ /**
+ * class that is used for keeping track of a delegation token (DT) to renew
+ *
+ */
+ private static class DelegationTokenToRenew {
+ public final Token<DelegationTokenIdentifier> token;
+ public final JobID jobId;
+ public final Configuration conf;
+ public long expirationDate;
+ public TimerTask timerTask;
+
+ public DelegationTokenToRenew(
+ JobID jId, Token<DelegationTokenIdentifier> t,
+ Configuration newConf, long newExpirationDate) {
+ token = t;
+ jobId = jId;
+ conf = newConf;
+ expirationDate = newExpirationDate;
+ timerTask = null;
+ if(token==null || jobId==null || conf==null) {
+ throw new IllegalArgumentException("invalid params for Renew Token" +
+ ";t="+token+";j="+jobId+";c="+conf);
+ }
+ }
+ public void setTimerTask(TimerTask tTask) {
+ timerTask = tTask;
+ }
+ public String toString() {
+ return token + ";exp="+expirationDate;
+ }
+ }
+
+ // global single timer (daemon)
+ private static Timer renewalTimer = new Timer(true);
+
+ //managing the list of tokens using Map
+ // jobId=>List<tokens>
+ private static Map<JobID, List<DelegationTokenToRenew>> delegationTokens =
+ Collections.synchronizedMap(new HashMap<JobID,
+ List<DelegationTokenToRenew>>());
+ //adding token
+ private static void addTokenToMap(DelegationTokenToRenew t) {
+ // see if a list already exists
+ JobID jobId = t.jobId;
+ List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+ if(l==null) {
+ l = new ArrayList<DelegationTokenToRenew>();
+ delegationTokens.put(jobId, l);
+ }
+ l.add(t);
+ }
+
+ // kind of tokens we currently renew
+ private static final Text kindHdfs =
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
+
+ @SuppressWarnings("unchecked")
+ public static synchronized void registerDelegationTokensForRenewal(
+ JobID jobId, TokenStorage ts, Configuration conf) {
+ if(ts==null)
+ return; //nothing to add
+
+ Collection <Token<? extends TokenIdentifier>> tokens = ts.getAllTokens();
+ long now = System.currentTimeMillis();
+
+ for(Token<? extends TokenIdentifier> t : tokens) {
+ // currently we only check for HDFS delegation tokens
+ // later we can add more different types.
+ if(! t.getKind().equals(kindHdfs)) {
+ continue;
+ }
+ Token<DelegationTokenIdentifier> dt =
+ (Token<DelegationTokenIdentifier>)t;
+
+ // first renew happens immediately
+ DelegationTokenToRenew dtr =
+ new DelegationTokenToRenew(jobId, dt, conf, now);
+
+ addTokenToMap(dtr);
+
+ setTimerForTokenRenewal(dtr, true);
+ LOG.info("registering token for renewal for service =" + dt.getService()+
+ " and jobID = " + jobId);
+ }
+ }
+
+ private static long renewDelegationToken(DelegationTokenToRenew dttr)
+ throws Exception {
+ long newExpirationDate=System.currentTimeMillis()+3600*1000;
+ Token<DelegationTokenIdentifier> token = dttr.token;
+ Configuration conf = dttr.conf;
+
+ if(token.getKind().equals(kindHdfs)) {
+ try {
+ DistributedFileSystem dfs = getDFSForToken(token, conf);
+ newExpirationDate = dfs.renewDelegationToken(token);
+ } catch (InvalidToken ite) {
+ LOG.warn("token canceled - not scheduling for renew");
+ removeFailedDelegationToken(dttr);
+ throw new Exception("failed to renew token", ite);
+ } catch (AccessControlException ace) {
+ LOG.warn("token canceled - not scheduling for renew");
+ removeFailedDelegationToken(dttr);
+ throw new Exception("failed to renew token", ace);
+ } catch (Exception ioe) {
+ LOG.warn("failed to renew token:"+token, ioe);
+ // returns default expiration date
+ }
+ } else {
+ throw new Exception("unknown token type to renew+"+token.getKind());
+ }
+ return newExpirationDate;
+ }
+
+
+ /**
+ * Task - to renew a token
+ *
+ */
+ private static class RenewalTimerTask extends TimerTask {
+ private DelegationTokenToRenew dttr;
+
+ RenewalTimerTask(DelegationTokenToRenew t) { dttr = t; }
+
+ @Override
+ public void run() {
+ Token<DelegationTokenIdentifier> token = dttr.token;
+ long newExpirationDate=0;
+ try {
+ newExpirationDate = renewDelegationToken(dttr);
+ } catch (Exception e) {
+ return; // message logged in renewDT method
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("renewing for:"+token.getService()+";newED=" +
+ newExpirationDate);
+
+ // new expiration date
+ dttr.expirationDate = newExpirationDate;
+ setTimerForTokenRenewal(dttr, false);// set the next one
+ }
+ }
+
+ private static DistributedFileSystem getDFSForToken(
+ Token<DelegationTokenIdentifier> token, Configuration conf)
+ throws Exception {
+ DistributedFileSystem dfs = null;
+ try {
+ URI uri = new URI (SCHEME + "://" + token.getService().toString());
+ dfs = (DistributedFileSystem) FileSystem.get(uri, conf);
+ } catch (Exception e) {
+ LOG.warn("Failed to create a dfs to renew for:" + token.getService(), e);
+ throw e;
+ }
+ return dfs;
+ }
+
+ /**
+ * schedule renewal of the given token: now if firstTime, else shortly before expiry
+ */
+ private static void setTimerForTokenRenewal(
+ DelegationTokenToRenew token, boolean firstTime) {
+
+ // calculate timer time
+ long now = System.currentTimeMillis();
+ long renewIn;
+ if(firstTime) {
+ renewIn = now;
+ } else {
+ long expiresIn = (token.expirationDate - now);
+ renewIn = now + expiresIn - expiresIn/10; // little before expiration
+ }
+
+ try {
+ // need to create a new timer task every time
+ TimerTask tTask = new RenewalTimerTask(token);
+ token.setTimerTask(tTask); // keep reference to the timer
+
+ renewalTimer.schedule(token.timerTask, new Date(renewIn));
+ } catch (Exception e) {
+ LOG.warn("failed to schedule a task, token will not renew more", e);
+ }
+ }
+
+ /**
+ * removing all tokens renewals
+ */
+ static public void close() {
+ renewalTimer.cancel();
+ delegationTokens.clear();
+ }
+
+ // cancel a token
+ private static void cancelToken(DelegationTokenToRenew t) {
+ Token<DelegationTokenIdentifier> token = t.token;
+ Configuration conf = t.conf;
+
+ if(token.getKind().equals(kindHdfs)) {
+ try {
+ DistributedFileSystem dfs = getDFSForToken(token, conf);
+ if (LOG.isDebugEnabled())
+ LOG.debug("canceling token " + token.getService() + " for dfs=" +
+ dfs);
+ dfs.cancelDelegationToken(token);
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel " + token, e);
+ }
+ }
+ }
+
+ /**
+ * removing a DT whose renewal failed
+ * @param t the tracked token entry to remove from its job's list
+ */
+ private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
+ JobID jobId = t.jobId;
+ List<DelegationTokenToRenew> l = delegationTokens.get(jobId);
+ if(l==null) return;
+
+ Iterator<DelegationTokenToRenew> it = l.iterator();
+ while(it.hasNext()) {
+ DelegationTokenToRenew dttr = it.next();
+ if(dttr == t) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("removing failed delegation token for jobid=" + jobId +
+ ";t=" + dttr.token.getService());
+
+ // cancel the timer
+ if(dttr.timerTask!=null)
+ dttr.timerTask.cancel();
+
+ // no need to cancel the token - it is invalid
+ it.remove();
+ break; //should be only one
+ }
+ }
+ }
+
+ /**
+ * removing DT for completed jobs
+ * @param jobId
+ */
+ public static void removeDelegationTokenRenewalForJob(JobID jobId) {
+ List<DelegationTokenToRenew> l = delegationTokens.remove(jobId);
+ if(l==null) return;
+
+ Iterator<DelegationTokenToRenew> it = l.iterator();
+ while(it.hasNext()) {
+ DelegationTokenToRenew dttr = it.next();
+ if (LOG.isDebugEnabled())
+ LOG.debug("removing delegation token for jobid=" + jobId +
+ ";t=" + dttr.token.getService());
+
+ // cancel the timer
+ if(dttr.timerTask!=null)
+ dttr.timerTask.cancel();
+
+ // cancel the token
+ cancelToken(dttr);
+
+ it.remove();
+ }
+ }
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077187&r1=1077186&r2=1077187&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
Fri Mar 4 03:49:54 2011
@@ -18,8 +18,7 @@
package org.apache.hadoop.mapreduce.security;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
@@ -59,7 +58,6 @@ import org.codehaus.jackson.map.ObjectMa
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.*;
public class TestTokenCache {
@@ -275,7 +273,7 @@ public class TestTokenCache {
for(Token<? extends TokenIdentifier> t: tns) {
System.out.println("kind="+t.getKind() + ";servic=" + t.getService() +
";str=" + t.toString());
- if(t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+ if(t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
t.getService().equals(new Text(fs_addr))) {
found = true;
}
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1077187&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
Fri Mar 4 03:49:54 2011
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.TokenStorage;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * unit test -
+ * tests addition/deletion/cancelation of renewals of delegation tokens
+ *
+ */
+public class TestDelegationTokenRenewal {
+ private static final Log LOG =
+ LogFactory.getLog(TestDelegationTokenRenewal.class);
+
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ conf = new Configuration();
+
+ // create a fake FileSystem (MyFS) and associate it
+ // with the "hdfs" scheme.
+ URI uri = new URI(DelegationTokenRenewal.SCHEME+"://localhost:0");
+ System.out.println("scheme is : " + uri.getScheme());
+ conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class,
DistributedFileSystem.class);
+ FileSystem.setDefaultUri(conf, uri);
+ System.out.println("filesystem uri = " +
FileSystem.getDefaultUri(conf).toString());
+ }
+
+ /**
+ * add some extra functionality for testing
+ * 1. toString();
+ * 2. cancel() and isCanceled()
+ */
+ private static class MyToken extends Token<DelegationTokenIdentifier> {
+ public String status = "GOOD";
+ public static final String CANCELED = "CANCELED";
+
+ public MyToken(DelegationTokenIdentifier dtId1,
+ DelegationTokenSecretManager sm) {
+ super(dtId1, sm);
+ status = "GOOD";
+ }
+
+ public boolean isCanceled() {return status.equals(CANCELED);}
+ public void cancelToken() {this.status=CANCELED;}
+ public String toString() {
+ StringBuilder sb = new StringBuilder(1024);
+
+ sb.append("id=");
+ String id = StringUtils.byteToHexString(this.getIdentifier());
+ int idLen = id.length();
+ sb.append(id.substring(idLen-6));
+ sb.append(";k=");
+ sb.append(this.getKind());
+ sb.append(";s=");
+ sb.append(this.getService());
+ return sb.toString();
+ }
+ }
+
+ /**
+ * fake FileSystem
+ * overrides three methods
+ * 1. getDelegationToken() - generates a token
+ * 2. renewDelegationToken() - counts the number of calls, and remembers
+ * the most recently renewed token.
+ * 3. cancelDelegationToken() - cancels the token (a subsequent renew will
+ * cause an InvalidToken exception)
+ */
+ static class MyFS extends DistributedFileSystem {
+ int counter=0;
+ MyToken token;
+ MyToken tokenToRenewIn2Sec;
+
+ public MyFS() {}
+ public void close() {}
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {}
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> t)
+ throws InvalidToken, IOException {
+ MyToken token = (MyToken)t;
+ if(token.isCanceled()) {
+ throw new InvalidToken("token has been canceled");
+ }
+ counter ++;
+ this.token = (MyToken)token;
+ System.out.println("Called MYDFS.renewdelegationtoken " + token);
+ if(tokenToRenewIn2Sec == token) {
+ // this token first renewal in 2 seconds
+ System.out.println("RENEW in 2 seconds");
+ tokenToRenewIn2Sec=null;
+ return 2*1000 + System.currentTimeMillis();
+ } else {
+ return 86400*1000 + System.currentTimeMillis();
+ }
+ }
+ @Override
+ public MyToken getDelegationToken(Text renewer)
+ throws IOException {
+ System.out.println("Called MYDFS.getdelegationtoken");
+ return createTokens(renewer);
+ }
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> t)
+ throws IOException {
+ MyToken token = (MyToken)t;
+ token.cancelToken();
+ }
+
+ public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;}
+ public int getCounter() {return counter; }
+ public MyToken getToken() {return token;}
+ }
+
+ /**
+ * auxiliary - create token
+ * @param renewer
+ * @return
+ * @throws IOException
+ */
+ static MyToken createTokens(Text renewer)
+ throws IOException {
+ Text user1= new Text("user1");
+
+ DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+ 3600000);
+ sm.startThreads();
+
+ DelegationTokenIdentifier dtId1 =
+ new DelegationTokenIdentifier(user1, renewer, user1);
+
+ MyToken token1 = new MyToken(dtId1, sm);
+
+
+ token1.setService(new Text("localhost:0"));
+ return token1;
+ }
+
+
+ /**
+ * Basic idea of the test:
+ * 1. create tokens.
+ * 2. Mark one of them to be renewed in 2 seconds (instead of
+ * 24 hours)
+ * 3. register them for renewal
+ * 4. sleep for 3 seconds
+ * 5. count number of renewals (should be 3 initial ones + one extra)
+ * 6. register another token for 2 seconds
+ * 7. cancel it immediately
+ * 8. Sleep and check that the 2 seconds renew didn't happen
+ * (5 renewals in total)
+ * 9. check cancellation
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testDTRenewal () throws IOException, URISyntaxException {
+ MyFS dfs = (MyFS)FileSystem.get(conf);
+ System.out.println("dfs="+(Object)dfs);
+ // Test 1. - add three tokens - make sure exactly one gets renewed
+
+ // get the delegation tokens
+ MyToken token1, token2, token3;
+ token1 = dfs.getDelegationToken(new Text("user1"));
+ token2 = dfs.getDelegationToken(new Text("user2"));
+ token3 = dfs.getDelegationToken(new Text("user3"));
+
+ //to cause this one to be set for renew in 2 secs
+ dfs.setTokenToRenewIn2Sec(token1);
+ System.out.println("token="+token1+" should be renewed for 2 secs");
+
+ // three distinct Namenodes
+ String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0";
+ String nn2 = DelegationTokenRenewal.SCHEME + "://host2:0";
+ String nn3 = DelegationTokenRenewal.SCHEME + "://host3:0";
+
+ TokenStorage ts = new TokenStorage();
+
+ // register the token for renewal
+ ts.addToken(new Text(nn1), token1);
+ ts.addToken(new Text(nn2), token2);
+ ts.addToken(new Text(nn3), token3);
+
+ // register the tokens for renewal
+ DelegationTokenRenewal.registerDelegationTokensForRenewal(
+ new JobID("job1", 1), ts, conf);
+ // first 3 initial renewals + 1 real
+ int numberOfExpectedRenewals = 3+1;
+
+ int attempts = 4;
+ while(attempts-- > 0) {
+ try {
+ Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
+ } catch (InterruptedException e) {}
+
+ // since we cannot guarantee timely execution - let's give few chances
+ if(dfs.getCounter()==numberOfExpectedRenewals)
+ break;
+ }
+
+ System.out.println("Counter = " + dfs.getCounter() + ";t="+
+ dfs.getToken());
+ assertEquals("renew wasn't called as many times as expected(4):",
+ numberOfExpectedRenewals, dfs.getCounter());
+ assertEquals("most recently renewed token mismatch", dfs.getToken(),
+ token1);
+
+ // Test 2.
+ // add another token ( that expires in 2 secs). Then remove it, before
+ // time is up.
+ // Wait for 3 secs , and make sure no renews were called
+ ts = new TokenStorage();
+ MyToken token4 = dfs.getDelegationToken(new Text("user4"));
+
+ //to cause this one to be set for renew in 2 secs
+ dfs.setTokenToRenewIn2Sec(token4);
+ System.out.println("token="+token4+" should be renewed for 2 secs");
+
+ String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0";
+ ts.addToken(new Text(nn4), token4);
+
+
+ JobID jid2 = new JobID("job2",1);
+ DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
+ DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
+ numberOfExpectedRenewals++; // one more initial renewal
+ attempts = 4;
+ while(attempts-- > 0) {
+ try {
+ Thread.sleep(3*1000); // sleep 3 seconds, so it has time to renew
+ } catch (InterruptedException e) {}
+ // since we cannot guarantee timely execution - let's give few chances
+ if(dfs.getCounter()==numberOfExpectedRenewals)
+ break;
+ }
+ System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
+
+ // counter and the token should still be the old ones
+ assertEquals("renew wasn't called as many times as expected",
+ numberOfExpectedRenewals, dfs.getCounter());
+
+ // also renewing of the cancelled token should fail
+ boolean exception=false;
+ try {
+ dfs.renewDelegationToken(token4);
+ } catch (InvalidToken ite) {
+ //expected
+ exception = true;
+ }
+ assertTrue("Renew of canceled token didn't fail", exception);
+ }
+}