Author: omalley
Date: Fri Mar 4 03:48:56 2011
New Revision: 1077177
URL: http://svn.apache.org/viewvc?rev=1077177&view=rev
Log:
commit 5fb460785475c22997d4f4fcaa144cb368e7239b
Author: Sharad Agarwal <[email protected]>
Date: Mon Feb 22 13:19:07 2010 +0530
patch from
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
Fri Mar 4 03:48:56 2011
@@ -1,8 +1,16 @@
package org.apache.hadoop.mapred;
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * System-test aspect that adds members to JobClient through inter-type
+ * declarations. Both members work on JobClient's jobSubmitClient field.
+ */
public privileged aspect JobClientAspect {
+ /** Returns the jobSubmitClient held by this JobClient. */
public JobSubmissionProtocol JobClient.getProtocol() {
return jobSubmitClient;
}
+
+ /**
+ * Kills the given job: the id is downgraded to
+ * org.apache.hadoop.mapred.JobID and handed to jobSubmitClient.killJob.
+ *
+ * @param id id of the job to kill.
+ * @throws IOException declared by the signature; presumably raised by the
+ * underlying jobSubmitClient.killJob call.
+ */
+ public void JobClient.killJob(JobID id) throws IOException {
+ jobSubmitClient.killJob(
+ org.apache.hadoop.mapred.JobID.downgrade(id));
+ }
}
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj?rev=1077177&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
Fri Mar 4 03:48:56 2011
@@ -0,0 +1,95 @@
+package org.apache.hadoop.mapred;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.test.system.ControlAction;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * System-test aspect that can hold a task inside Task.done() until a
+ * FinishTaskControlAction targeted at it is found on the daemon.
+ *
+ * The thread running Task.done() parks on waitObject; the advice on
+ * TaskUmbilicalProtocol.ping() polls the daemon for control actions and
+ * releases the parked thread once a finish action is seen.
+ */
+public privileged aspect TaskAspect {
+
+  private static final Log LOG = LogFactory.getLog(TaskAspect.class);
+
+  /** Monitor used for the hand-off between the done() and ping() advice. */
+  private final Object waitObject = new Object();
+
+  /** True only while the done() advice is parked waiting for the signal. */
+  private final AtomicBoolean isWaitingForSignal = new AtomicBoolean(false);
+
+  /** Proxy used to read and remove control actions; set by the RPC advice. */
+  private DaemonProtocol daemonProxy;
+
+  pointcut taskDoneIntercept(Task task) : execution(
+      public void Task.done(..)) && target(task);
+
+  /**
+   * Delays Task.done() for regular tasks when task control is enabled in the
+   * task configuration. Job setup, job cleanup and task cleanup tasks are
+   * never delayed.
+   */
+  void around(Task task) : taskDoneIntercept(task) {
+    if (task.isJobCleanupTask() || task.isJobSetupTask()
+        || task.isTaskCleanupTask()) {
+      proceed(task);
+      return;
+    }
+    Configuration conf = task.getConf();
+    boolean interrupted = false;
+    if (FinishTaskControlAction.isControlActionEnabled(conf)) {
+      LOG.info("Task control enabled, waiting till client sends signal to " +
+          "complete");
+      synchronized (waitObject) {
+        isWaitingForSignal.set(true);
+        try {
+          // Re-check the flag after every wakeup: Object.wait() may return
+          // spuriously, and only the ping advice clears the flag.
+          while (isWaitingForSignal.get()) {
+            waitObject.wait();
+          }
+        } catch (InterruptedException e) {
+          interrupted = true;
+          LOG.warn("Interrupted while waiting for the finish signal, " +
+              "proceeding to complete the task", e);
+        } finally {
+          // Stop the ping advice from polling the daemon once we move on.
+          isWaitingForSignal.set(false);
+        }
+      }
+    }
+    try {
+      proceed(task);
+    } finally {
+      // Restore the interrupt status only after done() has run, so that the
+      // completion path itself is not disturbed by the pending interrupt.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  pointcut taskStatusUpdate(TaskReporter reporter, TaskAttemptID id) :
+      call(public boolean TaskUmbilicalProtocol.ping(TaskAttemptID))
+      && this(reporter) && args(id);
+
+  /**
+   * After each ping issued by the TaskReporter, checks whether a finish
+   * action is pending for this task and, if so, removes it from the daemon
+   * and wakes the thread parked in the done() advice.
+   */
+  after(TaskReporter reporter, TaskAttemptID id) throws IOException :
+      taskStatusUpdate(reporter, id) {
+    synchronized (waitObject) {
+      if (!isWaitingForSignal.get()) {
+        return;
+      }
+      ControlAction[] actions = daemonProxy.getActions(id.getTaskID());
+      for (ControlAction action : actions) {
+        if (action instanceof FinishTaskControlAction) {
+          LOG.info("Recv : Control task action to finish task id: "
+              + action.getTarget());
+          daemonProxy.removeAction(action);
+          LOG.info("Removed the control action from TaskTracker");
+          LOG.info("Notifying the task to completion");
+          // Clear the condition before notifying so the waiter's loop exits.
+          isWaitingForSignal.set(false);
+          waitObject.notify();
+          break;
+        }
+      }
+    }
+  }
+
+
+  pointcut rpcInterceptor(Class k, long version,InetSocketAddress addr,
+      Configuration conf) : call(
+      public static * RPC.getProxy(Class, long ,InetSocketAddress,
+      Configuration)) && args(k, version,addr, conf) &&
+      within(org.apache.hadoop.mapred.Child) ;
+
+  /**
+   * Whenever Child obtains an RPC proxy, opens a DaemonProtocol proxy to the
+   * same address with the same configuration.
+   */
+  after(Class k, long version, InetSocketAddress addr, Configuration conf)
+      throws IOException : rpcInterceptor(k, version, addr, conf) {
+    daemonProxy =
+        (DaemonProtocol) RPC.getProxy(
+            DaemonProtocol.class, DaemonProtocol.versionID, addr, conf);
+  }
+
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
Fri Mar 4 03:48:56 2011
@@ -4,13 +4,13 @@ import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.TTProtocol;
import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo;
import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo;
+import org.apache.hadoop.test.system.ControlAction;
import org.apache.hadoop.test.system.DaemonProtocol;
-import org.apache.hadoop.test.system.DaemonProtocolAspect;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
public privileged aspect TaskTrackerAspect {
@@ -75,4 +75,5 @@ public privileged aspect TaskTrackerAspe
return proceed(protocol, clientVersion);
}
}
+
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj
Fri Mar 4 03:48:56 2011
@@ -3,10 +3,11 @@ package org.apache.hadoop.test.system;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
-
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -22,7 +23,11 @@ import org.apache.hadoop.conf.Configurat
public aspect DaemonProtocolAspect {
private boolean DaemonProtocol.ready;
-
+
+ @SuppressWarnings("unchecked")
+ private HashMap<Object, List<ControlAction>> DaemonProtocol.actions =
+ new HashMap<Object, List<ControlAction>>();
+
/**
* Set if the daemon process is ready or not, concrete daemon protocol should
* implement pointcuts to determine when the daemon is ready and use the
@@ -140,4 +145,66 @@ public aspect DaemonProtocolAspect {
}
return fs;
}
+
+
+  /**
+   * Returns a snapshot of the control actions queued under the given key,
+   * or an empty array when nothing is queued for it.
+   */
+  @SuppressWarnings("unchecked")
+  public ControlAction[] DaemonProtocol.getActions(Writable key)
+      throws IOException {
+    synchronized (actions) {
+      List<ControlAction> pending = actions.get(key);
+      if (pending != null) {
+        ControlAction[] snapshot = new ControlAction[pending.size()];
+        return (ControlAction[]) pending.toArray(snapshot);
+      }
+      return new ControlAction[0];
+    }
+  }
+
+
+  /**
+   * Queues the action under its target, creating the per-target list on
+   * first use.
+   */
+  @SuppressWarnings("unchecked")
+  public void DaemonProtocol.sendAction(ControlAction action)
+      throws IOException {
+    synchronized (actions) {
+      List<ControlAction> queued = actions.get(action.getTarget());
+      if (queued == null) {
+        queued = new ArrayList<ControlAction>();
+        actions.put(action.getTarget(), queued);
+      }
+      queued.add(action);
+    }
+  }
+
+  /**
+   * Tells whether the action is still present in the list queued for its
+   * target.
+   */
+  @SuppressWarnings("unchecked")
+  public boolean DaemonProtocol.isActionPending(ControlAction action)
+      throws IOException{
+    synchronized (actions) {
+      List<ControlAction> queued = actions.get(action.getTarget());
+      return queued != null && queued.contains(action);
+    }
+  }
+
+
+  /**
+   * Removes the action from the list queued for its target; does nothing
+   * when no list exists for that target.
+   */
+  @SuppressWarnings("unchecked")
+  public void DaemonProtocol.removeAction(ControlAction action)
+      throws IOException {
+    synchronized (actions) {
+      List<ControlAction> queued = actions.get(action.getTarget());
+      if (queued != null) {
+        queued.remove(action);
+      }
+    }
+  }
+
+ /**
+ * Drops every queued control action, for all targets, while holding the
+ * lock on the actions map.
+ */
+ public void DaemonProtocol.clearActions() throws IOException {
+ synchronized (actions) {
+ actions.clear();
+ }
+ }
}
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java?rev=1077177&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
Fri Mar 4 03:48:56 2011
@@ -0,0 +1,90 @@
+package org.apache.hadoop.mapred;
+
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * System test which submits a one-map sleep job with task control enabled,
+ * checks that the map stays running until a FinishTaskControlAction is sent
+ * to the slaves, and then waits for the job to complete.
+ */
+public class TestControlledJob {
+  private MRCluster cluster;
+
+  private static final Log LOG = LogFactory.getLog(TestControlledJob.class);
+
+  /** Upper bound on one-second polls while waiting for the map to start. */
+  private static final int MAX_MAP_START_POLLS = 60;
+
+  public TestControlledJob() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+  }
+
+  @Before
+  public void before() throws Exception {
+    cluster.setUp();
+  }
+
+  @After
+  public void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  @Test
+  public void testControlledJob() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getMaster().getProxy();
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+
+    conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
+    JobClient client = cluster.getMaster().getClient();
+
+    RunningJob rJob = client.submitJob(new JobConf(conf));
+    JobID id = rJob.getID();
+
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Waiting till job starts running one map");
+    jInfo = wovenClient.getJobInfo(id);
+    // The job can be RUNNING before its map has been scheduled, so actually
+    // wait (bounded) for the map instead of asserting straight away.
+    for (int poll = 0;
+        poll < MAX_MAP_START_POLLS && jInfo.runningMaps() < 1; poll++) {
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+    // assertEquals takes (expected, actual); keeps failure messages correct.
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    LOG.info("waiting for another cycle to " +
+        "check if the maps dont finish off");
+    Thread.sleep(1000);
+    jInfo = wovenClient.getJobInfo(id);
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
+
+    for (TaskInfo info : taskInfos) {
+      LOG.info("constructing control action to signal task to finish");
+      FinishTaskControlAction action = new FinishTaskControlAction(
+          TaskID.downgrade(info.getTaskID()));
+      // The test does not look up which slave runs the task, so the action
+      // is sent to every slave.
+      for (TTClient cli : cluster.getSlaves().values()) {
+        cli.getProxy().sendAction(action);
+      }
+    }
+
+    jInfo = wovenClient.getJobInfo(id);
+    while (!jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Job sucessfully completed after signalling!!!!");
+  }
+}
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java?rev=1077177&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
Fri Mar 4 03:48:56 2011
@@ -0,0 +1,52 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.test.system.ControlAction;
+
+/**
+ * Control action, targeted at a task id, which signals a controlled task
+ * to proceed to completion. <br/>
+ */
+public class FinishTaskControlAction extends ControlAction<TaskID> {
+
+  /** Configuration key of the boolean flag that enables task control. */
+  private static final String CONTROL_ENABLED_KEY =
+      "test.system.enabled.task.completion.control";
+
+  /**
+   * Tells whether the given configuration has the task-completion control
+   * flag switched on; the flag defaults to false when absent. <br/>
+   *
+   * @param conf configuration to inspect.
+   * @return true if the control action is enabled.
+   */
+  public static boolean isControlActionEnabled(Configuration conf) {
+    return conf.getBoolean(CONTROL_ENABLED_KEY, false);
+  }
+
+  /**
+   * Switches the task-completion control flag on in the given configuration,
+   * which is meant to be used for submitting the job. <br/>
+   *
+   * @param conf configuration to update.
+   */
+  public static void configureControlActionForJob(Configuration conf) {
+    conf.setBoolean(CONTROL_ENABLED_KEY, true);
+  }
+
+  /**
+   * Creates an action whose target is a freshly constructed TaskID. <br/>
+   */
+  public FinishTaskControlAction() {
+    this(new TaskID());
+  }
+
+  /**
+   * Creates an action targeted at one particular task. <br/>
+   *
+   * @param id id of the task.
+   */
+  public FinishTaskControlAction(TaskID id) {
+    super(id);
+  }
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
Fri Mar 4 03:48:56 2011
@@ -82,6 +82,15 @@ public class JTClient extends MRDaemonCl
public Configuration getJobTrackerConfig() throws IOException {
return getProxy().getDaemonConf();
}
+
+ /**
+ * Kills the job by delegating to killJob on the client returned by
+ * getClient(). <br/>
+ *
+ * @param id id of the job to be killed.
+ * @throws IOException declared by the signature; presumably raised when the
+ * underlying kill request fails.
+ */
+ public void killJob(JobID id) throws IOException {
+ getClient().killJob(id);
+ }
/**
* Verification API to check running jobs and running job states.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
Fri Mar 4 03:48:56 2011
@@ -68,5 +68,11 @@ public class MRCluster extends AbstractM
+ // Cleans the cluster by asking the master for all job infos and issuing a
+ // kill for each of them through the master's job client.
+ // NOTE(review): the loop does not filter by job state, so presumably jobs
+ // that already finished are sent a kill as well - confirm this is harmless.
public void ensureClean() throws IOException {
//TODO: ensure that no jobs/tasks are running
//restart the cluster if cleanup fails
+ JTClient jtClient = getMaster();
+ JobInfo[] jobs = jtClient.getProxy().getAllJobInfo();
+ for(JobInfo job : jobs) {
+ // Each id is downgraded to org.apache.hadoop.mapred.JobID before the kill.
+ jtClient.getClient().killJob(
+ org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
+ }
}
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
Fri Mar 4 03:48:56 2011
@@ -32,6 +32,11 @@ public abstract class AbstractDaemonClie
this.process = process;
}
+ /**
+ * Gets if the client is connected to the Daemon <br/>
+ *
+ * @return true if connected.
+ */
public boolean isConnected() {
return connected;
}
@@ -40,8 +45,17 @@ public abstract class AbstractDaemonClie
this.connected = connected;
}
+ /**
+ * Create an RPC proxy to the daemon <br/>
+ *
+ * @throws IOException
+ */
public abstract void connect() throws IOException;
+ /**
+ * Disconnect the underlying RPC proxy to the daemon.<br/>
+ * @throws IOException
+ */
public abstract void disconnect() throws IOException;
/**
@@ -51,51 +65,113 @@ public abstract class AbstractDaemonClie
*/
protected abstract PROXY getProxy();
+ /**
+ * Gets the daemon level configuration.<br/>
+ *
+ * @return configuration using which daemon is running
+ */
public Configuration getConf() {
return conf;
}
+ /**
+ * Gets the host on which Daemon is currently running. <br/>
+ *
+ * @return hostname
+ */
public String getHostName() {
return process.getHostName();
}
+ /**
+ * Gets if the Daemon is ready to accept RPC connections. <br/>
+ *
+ * @return true if daemon is ready.
+ * @throws IOException
+ */
public boolean isReady() throws IOException {
return getProxy().isReady();
}
+ /**
+ * Kills the Daemon process <br/>
+ * @throws IOException
+ */
public void kill() throws IOException {
process.kill();
}
+ /**
+ * Checks if the Daemon process is alive or not <br/>
+ *
+ * @throws IOException
+ */
public void ping() throws IOException {
getProxy().ping();
}
+ /**
+ * Start up the Daemon process. <br/>
+ * @throws IOException
+ */
public void start() throws IOException {
process.start();
}
+ /**
+ * Get system level view of the Daemon process.
+ *
+ * @return returns system level view of the Daemon process.
+ *
+ * @throws IOException
+ */
public ProcessInfo getProcessInfo() throws IOException {
return getProxy().getProcessInfo();
}
- public void enable(List<Enum<?>> faults) throws IOException {
- getProxy().enable(faults);
- }
-
- public void disableAll() throws IOException {
- getProxy().disableAll();
- }
-
+ /**
+ * Return a file status object that represents the path.
+ * @param path
+ * given path
+ * @param local
+ * whether the path is local or not
+ * @return a FileStatus object
+ * @throws FileNotFoundException when the path does not exist;
+ * IOException see specific implementation
+ */
public FileStatus getFileStatus(String path, boolean local) throws
IOException {
return getProxy().getFileStatus(path, local);
}
+ /**
+ * List the statuses of the files/directories in the given path if the path
is
+ * a directory.
+ *
+ * @param path
+ * given path
+ * @param local
+ * whether the path is local or not
+ * @return the statuses of the files/directories in the given path
+ * @throws IOException
+ */
public FileStatus[] listStatus(String path, boolean local)
throws IOException {
return getProxy().listStatus(path, local);
}
+ /**
+ * List the statuses of the files/directories in the given path if the path
is
+ * a directory recursive/nonrecursively depending on parameters
+ *
+ * @param f
+ * given path
+ * @param local
+ * whether the path is local or not
+ * @param recursive
+ * whether to recursively get the status
+ * @return the statuses of the files/directories in the given path
+ * @throws IOException
+ */
public FileStatus[] listStatus(String f, boolean local, boolean recursive)
throws IOException {
List<FileStatus> status = new ArrayList<FileStatus>();
@@ -118,4 +194,5 @@ public abstract class AbstractDaemonClie
}
}
}
+
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
Fri Mar 4 03:48:56 2011
@@ -4,7 +4,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -236,48 +235,6 @@ public abstract class AbstractMasterSlav
}
/**
- * Enable/Inject the faults. In case fault can't be enabled on ALL nodes
- * cluster is restarted.
- */
- public void enable(List<Enum<?>> faults) throws IOException {
- try {
- enableFaults(faults);
- } catch (IOException e) {
- stop();
- start();
- enableFaults(faults);
- }
- }
-
- /**
- * Disable/Remove the all the faults. In case fault can't be disabled on ALL
- * nodes cluster is restarted.
- */
- public void disableAllFaults() throws IOException {
- try {
- disableFaults();
- } catch (IOException e) {
- stop();
- start();
- disableFaults();
- }
- }
-
- private void enableFaults(List<Enum<?>> faults) throws IOException {
- master.enable(faults);
- for (SLAVE slave : slaves.values()) {
- slave.enable(faults);
- }
- }
-
- private void disableFaults() throws IOException {
- master.disableAll();
- for (SLAVE slave : slaves.values()) {
- slave.disableAll();
- }
- }
-
- /**
* Ping all the daemons of the cluster.
* @throws IOException
*/
@@ -302,6 +259,7 @@ public abstract class AbstractMasterSlav
}
connect();
ping();
+ clearAllControlActions();
ensureClean();
}
@@ -313,11 +271,22 @@ public abstract class AbstractMasterSlav
}
/**
+ * Asks the master daemon, and then each slave daemon in turn, to drop
+ * all of its pending control actions.<br/>
+ *
+ * @throws IOException
+ */
+ public void clearAllControlActions() throws IOException {
+   master.getProxy().clearActions();
+   for (SLAVE daemon : getSlaves().values()) {
+     daemon.getProxy().clearActions();
+   }
+ }
+ /**
* Ensure that cluster is clean. Disconnect from the RPC ports of the
daemons.
* @throws IOException
*/
public void tearDown() throws IOException {
+ // NOTE(review): if ensureClean() or clearAllControlActions() throws,
+ // disconnect() is never reached; consider moving it into a finally block.
ensureClean();
+ clearAllControlActions();
disconnect();
}
}
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java?rev=1077177&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java
Fri Mar 4 03:48:56 2011
@@ -0,0 +1,68 @@
+package org.apache.hadoop.test.system;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Class to represent a control action which can be performed on Daemon.<br/>
+ *
+ * An action is identified solely by its target: it is serialized by writing
+ * the target, and two actions are equal when their targets are equal.
+ *
+ * @param <T> type of the target the action is addressed to.
+ */
+
+public abstract class ControlAction<T extends Writable> implements Writable {
+
+  private T target;
+
+  /**
+   * Default constructor of the Control Action; leaves the target unset.
+   * <br/>
+   * readFields and write delegate to the target, so they fail with a
+   * NullPointerException on an instance built this way.
+   */
+  public ControlAction() {
+  }
+
+  /**
+   * Constructor which sets the target of the Control action. <br/>
+   *
+   * @param target
+   *          of the control action.
+   */
+  public ControlAction(T target) {
+    this.target = target;
+  }
+
+  /**
+   * Gets the target of the control action <br/>
+   *
+   * @return target of action
+   */
+  public T getTarget() {
+    return target;
+  }
+
+  /** Reads the target's fields from the given input. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    target.readFields(in);
+  }
+
+  /** Writes the target to the given output. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    target.write(out);
+  }
+
+  /**
+   * Two control actions are equal when their targets are equal; two actions
+   * without a target are also equal to each other.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ControlAction)) {
+      return false;
+    }
+    Object otherTarget = ((ControlAction<?>) obj).getTarget();
+    if (this.target == null) {
+      return otherTarget == null;
+    }
+    return this.target.equals(otherTarget);
+  }
+
+  /** Hash code derived from the target, consistent with equals. */
+  @Override
+  public int hashCode() {
+    return (this.target == null) ? 0 : this.target.hashCode();
+  }
+
+
+  @Override
+  public String toString() {
+    return "Action Target : " + this.target;
+  }
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java?rev=1077177&r1=1077176&r2=1077177&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
Fri Mar 4 03:48:56 2011
@@ -1,11 +1,11 @@
package org.apache.hadoop.test.system;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
@@ -45,24 +45,7 @@ public interface DaemonProtocol extends
* @throws IOException
*/
ProcessInfo getProcessInfo() throws IOException;
-
- /**
- * Enable the set of specified faults in the Daemon.<br/>
- *
- * @param faults
- * list of faults to be enabled.
- *
- * @throws IOException
- */
- void enable(List<Enum<?>> faults) throws IOException;
-
- /**
- * Disable all the faults which are enabled in the Daemon. <br/>
- *
- * @throws IOException
- */
- void disableAll() throws IOException;
-
+
/**
* Return a file status object that represents the path.
* @param path
@@ -87,4 +70,56 @@ public interface DaemonProtocol extends
* @throws IOException
*/
FileStatus[] listStatus(String path, boolean local) throws IOException;
+
+ /**
+ * Enables a particular control action to be performed on the Daemon <br/>
+ *
+ * @param action control action to be enabled.
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ void sendAction(ControlAction action) throws IOException;
+
+ /**
+ * Checks if the particular control action has been delivered to the Daemon
+ * component <br/>
+ *
+ * @param action to be checked.
+ *
+ * @return true if action is still in waiting queue of
+ * actions to be delivered.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ boolean isActionPending(ControlAction action) throws IOException;
+
+ /**
+ * Removes a particular control action from the list of the actions which the
+ * daemon maintains. <br/>
+ * <i><b>Not to be directly called by Test Case or clients.</b></i>
+ * @param action to be removed
+ * @throws IOException
+ */
+
+ @SuppressWarnings("unchecked")
+ void removeAction(ControlAction action) throws IOException;
+
+ /**
+ * Clears out the list of control actions on the particular daemon.
+ * <br/>
+ * @throws IOException
+ */
+ void clearActions() throws IOException;
+
+ /**
+ * Gets a list of pending actions which are targeted on the specified key.
+ * <br/>
+ * <i><b>Not to be directly used by clients</b></i>
+ * @param key target
+ * @return list of actions.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ ControlAction[] getActions(Writable key) throws IOException;
}
\ No newline at end of file