Author: mukund
Date: Thu Jun 5 15:08:18 2008
New Revision: 663762
URL: http://svn.apache.org/viewvc?rev=663762&view=rev
Log:
HADOOP-3483. Modified HOD to create a cluster directory if one does not exist
and to auto-deallocate a cluster while reallocating it, if it is already dead.
(Hemanth Yamijala via mukund)
Modified:
hadoop/core/trunk/src/contrib/hod/CHANGES.txt
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
hadoop/core/trunk/src/contrib/hod/testing/lib.py
Modified: hadoop/core/trunk/src/contrib/hod/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/CHANGES.txt?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/CHANGES.txt (original)
+++ hadoop/core/trunk/src/contrib/hod/CHANGES.txt Thu Jun 5 15:08:18 2008
@@ -5,6 +5,10 @@
INCOMPATIBLE CHANGES
+ HADOOP-3483. Modified HOD to create a cluster directory if one does not
+ exist and to auto-deallocate a cluster while reallocating it, if it is
+ already dead. (Hemanth Yamijala via mukund)
+
NEW FEATURES
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hadoop.py Thu Jun 5 15:08:18 2008
@@ -426,7 +426,14 @@
status = 15
return status
-
+
+ def is_cluster_deallocated(self, jobId):
+ """Returns True if the JobId that represents this cluster
+ is in the Completed or exiting state."""
+ jobInfo = self.__nodePool.getJobInfo(jobId)
+ state = jobInfo['job_state']
+ return ((state == 'C') or (state == 'E'))
+
def cleanup(self):
if self.__nodePool: self.__nodePool.finalize()
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/hod.py Thu Jun 5 15:08:18 2008
@@ -240,10 +240,12 @@
clusterDir = self.__norm_cluster_dir(args[1])
if not os.path.exists(clusterDir):
- errorFlag = True
- errorMsgs.append( \
- "Invalid cluster directory (--hod.clusterdir or -d) : " + \
- clusterDir + " : No such directory")
+ try:
+ os.makedirs(clusterDir)
+ except OSError, err:
+ errorFlag = True
+ errorMsgs.append("Could not create cluster directory. %s" \
+ % (str(err)))
elif not os.path.isdir(clusterDir):
errorFlag = True
errorMsgs.append( \
@@ -269,9 +271,20 @@
clusterList = self.__userState.read(CLUSTER_DATA_FILE)
if clusterDir in clusterList.keys():
- self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
- self.__opCode = 12
- return
+ self.__setup_cluster_state(clusterDir)
+ clusterInfo = self.__clusterState.read()
+ # Check if the job is not running. Only then can we safely
+ # allocate another cluster. Otherwise the user would need
+ # to deallocate and free up resources himself.
+ if clusterInfo.has_key('jobid') and \
+ self.__cluster.is_cluster_deallocated(clusterInfo['jobid']):
+ self.__log.warn("Found a dead cluster at cluster directory '%s'. Deallocating it to allocate a new one." % (clusterDir))
+ self.__remove_cluster(clusterDir)
+ self.__clusterState.clear()
+ else:
+ self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
+ self.__opCode = 12
+ return
self.__setup_cluster_logger(clusterDir)
if re.match('\d+-\d+', nodes):
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/Hod/nodePool.py Thu Jun 5 15:08:18 2008
@@ -101,7 +101,7 @@
def getServiceId(self):
raise NotImplementedError
- def getJobInfo(self):
+ def getJobInfo(self, jobId=None):
raise NotImplementedError
def deleteJob(self, jobId):
Modified: hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py (original)
+++ hadoop/core/trunk/src/contrib/hod/hodlib/NodePools/torque.py Thu Jun 5 15:08:18 2008
@@ -268,15 +268,18 @@
return id
- def getJobInfo(self):
+ def getJobInfo(self, jobId=None):
#torque error code when credentials fail, a temporary condition sometimes.
credFailureErrorCode = 171
credFailureRetries = 10
i = 0
self.__jobInfo = None
+ if jobId == None:
+ jobId = self.getServiceId()
+
while i < credFailureRetries:
- qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
+ qstatInfo, exitCode = self.__torque.qstat(jobId)
if exitCode == 0:
self.__jobInfo = qstatInfo
break
Modified: hadoop/core/trunk/src/contrib/hod/testing/lib.py
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hod/testing/lib.py?rev=663762&r1=663761&r2=663762&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hod/testing/lib.py (original)
+++ hadoop/core/trunk/src/contrib/hod/testing/lib.py Thu Jun 5 15:08:18 2008
@@ -98,7 +98,10 @@
def delete_job(self, jobid):
self.__operations['delete_job'] = [jobid]
-
+
+ def is_cluster_deallocated(self, dummy):
+ return False
+
def wasOperationPerformed(self, operation, args):
if self.__operations.has_key(operation):
actualArgs = self.__operations[operation]