Dear Michael,
I think that the port number is hardwired in CCP4i2 on line 439 of
$CCP4/share/ccp4i2/core/CCP4JobServer.py
transport = paramiko.Transport((sP.machine, 22))
Also, there are various calls to the "paramiko" library (which handles ssh)
that do not pass a port number and so default to 22.
This is all bad.
I have attached a replacement for this file. Could you try making a backup
of the above file and copying the attached file in its place.
I have put a constant for the port number in the line
PARAMIKO_PORT=22
near the top of the file. Then change this port number to whatever is suitable
for you.
If this works, then I will make the port number an option in the program.
If not, then I will try something else.
Best wishes,
Stuart McNicholas
On Mon, 8 Jun 2020 at 17:40, Michael Weyand <[email protected]>
wrote:
> Dear CCP4I2 experts,
>
> I'm trying to submit remote jobs via SSH within CCP4i2. Unfortunately,
> we use a non standard SSH port.
> So far, I'm not able to add any option within the CCP4I2 interface. For
> job submission, I need to do something like
>
> 'ssh -p XXXXX -Y ..... ccp4i-specific-command ...'.
>
> Is there any chance to enter this '-p' SSH option within CCP4i2? I have
> already tried to define an "ssh command" via "Preferences".
> But so far without any success.
>
> I'm also wondering why there is an ssh key file option. If a user already
> uses an ssh key for a client/server connection, why is it necessary to
> re-enter the key file?
> Any ssh between client and server should work from scratch ...
>
> Any hints are highly appreciated,
> Michael
>
> ########################################################################
>
> To unsubscribe from the CCP4BB list, click the following link:
> https://www.jiscmail.ac.uk/cgi-bin/webadmin?SUBED1=CCP4BB&A=1
>
> This message was issued to members of www.jiscmail.ac.uk/CCP4BB, a
> mailing list hosted by www.jiscmail.ac.uk, terms & conditions are
> available at https://www.jiscmail.ac.uk/policyandsecurity/
>
########################################################################
To unsubscribe from the CCP4BB list, click the following link:
https://www.jiscmail.ac.uk/cgi-bin/webadmin?SUBED1=CCP4BB&A=1
This message was issued to members of www.jiscmail.ac.uk/CCP4BB, a mailing list
hosted by www.jiscmail.ac.uk, terms & conditions are available at
https://www.jiscmail.ac.uk/policyandsecurity/
from __future__ import print_function
"""
CCP4JobServer.py: CCP4 GUI Project
Copyright (C) 2016 STFC
This library is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License
version 3, modified in accordance with the provisions of the
license to address the requirements of UK law.
You should have received a copy of the modified GNU Lesser General
Public License along with this library. If not, copies may be
downloaded from http://www.ccp4.ac.uk/ccp4license.php
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
"""
"""
Liz Potterton Apr 2016 - Separate 'remote' server code out from CCP4JobController
"""
import os
import re
from PyQt4 import QtCore
from core.CCP4ErrorHandling import *
PARAMIKO_PORT=22
class CServerParams:
    """Parameters describing one CCP4i2 job run on a remote or queued server.

    Attributes named in SAVELIST are persisted by getEtree()/setEtree().
    jobNumber, projectId, projectName and projectDirectory are filled in from
    the project database by setDbParams().  The read-only properties derive
    local and remote file paths from these attributes.
    """

    SAVELIST = ['machine', 'username', 'ccp4Dir', 'tempDir', 'mechanism', 'keyFilename', 'serverProcessId',
                'remotePath', 'validate', 'customCodeFile', 'queueOptionsFile', 'sge_root', 'serverGroup']

    def __init__(self, **kw):
        """Create a parameter set; jobId, password and any SAVELIST name may be passed as keywords.

        NOTE(review): serverProcessId is always reset to None, even when passed
        as a keyword -- confirm this is intended.
        """
        self.jobId = kw.get('jobId', None)
        self.password = kw.get('password', None)
        self.jobNumber = None
        self.projectId = None
        self.projectName = None
        self.projectDirectory = None
        # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
        # 3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
        # Negative values mark jobs waiting for the user to supply a password.
        self.pollFinish = 0
        self.pollReport = 0
        for item in CServerParams.SAVELIST:
            setattr(self, item, kw.get(item, None))
        self.serverProcessId = None

    def __str__(self):
        """Return one 'name: value' line per saved and job/project attribute."""
        keys = CServerParams.SAVELIST + ['jobNumber', 'projectName', 'projectDirectory', 'pollFinish', 'pollReport']
        return ''.join(item + ': ' + str(getattr(self, item)) + '\n' for item in keys)

    @property
    def remote_finished_tarball(self):
        """Remote path of the zipped finished-job database (remotePath + 'finished.ccp4db.zip')."""
        return self.remotePath + 'finished.ccp4db.zip'

    @property
    def remote_report_file(self):
        """Remote path of the job's program.xml, or None for mechanisms that share the local file system."""
        if self.mechanism in ['ssh_shared', 'qsub_local', 'qsub_shared']:
            return None
        if self.remotePath is None:
            return 'work/project/CCP4_JOBS/job_' + self.jobNumber + '/program.xml'
        return self.remotePath + 'work/project/CCP4_JOBS/job_' + self.jobNumber + '/program.xml'

    @property
    def local_tarball(self):
        """Local path of the zipped job-setup archive in the project's CCP4_TMP directory."""
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '_setup.ccp4db.zip')

    @property
    def local_report_file(self):
        """Local path of the job's program.xml."""
        return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'program.xml')

    @property
    def local_finished_tarball(self):
        """Local path where the zipped finished-job archive is placed."""
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '_finished.ccp4db.zip')

    @property
    def dbXml(self):
        """Path of the job's database XML: in CCP4_TMP for non-shared mechanisms, else in the job directory."""
        if self.mechanism not in ['ssh_shared', 'test', 'qsub_local', 'qsub_shared']:
            return os.path.join(self.projectDirectory, 'CCP4_TMP', 'DATABASE_job_' + self.jobNumber + '.db.xml')
        return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'DATABASE.db.xml')

    @property
    def finishedFile(self):
        """Path of the FINISHED marker file.

        Shared file systems use CCP4_JOBS/job_N/FINISHED locally; otherwise
        remotePath with its last character (presumably a trailing '/')
        replaced by '.FINISHED'.
        """
        if self.mechanism in ['ssh_shared', 'qsub_shared', 'qsub_local']:
            return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'FINISHED')
        return self.remotePath[0:-1] + '.FINISHED'

    def getEtree(self):
        """Return a 'serverParams' lxml element holding jobId and every non-None SAVELIST value as text."""
        from lxml import etree
        ele = etree.Element('serverParams')
        ele.set('jobId', str(self.jobId))
        for key in CServerParams.SAVELIST:
            if getattr(self, key, None) is not None:
                e = etree.Element(key)
                e.text = str(getattr(self, key))
                ele.append(e)
        return ele

    def setEtree(self, ele):
        """Restore attributes saved by getEtree() from element *ele*, then refresh DB-derived values.

        Every restored value is a string.  An element with no text (saved from
        an empty string) is restored as '' rather than the literal 'None'.
        """
        self.jobId = ele.get('jobId')
        for e in ele:
            # str(None) would turn an empty value into the string 'None'
            setattr(self, str(e.tag), '' if e.text is None else str(e.text))
        self.setDbParams()

    def setDbParams(self):
        """Fill jobNumber, projectId, projectName and projectDirectory from the project database for self.jobId."""
        from core import CCP4Modules
        jobInfo = CCP4Modules.PROJECTSMANAGER().db().getJobInfo(self.jobId, ['jobnumber', 'projectid', 'projectname'])
        self.jobNumber = jobInfo['jobnumber']
        self.projectId = jobInfo['projectid']
        self.projectName = jobInfo['projectname']
        self.projectDirectory = CCP4Modules.PROJECTSMANAGER().db().getProjectInfo(self.projectId, 'projectdirectory')
class CJobServer:
    """Submit, monitor and clean up CCP4i2 jobs run on remote or queued servers.

    One CServerParams per tracked job is kept in self._serverParams, keyed by
    jobId.  SSH and SFTP connections are opened with paramiko on PARAMIKO_PORT.
    Progress and failures are reported through Qt signals via self.emit().

    NOTE(review): self.emit and self.db are used by this class but are not
    defined in the part shown here -- presumably supplied by the application
    subclass (handleFinishedServerJob says it "should be reimplemented in
    application").
    """
    # CServerParams attributes stored in the database server-job record.
    # restoreFromDb() maps them positionally onto each row returned by
    # getServerJobs(), so this order must match that row layout.
    DB_KEYS = ['jobId', 'machine', 'username', 'mechanism', 'remotePath', 'customCodeFile', 'validate',
    'keyFilename', 'serverProcessId', 'serverGroup']
    # Codes used as CException(self.__class__, code, ...).
    # 301-304 are produced by openSSHConnection() as 300 + its failCode.
    ERROR_CODES = {301 : {'description' : 'Failed opening SSH connection'},
    302 : {'description' : 'Failed opening SSH connection'},
    303 : {'description' : 'Failed opening SSH connection'},
    304 : {'description' : 'Failed opening SSH connection'},
    320 : {'description' : 'Failed submitting job'},
    330 : {'description' : 'Failed to open connection to copy files'},
    331 : {'description' : 'Failed copying file to server'},
    332 : {'description' : 'Failed copying file from server'},
    340 : {'description' : 'Failed killing remote job - can not recover job id'},
    350 : {'description' : 'Failed running job startup script on server'},
    360 : {'description' : 'Failed to kill job - job id unknown'},
    361 : {'description' : 'Failed to kill job - possible communication fail'},
    362 : {'description' : 'Failed to find job status'},
    363 : {'description' : 'Failed to recover remote process id'},
    364 : {'description' : 'Failed to recover remote process status'},
    365 : {'description' : 'Failed killing remote process - no information on the remote process'}}
def __init__(self):
    """Start with no tracked jobs and send paramiko's log to <dot directory>/status/paramiko.log."""
    from core import CCP4Utils
    import paramiko
    self._diagnostic = False          # True enables verbose debugging prints
    self._serverParams = {}           # jobId -> CServerParams
    # Worker threads started for SSH/SFTP work (presumably kept referenced while they run)
    self.runInSSHThreads = []
    logPath = os.path.join(CCP4Utils.getDotDirectory(), 'status', 'paramiko.log')
    paramiko.util.log_to_file(logPath)
def restoreRunningJobs(self):
    """Reload the server jobs still outstanding from the database record.

    The XML params-file route (loadParams) is no longer used here.
    """
    return self.restoreFromDb() and None
def getJobsToPollFinish(self, pollFinish=None):
    """Return the jobIds whose pollFinish flag is one of the values in *pollFinish*.

    :param pollFinish: iterable of pollFinish flag values (see CServerParams);
        None or empty matches nothing.
    :returns: list of jobIds, in dictionary order.
    """
    wanted = () if pollFinish is None else pollFinish
    # Iterate over a snapshot in case the dict is modified while we scan it
    return [jobId for jobId, sP in list(self._serverParams.items()) if sP.pollFinish in wanted]
def getJobsToPollReport(self):
    """Return the jobIds whose report file should be polled (pollReport > 0)."""
    snapshot = list(self._serverParams.items())
    return [jobId for jobId, params in snapshot if params.pollReport > 0]
def Exit(self):
    """On shutdown, save outstanding server jobs, or delete a stale params file if there are none."""
    if self._serverParams:
        self.saveParams()
        return
    paramsFile = self.defaultParamsFileName()
    if os.path.exists(paramsFile):
        os.remove(paramsFile)
def serverParams(self, jobId):
    """Return the CServerParams for *jobId*, or None if the job is not tracked.

    TypeError is tolerated too because some callers pass a list as jobId
    (see _testRemoteFiles), which cannot be used as a dict key.
    """
    try:
        return self._serverParams[jobId]
    except (KeyError, TypeError):
        return None
def saveParams(self, fileName=None):
    """Write the parameters of every tracked server job to a CCP4i2 XML data file.

    :param fileName: output path; defaults to defaultParamsFileName().
    """
    from lxml import etree
    from core import CCP4File
    target = self.defaultParamsFileName() if fileName is None else fileName
    fileObj = CCP4File.CI2XmlDataFile(target)
    fileObj.header.setCurrent()
    fileObj.header.function = 'JOBSERVERSTATUS'
    body = etree.Element('serverParamsList')
    for params in list(self._serverParams.values()):
        body.append(params.getEtree())
    fileObj.saveFile(body)
def defaultParamsFileName(self):
    """Return <dot directory>/status/jobServer.params.xml, the default job-server status file."""
    from core import CCP4Utils
    statusDir = os.path.join(CCP4Utils.getDotDirectory(), 'status')
    return os.path.join(statusDir, 'jobServer.params.xml')
def restoreFromDb(self):
    """Rebuild self._serverParams from the server-job rows stored in the database.

    Each row holds the DB_KEYS values in order; row[0] (jobId) is the dict key.
    """
    from core import CCP4Modules
    rows = CCP4Modules.PROJECTSMANAGER().db().getServerJobs()
    for row in rows:
        sP = CServerParams()
        self._serverParams[row[0]] = sP
        for index, key in enumerate(self.DB_KEYS):
            setattr(sP, key, row[index])
        sP.setDbParams()
        self.setPollStatus(sP)
def setPollStatus(self, sP):
    """Choose how a restored job is polled, from its mechanism and validation method.

    qsub -> pollFinish 2; ssh-like mechanisms -> 1, or -1 when a password must
    first be supplied; custom -> 4.  Any job with a remote report file also
    gets pollReport = 1.
    """
    mechanism = sP.mechanism
    if mechanism == 'qsub':
        sP.pollFinish = 2
    elif mechanism in ('qsub_remote', 'ssh', 'ssh_shared', 'slurm_remote'):
        needsPassword = sP.validate in ('password', 'pass_key_filename')
        sP.pollFinish = -1 if needsPassword else 1
    elif mechanism == 'custom':
        sP.pollFinish = 4
    if sP.remote_report_file is not None:
        sP.pollReport = 1
def loadParams(self, fileName=None):
    """Restore tracked server jobs from an XML params file written by saveParams().

    :param fileName: input path; defaults to defaultParamsFileName().  A
        missing file is silently ignored.
    """
    from core import CCP4File
    if fileName is None:
        fileName = self.defaultParamsFileName()
    if not os.path.exists(fileName):
        return
    fileObj = CCP4File.CI2XmlDataFile(fileName)
    body = fileObj.getBodyEtree()
    # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
    # 3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
    for ele in body.iterchildren():
        jobId = ele.get('jobId')
        # An element without children carries no saved parameters
        if len(ele) > 0:
            sP = self._serverParams[jobId] = CServerParams()
            # setEtree() already calls setDbParams(); a second database lookup is unnecessary
            sP.setEtree(ele)
            self.setPollStatus(sP)
def createServerParams(self, jobId, params):
    """Register *params* for *jobId*, fill in job/project details from the database and create its DB record.

    The project name is stored after CCP4Utils.safeOneWord().
    """
    from core import CCP4Utils
    sP = params
    self._serverParams[jobId] = sP
    sP.jobId = jobId
    jobInfo = self.db.getJobInfo(jobId=jobId, mode=['jobnumber', 'projectid'])
    sP.jobNumber = jobInfo['jobnumber']
    sP.projectId = jobInfo['projectid']
    projectInfo = self.db.getProjectInfo(projectId=sP.projectId)
    sP.projectName = CCP4Utils.safeOneWord(projectInfo['projectname'])
    sP.projectDirectory = projectInfo['projectdirectory']
    self.db.createServerJob(jobId, sP)
def setServerParam(self, jobId, key, value):
    """Set attribute *key* on the job's CServerParams, mirroring DB_KEYS values into the database.

    Unknown jobIds are ignored.  Keys naming the read-only CServerParams
    properties are ignored too, because assigning to them fails on Python 3.
    """
    readOnly = ("remote_finished_tarball", "remote_report_file", "local_tarball",
                "local_report_file", "local_finished_tarball", "dbXml", "finishedFile")
    if jobId not in self._serverParams:
        return
    if key in readOnly:
        return
    setattr(self._serverParams[jobId], key, value)
    if key in self.DB_KEYS:
        self.db.updateServerJob(jobId, key, value)
def getServerParam(self, jobId, key):
    """Return attribute *key* of the job's CServerParams, or None if the job or value is unknown.

    A missing 'ccp4Dir' is looked up in the server setup data (the SERVERSETUP
    container, loaded once and cached as self.setupContainer) by matching the
    job's serverGroup; a value found there is also stored on the job params.
    """
    from core import CCP4Modules
    if jobId not in self._serverParams:
        return None
    sP = self._serverParams[jobId]
    found = getattr(sP, key, None)
    if found is not None or key != 'ccp4Dir':
        return found
    # The setup comes from .CCP4I2/configs/serverSetup.def.xml, with each
    # server group held in the container as SERVERGROUPn
    serverGroup = getattr(sP, 'serverGroup', None)
    if serverGroup is None:
        return None
    if getattr(self, 'setupContainer', None) is None:
        self.setupContainer = CCP4Modules.SERVERSETUP()
        self.setupContainer.load()
    container = self.setupContainer
    for dataObjName in container.dataOrder():
        group = container.get(dataObjName)
        if group.name != serverGroup:
            continue
        value = group.get(key)
        setattr(sP, key, value)
        return value
    return None
def deleteServerParams(self, jobId):
    """Forget a server job: delete its database record and drop its in-memory parameters.

    A database error is printed and does not prevent the in-memory removal.
    """
    try:
        self.db.deleteServerJob(jobId)
    except Exception as e:
        print('ERROR removing server job record from database\n', e)
    self._serverParams.pop(jobId, None)
def deleteServerParam(self, jobId, key):
    """Remove attribute *key* from the job's CServerParams, if both exist.

    Best effort: an unknown jobId (including an unhashable one such as a
    list) or a missing attribute is ignored.
    """
    try:
        delattr(self._serverParams[jobId], key)
    except (KeyError, TypeError, AttributeError):
        pass
def patchRemotePasswords(self, jobId, password):
    """Give *password* to job *jobId* and to every job waiting for a password on the same machine and user.

    Each patched job gets pollFinish = pollReport = 1 so polling resumes.

    :returns: number of jobs patched; 0 if *jobId* is not a tracked job.
    """
    sP = self.serverParams(jobId)
    if sP is None:
        # No reference job to take machine/username from
        return 0
    count = 0
    for jI in list(self._serverParams.keys()):
        other = self._serverParams[jI]
        # A negative pollFinish marks a job waiting for a password (see setPollStatus)
        if jI == jobId or (other.pollFinish < 0 and
                           other.machine == sP.machine and other.username == sP.username):
            count += 1
            other.password = password
            other.pollFinish = 1
            other.pollReport = 1
    return count
def checkForRemotePasswords(self, projectId):
    """List the jobs in project *projectId* that are waiting for the user to supply a password.

    :returns: list of dicts with keys 'jobId', 'machine' and 'username'.
    """
    waiting = []
    for jobId in self.getJobsToPollFinish([-1, -2, -3, -4]):
        params = self.serverParams(jobId)
        if params.pollFinish >= 0 or params.projectId != projectId:
            continue
        waiting.append({'jobId': jobId, 'machine': params.machine, 'username': params.username})
    return waiting
def getPKey(self, jobId):
    """Load the RSA private key named by the job's keyFilename.

    '$HOME' in the filename is replaced by the user's home directory.  When
    validate is 'pass_key_filename', the job's password decrypts the key.

    :returns: a paramiko.RSAKey.
    """
    import paramiko
    from core import CCP4Utils
    sP = self.serverParams(jobId)
    # Plain replacement rather than re.sub: re.sub would read backslashes in
    # the home directory (e.g. C:\Users\...) as escape sequences.
    keyFilename = sP.keyFilename.replace('$HOME', CCP4Utils.getHOME())
    if sP.validate == 'pass_key_filename':
        return paramiko.RSAKey.from_private_key_file(keyFilename, password=sP.password)
    return paramiko.RSAKey.from_private_key_file(keyFilename)
def pollQsubStat(self, mode='finished'):
    """Run 'qstat -xml' locally and act on the result.

    mode 'finished': every job with pollFinish == 2 whose serverProcessId is
    no longer listed by qstat is passed to handleFinishedServerJob().
    mode 'status': return parseQsubStat(output, 'status').
    """
    import subprocess
    proc = subprocess.Popen(["qstat", "-xml"], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
    qstatOut, qstatErr = proc.communicate()
    if self._diagnostic:
        print('pollQsubStat', qstatOut, qstatErr)
    if mode == 'status':
        return self.parseQsubStat(qstatOut, mode)
    if mode == 'finished':
        queued = self.parseQsubStat(qstatOut)
        for jobId in self.getJobsToPollFinish([2]):
            if self.serverParams(jobId).serverProcessId not in queued:
                self.handleFinishedServerJob(jobId)
def handleFinishedServerJob(self, jobId):
    """Stop polling a finished job.

    This default only clears pollFinish and pollReport.  The application
    should reimplement it to retrieve and use the job's results, and to
    delete the job from the server params.
    """
    for flag in ('pollFinish', 'pollReport'):
        self.setServerParam(jobId, flag, 0)
def parseQsubStat(self, returnString, mode='finished'):
    """Parse the XML output of 'qstat -xml'.

    mode 'finished': return {job number (str): state (str)} for every
    job_list entry under job_info or queue_info.
    mode 'status': return an empty dict (not implemented).
    Any other mode returns None.
    """
    from lxml import etree
    tree = etree.fromstring(returnString, etree.XMLParser())
    if mode == 'status':
        return {}
    if mode != 'finished':
        return None
    return {str(jobl.xpath("JB_job_number")[0].text): str(jobl.attrib["state"])
            for job in tree.xpath("job_info|queue_info")
            for jobl in job.xpath("job_list")}
def pidFile(self, jobId, local=False):
    """Return the path of the job's pidfile.txt.

    Returns the local copy in CCP4_JOBS/job_N when *local* is true or the
    mechanism is 'ssh_shared'.  Returns the copy under remotePath for 'ssh'.
    Returns None for any other mechanism.
    """
    sP = self.serverParams(jobId)
    useLocal = local or sP.mechanism == 'ssh_shared'
    if useLocal:
        jobDir = os.path.join(sP.projectDirectory, 'CCP4_JOBS', 'job_' + sP.jobNumber)
        return os.path.join(jobDir, 'pidfile.txt')
    if sP.mechanism == 'ssh':
        return sP.remotePath + 'pidfile.txt'
    return None
def transportFiles(self, jobId, copyList=None, mode='put', finishHandler=None, failSignal=True, diagnostic=True):
    """Copy files to or from the job's server over SFTP in a background UtilityThread.

    :param copyList: list of [localName, remoteName] pairs (None means none).
    :param mode: 'put' uploads; anything else downloads.
    :param finishHandler: called with jobId when the thread finishes;
        defaults to _transportFilesFinished.
    :param failSignal: passed to _transportFiles (emit failure signals).
    :param diagnostic: passed to _transportFiles (print each copy).
    :returns: False if a transfer for this job is already running, else True.
    """
    import functools
    import UtilityThread
    if copyList is None:
        copyList = []
    sP = self.serverParams(jobId)
    # Only one transfer thread per job at a time
    if getattr(sP, 'sshThreadTransport', None) is not None:
        return False
    if finishHandler is None:
        finishHandler = self._transportFilesFinished
    thread = UtilityThread.UtilityThread(functools.partial(self._transportFiles, jobId, copyList, mode, failSignal, diagnostic))
    sP.sshThreadTransport = thread
    thread.finished.connect(functools.partial(finishHandler, jobId))
    self.runInSSHThreads.append(thread)
    thread.start()
    return True
def _transportFiles(self, jobId, copyList=None, mode='put', failSignal=True, diagnostic=True):
    """Worker for transportFiles(): open an SFTP session and copy each file pair.

    :param copyList: list of (localName, remoteName) pairs (None means none).
    :param mode: 'put' copies local -> remote; anything else remote -> local.
    :param failSignal: emit 'failedOpenConnection' on connection or copy errors.
    :param diagnostic: print a line for every copy attempted.
    :raises CException: code 330 if the SFTP connection cannot be set up.
        Per-file errors (331 put / 332 get) are collected and emitted, not raised.
    """
    import paramiko
    if copyList is None:
        copyList = []
    sP = self.serverParams(jobId)
    transport = None
    try:
        transport = paramiko.Transport((sP.machine, PARAMIKO_PORT))
        useKey = (sP.validate in ['key_filename', 'pass_key_filename'] and
                  sP.keyFilename is not None and len(sP.keyFilename) > 0)
        if useKey:
            transport.connect(username=sP.username, password=sP.password, pkey=self.getPKey(jobId))
        else:
            transport.connect(username=sP.username, password=sP.password)
        sftp = paramiko.SFTPClient.from_transport(transport)
    except Exception as e:
        # Close a half-open transport so its socket and thread are not leaked
        if transport is not None:
            transport.close()
        print('ERROR setting up paramiko FTP file transfer')
        print(e)
        err = CException(self.__class__, 330, 'Machine:' + str(self.getServerParam(jobId, 'machine')) + '\n' + str(e))
        if failSignal:
            self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
        raise err
    err = CException()
    try:
        for localName, remoteName in copyList:
            try:
                if mode == 'put':
                    sftp.put(localName, remoteName)
                    if diagnostic:
                        print("Copied", localName, 'to', remoteName)
                else:
                    sftp.get(remoteName, localName)
                    if diagnostic:
                        print("Copied", remoteName, 'to', localName)
            except Exception as e:
                if mode == 'put':
                    if diagnostic:
                        print('ERROR copying files', str(localName), 'to', str(remoteName))
                    err.append(self.__class__, 331, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
                else:
                    if diagnostic:
                        print('ERROR copying files', str(localName), 'from', str(remoteName))
                    err.append(self.__class__, 332, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
    finally:
        # Always release the SFTP channel and the transport
        sftp.close()
        transport.close()
    # Copy failures are reported through the signal rather than raised
    if len(err) > 0 and failSignal:
        self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
def _transportFilesFinished(self,jobId):
#print '_transportFilesFinished',jobId
self.deleteServerParam(jobId,'sshThreadTransport')
def openSSHConnection(self, jobId, machine, username, password, keyFilename=None, timeout=None, maxTries=2, emitFail=True):
    """Open an SSH connection with paramiko on PARAMIKO_PORT, retrying up to *maxTries* times.

    Unknown host keys are accepted automatically (AutoAddPolicy).  An
    authentication failure, or an unsupported authentication type, stops
    further retries.  Progress is reported through 'testMessage' signals.

    :param keyFilename: private key file; None, '' or the string 'None' means no key.
    :returns: the connected paramiko.SSHClient.
    :raises CException: code 300 + failCode (1 authentication failed,
        2 authentication type not supported, 3 socket/address error,
        4 other error) when no connection could be made; also emitted as
        'failedOpenConnection' if *emitFail*.
    """
    import paramiko
    import socket
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    connected = False
    ntries = 0
    failCode = 0
    mess = ''
    lastError = None
    # The string 'None' is treated like no key (presumably a stringified None from saved params)
    useKey = keyFilename is not None and keyFilename != 'None' and len(keyFilename) > 0
    self.emit(QtCore.SIGNAL('testMessage'), 'Connecting to ' + machine + ' with timeout ' + str(timeout))
    while ntries < maxTries and not connected:
        failCode = 0
        try:
            if self._diagnostic:
                print('TO client.connect', ntries)
            if useKey:
                client.connect(machine, port=PARAMIKO_PORT, username=username, password=password, key_filename=keyFilename, timeout=timeout)
            else:
                client.connect(machine, port=PARAMIKO_PORT, username=username, password=password, timeout=timeout)
            connected = True
        # BadAuthenticationType subclasses AuthenticationException, so it must be caught first
        except paramiko.ssh_exception.BadAuthenticationType as e:
            failCode = 2
            ntries = maxTries  # retrying cannot help
            mess = 'Authentication type not supported'
            lastError = e
        except paramiko.ssh_exception.AuthenticationException as e:
            failCode = 1
            ntries = maxTries  # wrong credentials: do not retry
            mess = 'Authentication failed'
            lastError = e
        except socket.gaierror as e:
            failCode = 3
            mess = 'Socket error'
            lastError = e
        except Exception as e:
            failCode = 4
            mess = str(e)
            lastError = e
            print(e)
        if not connected:
            self.emit(QtCore.SIGNAL('testMessage'), ' Try: ' + str(ntries) + ' failed: ' + str(mess))
        else:
            self.emit(QtCore.SIGNAL('testMessage'), ' Try: ' + str(ntries) + ' succeeded')
        ntries = ntries + 1
    if connected:
        return client
    # Release the unconnected client before reporting the failure
    client.close()
    # sys.exc_info() is empty once the exception has been handled, so use the saved one
    excType = type(lastError) if lastError is not None else None
    print("Some error submitting job")
    print("Error running job using ssh" + str(excType) + "\n" + str(lastError))
    err = CException(self.__class__, 300 + failCode, 'Machine:' + machine + '\n' + str(excType) + "\n" + str(lastError) + '\n')
    if emitFail:
        self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
    raise err
def testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=None, timeout=None, maxTries=2):
    """Check in a background thread whether each file in *remoteFileList* exists on *machine*.

    The work is done by _testRemoteFiles, which reports through the
    'testRemoteFiles' signal.  Does nothing if a check for this job is
    already in progress.
    """
    import UtilityThread
    import functools
    if remoteFileList is None:
        remoteFileList = []
    if self.getServerParam(jobId, 'sshThreadRemoteFiles') is not None:
        return
    sP = self.serverParams(jobId)
    thread = UtilityThread.UtilityThread(functools.partial(self._testRemoteFiles, jobId, machine, username, password,
                                                           keyFilename, remoteFileList, timeout, maxTries))
    sP.sshThreadRemoteFiles = thread
    thread.finished.connect(functools.partial(self._sendSSHFinished, jobId, None, 'sshThreadRemoteFiles'))
    self.runInSSHThreads.append(thread)
    thread.start()
def _testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=[], timeout=None, maxTries=2):
#print 'CJobServer._testRemoteFiles',jobId,machine,username,remoteFileList
ret = []
if isinstance(jobId, list):
jI = jobId[0]
else:
jI = jobId
try:
client = self.openSSHConnection(jobId=jI, machine=machine, username=username, password=password,
keyFilename=keyFilename, timeout=timeout, maxTries=maxTries)
except Exception as e:
print('testRemoteFiles fail\n',e)
for item in remoteFileList:
ret.append(False)
return ret
for remoteFile in remoteFileList:
stdin, stdout, stderr = client.exec_command('[ -e ' + remoteFile + ' ] && echo "Found" || echo "Lost"')
out = ""
err = ""
for line in stdout:
out += line
for line in stderr:
err += line
if self._diagnostic:
print('testRemoteFiles', remoteFile, out, '*', err)
if out.count('Found') > 0:
self.emit(QtCore.SIGNAL('testMessage'), ' File: ' + str(remoteFile) + ' exists')
else:
self.emit(QtCore.SIGNAL('testMessage'), ' File: ' + str(remoteFile) + ' not found')
ret.append(out.count('Found') > 0)
client.close()
self.deleteServerParam(jobId, 'sshThreadRemoteFiles')
self.emit(QtCore.SIGNAL('testRemoteFiles'), jobId, ret)
def runOnServer(self, jobId=None):
    '''Placeholder for running a job by ssh or qsub, with or without a shared file system.

    The application must reimplement this; the default does nothing.
    '''
    return None
def runInQsub(self, jobId, remoteSh, optionsFile=None, mechanism='qsub_local'):
    """Submit *remoteSh* to an SGE queue with qsub, either locally or on the server via ssh.

    qsub stdout/stderr go to <prefix>qsub_stdout.txt and <prefix>qsub_stderr.txt
    next to *remoteSh*, where <prefix> is its file name minus the last 9
    characters.  *optionsFile*, if given, is passed as '@ optionsFile'.
    For 'qsub_local', qsub runs here and the queue id is parsed from its
    output.  Otherwise the command is sent over ssh (prefixed with
    SGE_ROOT=... when sge_root is set) and handleRemoteQsubStart() parses the id.

    :raises CException: code 320 if qsub cannot be run locally.
    """
    import subprocess
    import sys
    path, name = os.path.split(remoteSh)
    name = name[0:-9]
    stdout = os.path.join(path, name + 'qsub_stdout.txt')
    stderr = os.path.join(path, name + 'qsub_stderr.txt')
    sP = self.serverParams(jobId)
    comList = ["qsub", '-o', stdout, '-e', stderr, '-S', '/bin/sh', remoteSh]
    if optionsFile is not None:
        comList[1:1] = ['@', optionsFile]
    if mechanism == 'qsub_local':
        try:
            proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
            out, err = proc.communicate()
            if self._diagnostic:
                print('qsub start', out, err)
        except Exception:
            # sys is imported locally: it is not among this module's visible imports
            exc_type, exc_value = sys.exc_info()[:2]
            print("Error running job using remote queue command 'qsub'\n\n" + str(exc_type) + "\n\n" + str(exc_value))
            raise CException(self.__class__, 320)
        try:
            # The queue id is the last word before the first '(' of qsub's reply
            self.setServerParam(jobId, 'serverProcessId', out[:out.find('(')].rstrip().split()[-1])
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)
        except Exception:
            # Unparseable reply: the job is left unpolled, as before
            pass
    else:
        # Use ssh to submit qsub on a different machine
        comLine = ' '.join(comList)
        if sP.sge_root is not None:
            comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comLine
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStart)
        self.setServerParam(jobId, 'pollFinish', 1)
        self.setServerParam(jobId, 'pollReport', 1)
def runInSlurm(self, jobId, remoteSh, optionsFile=None, mechanism='slurm_remote'):
    """Submit *remoteSh* to Slurm with sbatch, either locally ('slurm_local') or on the server via ssh.

    sbatch stdout/stderr go to <prefix>slurm_stdout.txt and
    <prefix>slurm_stderr.txt next to *remoteSh*, where <prefix> is its file
    name minus the last 9 characters.  The Slurm job id (first run of digits
    in sbatch's output, or -999) becomes serverProcessId; for remote
    submission this is done by handleRemoteSbatchStart().

    :raises CException: code 320 if sbatch cannot be run locally.
    """
    import subprocess
    import sys
    path, name = os.path.split(remoteSh)
    name = name[0:-9]
    stdout = os.path.join(path, name + 'slurm_stdout.txt')
    stderr = os.path.join(path, name + 'slurm_stderr.txt')
    sP = self.serverParams(jobId)
    comList = ["sbatch", '-o', stdout, '-e', stderr, remoteSh]
    if optionsFile is not None:
        # NOTE(review): '@ file' is the SGE qsub options syntax -- confirm sbatch accepts it
        comList[1:1] = ['@', optionsFile]
    if mechanism == 'slurm_local':
        try:
            proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
            out, err = proc.communicate()
            if self._diagnostic:
                print('slurm start', out, err)
        except Exception:
            # sys is imported locally: it is not among this module's visible imports
            exc_type, exc_value = sys.exc_info()[:2]
            print("Error running job using command 'sbatch'\n\n" + str(exc_type) + "\n\n" + str(exc_value))
            raise CException(self.__class__, 320)
        try:
            match = re.search(r'[0-9]+', out)
            pid = match.group(0) if match else -999
            self.setServerParam(jobId, 'serverProcessId', pid)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)
        except Exception:
            # Best effort, as before: a failure here leaves the job unpolled
            pass
    else:
        # Use ssh to submit to Slurm on a different machine
        comLine = ' '.join(comList)
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSbatchStart)
        self.setServerParam(jobId, 'pollFinish', 1)
        self.setServerParam(jobId, 'pollReport', 1)
def handleRemoteQsubStart(self, jobId, out, err):
    """Record the queue id from the output of a qsub run over ssh.

    The id is taken as the last word before the first '(' in *out*.
    """
    beforeParen = out[:out.find('(')]
    queueId = beforeParen.rstrip().split()[-1]
    self.setServerParam(jobId, 'serverProcessId', queueId)
    if self._diagnostic:
        print('handleRemoteQsubStart', self.serverParams(jobId).serverProcessId)
def handleRemoteSbatchStart(self, jobId, out, err):
    """Record the Slurm job id: the first run of digits in sbatch's output, or -999 if there is none."""
    match = re.search(r'[0-9]+', out)
    pid = match.group(0) if match else -999
    # TODO: add some error handling
    self.setServerParam(jobId, 'serverProcessId', pid)
    if self._diagnostic:
        print('handleRemoteSbatchStart', self.serverParams(jobId).serverProcessId)
def sendSSHCommand(self, jobId, comLine, finishHandler=None, retHandler=None, emitFail=True):
    """Run shell command *comLine* on the job's server over SSH in a background UtilityThread.

    :param finishHandler: optional callable(jobId), connected to the thread's finished signal.
    :param retHandler: optional callable(jobId, out, err), given the command output.
    :param emitFail: emit failure signals for connection/command errors.
    """
    import functools
    import UtilityThread
    if self._diagnostic:
        print('Remote server command:', comLine)
    sP = self.serverParams(jobId)
    thread = UtilityThread.UtilityThread(functools.partial(self._sendSSHCommand, jobId, comLine, emitFail, retHandler))
    sP.sshThread = thread
    if finishHandler is not None:
        thread.finished.connect(functools.partial(finishHandler, jobId))
    self.runInSSHThreads.append(thread)
    thread.start()
def _sendSSHCommand(self, jobId, comLine, emitFail=True, retHandler=None):
from core import CCP4Utils
sP = self.serverParams(jobId)
if sP.keyFilename is not None and len(sP.keyFilename) > 0:
if sP.keyFilename.count('HOME'):
keyFilename = re.sub(r'\$HOME', CCP4Utils.getHOME(), sP.keyFilename)
else:
keyFilename = sP.keyFilename
else:
keyFilename = None
sP.sshClient = self.openSSHConnection(jobId, sP.machine, sP.username, sP.password, keyFilename, emitFail=emitFail)
stdin, stdout, stderr = sP.sshClient.exec_command(str(comLine))
out = ""
err = ""
for line in stdout:
out += line
for line in stderr:
err += line
if self._diagnostic:
print('Remote start stdout:', out)
print('Remote start stderr:', err)
if len(err) > 0:
exc = CException(self.__class__, 350, err)
if emitFail:
self.emit(QtCore.SIGNAL('failedRemoteCommand'), jobId, exc)
if retHandler:
try:
retHandler(jobId, out, err)
except Exception as e:
print('ERROR handling return output from SSH command')
print(e)
return out, err
def _sendSSHFinished(self, jobId, clientParam='sshClient', threadParam='sshThread'):
#print "Finished remote job",jobId,threadParam
#print '_runInSSHFinished',self.serverParams[jobId].mechanism
sP = self.serverParams(jobId)
if sP is None:
return
if clientParam is not None and hasattr(sP, clientParam):
getattr(sP, clientParam).close()
#getattr(sP, clientParam).__del__() - daft but how do we be show it is zapped?
if hasattr(sP, threadParam):
if getattr(sP, threadParam).retval is not None:
stdout, stderr = getattr(sP,threadParam).retval
if self._diagnostic:
print('Remote finish stdout:', stdout)
print('Remote finish stderr:', stderr)
else:
print('Failed analysing return from remote command')
if hasattr(sP, threadParam):
self.runInSSHThreads.remove(getattr(sP, threadParam))
#getattr(sP,threadParam).__del__() -- this is nonsense for a thread
def runInSSH(self, jobId, remoteSh):
    """Start *remoteSh* on the server with 'sh' over SSH, then mark the job for report and finish polling.

    handleRunInSSHDone() picks the PID out of the script's output, and
    _sendSSHFinished() cleans up when the thread ends.
    """
    self.sendSSHCommand(jobId, 'sh ' + remoteSh, finishHandler=self._sendSSHFinished, retHandler=self.handleRunInSSHDone)
    for flag in ('pollReport', 'pollFinish'):
        self.setServerParam(jobId, flag, 1)
def _runInSSHFinished(self, jobId):
self._sendSSHFinished(jobId)
self.handleFinishedServerJob(jobId)
def handleRunInSSHDone(self, jobId, out, err):
    """Record the remote process id from the first 'PID=<n>' line of the start script's output."""
    if self._diagnostic:
        print('handleRunInSSHDone', jobId, 'out', out, 'err', err)
    pidLine = next((line for line in out.split('\n') if line.startswith('PID=')), None)
    if pidLine is not None:
        self.setServerParam(jobId, 'serverProcessId', int(pidLine[4:]))
def checkRemoteStatus(self, jobId):
    """Start a status check for a remote job, chosen by its mechanism.

    :returns: True if a check was started; None for unknown jobs or unsupported mechanisms.
    """
    sP = self.serverParams(jobId)
    if sP is None:
        return None
    checkers = {
        'qsub_remote': self.checkRemoteQsubStatus,
        'qsub_shared': self.checkRemoteQsubStatus,
        'slurm_remote': self.checkRemoteSlurmStatus,
        'qsub_local': self.checkLocalQsubStatus,
        'ssh': self.checkSSHJobStatus,
        'ssh_shared': self.checkSSHJobStatus,
    }
    checker = checkers.get(sP.mechanism)
    if checker is None:
        return None
    checker(jobId)
    return True
def checkSSHJobStatus(self, jobId):
    """Report on an ssh job: list the user's processes, or first fetch the pid file if the pid is unknown."""
    sP = self.serverParams(jobId)
    if sP.serverProcessId is not None:
        self.sendSSHCommand(jobId, comLine='ps -fu ' + sP.username, retHandler=self.handleRemoteSSHStatus, emitFail=False)
    else:
        self.getPidFileContent(jobId)
def handleRemoteSSHStatus(self, jobId, out, err):
    """Send the remote 'ps' listing to the GUI as a 'remoteJobMessage' signal."""
    if self._diagnostic:
        print('handleRemoteSSHStatus', jobId, out, err)
    sP = self.serverParams(jobId)
    title = 'The process id for this job: ' + str(sP.serverProcessId)
    self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, title, out, sP.machine)
def checkLocalQsubStatus(self, jobId):
    """Run 'qstat' locally and pass its output to handleRemoteQsubStatus().

    :raises CException: code 362 if qstat cannot be run.
    """
    # Imported here: neither module is among this file's visible top-level imports
    import subprocess
    import sys
    try:
        proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
        out, err = proc.communicate()
        if self._diagnostic:
            print('checkLocalQsubStatus', out, err)
    except Exception:
        exc_type, exc_value = sys.exc_info()[:2]
        print("Error checking qsub status\n" + str(exc_type) + "\n" + str(exc_value))
        raise CException(self.__class__, 362)
    self.handleRemoteQsubStatus(jobId, out, err)
def checkRemoteQsubStatus(self, jobId):
    """Run 'qstat' on the job's server via ssh; handleRemoteQsubStatus() reports the output.

    The command is prefixed with SGE_ROOT=<sge_root> when sge_root is set.

    :returns: False if the job is unknown, otherwise None.
    """
    sP = self.serverParams(jobId)
    if sP is None:
        return False
    # Always bound, so a job without sge_root no longer raises UnboundLocalError
    comLine = 'qstat'
    if sP.sge_root is not None:
        comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comLine
    self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStatus, emitFail=False)
def checkRemoteSlurmStatus(self, jobId):
sP = self.serverParams(jobId)
if sP is None:
return False
comline = 'squeue -h -o "%A" --job ' + str(jobId)
self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSlurmStatus, emitFail=False)
def handleRemoteQsubStatus(self, jobId, out, err):
    """Report `qstat` output for the job's machine as a 'remoteJobMessage'.

    Used as the result handler by checkLocalQsubStatus and, via SSH, by
    checkRemoteQsubStatus (an earlier note suggested it may be unused in
    practice). Very short or missing output is replaced by a
    "no jobs in queue" message.
    """
    sP = self.serverParams(jobId)
    # VU: this check makes no sense, if the cluster runs other jobs, too
    # 'not out' also covers a None result, which len() would reject.
    if not out or len(out) < 10:
        out = 'No jobs in queue (implies all finished/failed)'
    self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'QSub status on ' + sP.machine, out, sP.machine)
def handleRemoteSlurmStatus(self, jobId, out, err):
    """Report `squeue` output for the job's machine as a 'remoteJobMessage'.

    If the job id text does not appear in the output, a "not in queue"
    message is shown instead of the raw output.
    """
    params = self.serverParams(jobId)
    stillQueued = str(jobId) in out
    message = out if stillQueued else 'The job is not in queue (implies it has finished or failed).'
    title = 'Squeue status on ' + params.machine
    self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, title, message, params.machine)
def getPidFileContent(self, jobId):
    """Make the job's pid file available locally, then call getPidFileContent2.

    For the 'ssh' mechanism the [local, remote] pid file pair is fetched with
    transportFiles('get'), with getPidFileContent2 as the finish handler.
    Every other mechanism goes straight to getPidFileContent2.
    """
    from core import CCP4Utils  # NOTE(review): imported but not used here
    params = self.serverParams(jobId)
    if params.mechanism != 'ssh':
        self.getPidFileContent2(jobId)
        return
    filePairs = [[self.pidFile(jobId, local=True), self.pidFile(jobId)]]
    self.transportFiles(jobId, filePairs, 'get', finishHandler=self.getPidFileContent2, failSignal=False)
def getPidFileContent2(self, jobId):
    """Read the local copy of the job's pid file and record the server pid.

    Called directly or as the transportFiles finish handler from
    getPidFileContent. If the file holds an integer, it is stored as the
    'serverProcessId' server parameter, and `ps -fu <username>` is sent over
    SSH with handleRemoteSSHStatus as the result handler.

    Returns:
        The integer pid, or None if the file content is not an integer.
    """
    self._transportFilesFinished(jobId)
    from core import CCP4Utils
    ret = CCP4Utils.readFile(self.pidFile(jobId, local=True))
    try:
        ret = int(ret.strip())
    except (AttributeError, TypeError, ValueError):
        # Missing/None content (AttributeError/TypeError) or non-numeric text.
        ret = None
    else:
        self.setServerParam(jobId, 'serverProcessId', ret)
        # '-e' would select every process and make the '-u' filter moot; use
        # the same per-user listing as the SSH status check.
        comLine = 'ps -fu ' + self.serverParams(jobId).username
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSSHStatus, emitFail=False)
    return ret
def killSSHJob(self, jobId):
    """Kill an SSH-launched job by running `pkill -F <remote pid file>`.

    The command output is passed to handleRemoteDel.
    """
    sP = self.serverParams(jobId)  # looked up as before; the value is not used
    # A former guard (fetch the pid file, then raise CException 340 if no
    # server pid was known) is disabled.
    killCommand = 'pkill -F ' + self.pidFile(jobId)
    self.sendSSHCommand(jobId, comLine=killCommand, retHandler=self.handleRemoteDel)
def killQsubJob(self, jobId):
    """Cancel a job submitted through qsub with `qdel <serverProcessId>`.

    'qsub_local': qdel is run locally; on success the server params are
    deleted and the job is marked FAILED in the project database.
    'qsub_remote': qdel is sent over SSH, with handleRemoteDel as the handler.

    Raises:
        CException 360 if no serverProcessId is recorded.
        CException 361 if the local qdel cannot be run.
    """
    sP = self.serverParams(jobId)
    if sP.serverProcessId is None:
        raise CException(self.__class__, 360)
    # serverProcessId is not guaranteed to be a str (getPidFileContent2 stores
    # an int); string concatenation / Popen args need text.
    processId = str(sP.serverProcessId)
    if sP.mechanism in ['qsub_local']:
        try:
            proc = subprocess.Popen(['qdel', processId], stdout=subprocess.PIPE, universal_newlines=True,
                                    stderr=subprocess.STDOUT)
            out, err = proc.communicate()
            print('kill qsub', out, err)
        except Exception as e:
            print("Error killing job\n" + str(type(e)) + "\n" + str(e))
            raise CException(self.__class__, 361)
        else:
            from dbapi import CCP4DbApi
            from core import CCP4Modules
            self.deleteServerParams(jobId)
            CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
    elif sP.mechanism in ['qsub_remote']:
        self.sendSSHCommand(jobId, comLine='qdel ' + processId, retHandler=self.handleRemoteDel)
    # NOTE(review): killRemoteJob also routes 'qsub_shared' here, but no branch
    # handles it, so such jobs are silently not killed - confirm intended.
def killSlurmJob(self, jobId):
    """Cancel a slurm job with `scancel <serverProcessId>`.

    'slurm_local': scancel is run locally; on success the server params are
    deleted and the job is marked FAILED in the project database.
    'slurm_remote': scancel is sent over SSH, with handleRemoteDel as the handler.

    Raises:
        CException 360 if no serverProcessId is recorded.
        CException 361 if the local scancel cannot be run.
    """
    sP = self.serverParams(jobId)
    if sP.serverProcessId is None:
        raise CException(self.__class__, 360)
    # serverProcessId is not guaranteed to be a str; command text needs one.
    processId = str(sP.serverProcessId)
    if sP.mechanism in ['slurm_local']:
        try:
            proc = subprocess.Popen(['scancel', processId], stdout=subprocess.PIPE, universal_newlines=True,
                                    stderr=subprocess.STDOUT)
            out, err = proc.communicate()
            print('kill Slurm job', out, err)
        except Exception as e:
            print("Error killing job\n" + str(type(e)) + "\n" + str(e))
            raise CException(self.__class__, 361)
        else:
            from dbapi import CCP4DbApi
            from core import CCP4Modules
            self.deleteServerParams(jobId)
            CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
    elif sP.mechanism in ['slurm_remote']:
        self.sendSSHCommand(jobId, comLine='scancel ' + processId, retHandler=self.handleRemoteDel)
def handleRemoteDel(self, jobId, out, err):
    """Result handler for remote kill commands (pkill / qdel / scancel).

    Forgets the job's server parameters and marks the job FAILED in the
    project database, regardless of the command output.
    """
    from dbapi import CCP4DbApi
    from core import CCP4Modules
    if self._diagnostic:
        print('handleRemoteQsubDel', jobId, out, err)
    self.deleteServerParams(jobId)
    projectDb = CCP4Modules.PROJECTSMANAGER().db()
    failedStatus = CCP4DbApi.JOB_STATUS_FAILED
    projectDb.updateJobStatus(jobId=jobId, status=failedStatus)
def killRemoteJob(self, jobId):
    """Dispatch a kill request to the handler for the job's submission mechanism.

    Raises:
        CException 365 if the job has no server parameters.
    """
    sP = self.serverParams(jobId)
    if sP is None:
        raise CException(self.__class__, 365)
    if sP.mechanism in ['ssh', 'ssh_shared']:
        self.killSSHJob(jobId)
    elif sP.mechanism in ['qsub_local', 'qsub_shared', 'qsub_remote']:
        self.killQsubJob(jobId)
    elif sP.mechanism in ['slurm_local', 'slurm_remote']:
        # killSlurmJob has an explicit 'slurm_local' branch that was never reached.
        self.killSlurmJob(jobId)
    elif sP.mechanism in ['custom']:
        # Was customHandler(jobid): an undefined name (NameError).
        self.customHandler(jobId).killJob(jobId)
def runLocalTest(self, jobId, remoteSh):
    """Run the generated server script *remoteSh* locally with `sh`, asynchronously.

    localTestFinished is registered as the completion handler and receives
    jobId as a keyword argument.
    """
    from core import CCP4Modules
    processManager = CCP4Modules.PROCESSMANAGER()
    completion = [self.localTestFinished, {'jobId': jobId}]
    processManager.startProcess('sh', [remoteSh], handler=completion, ifAsync=True)
def localTestFinished(self, pid, jobId=None):
    """Completion handler for runLocalTest: treat the job as finished.

    *pid* is supplied by the process manager and is not used.
    """
    self.handleFinishedServerJob(jobId)
def pollForFinishedFlagFile(self):
    """Check whether jobs being polled (getJobsToPollFinish([1])) have written their finished-flag file.

    Shared-filesystem mechanisms ('ssh_shared', 'qsub_shared', 'qsub_local')
    are checked directly with os.path.exists. Remote mechanisms ('ssh',
    'qsub_remote', 'slurm_remote') are grouped by machine, and one
    testRemoteFiles request is made per machine. The results arrive via the
    'testRemoteFiles' signal in pollForFinishedFlagFile1.
    """
    # Connect the result slot only once. The guard attribute was never set
    # before, so every poll added another connection.
    if getattr(self, 'pollFinishConnected', None) is None:
        self.connect(self, QtCore.SIGNAL('testRemoteFiles'), self.pollForFinishedFlagFile1)
        self.pollFinishConnected = True
    pollByMachine = {}
    for jobId in self.getJobsToPollFinish([1]):
        sP = self.serverParams(jobId)
        if sP is None:
            continue
        if sP.mechanism not in ['ssh', 'ssh_shared', 'qsub_remote', 'qsub_shared', 'qsub_local', 'slurm_remote']:
            continue
        if sP.mechanism in ['ssh_shared', 'qsub_shared', 'qsub_local']:
            if os.path.exists(sP.finishedFile):
                self.handleFinishedServerJob(jobId)
        else:
            pollByMachine.setdefault(sP.machine, []).append(jobId)
    for machine, jobIdList in list(pollByMachine.items()):
        finfileList = []
        lastJobId = None
        lastParams = None
        for jobId in jobIdList:
            sP = self.serverParams(jobId)
            if sP is not None:
                finfileList.append(sP.finishedFile)
                lastJobId, lastParams = jobId, sP
        if finfileList:
            # NOTE(review): a single jobId is passed for a list of files, but
            # pollForFinishedFlagFile1 pairs jobId[i] with fileStatus[i]; with
            # several jobs on one machine, statuses may be attributed to the
            # wrong job. Confirm how testRemoteFiles reports jobId.
            self.testRemoteFiles(jobId=lastJobId, machine=machine, username=lastParams.username,
                                 password=lastParams.password, remoteFileList=finfileList)
def pollForFinishedFlagFile1(self, jobId, fileStatus):
    """Slot for the 'testRemoteFiles' signal.

    *jobId* is a single id or a list of ids; *fileStatus* is indexed in
    parallel with it. Each job whose status entry is truthy is handed to
    handleFinishedServerJob.
    """
    if self._diagnostic:
        print('pollForFinishedFlagFile', jobId, fileStatus)
    jobIds = jobId if isinstance(jobId, list) else [jobId]
    for index, oneJobId in enumerate(jobIds):
        if fileStatus[index]:
            self.handleFinishedServerJob(oneJobId)
def customHandler(self, jobId=None, customCodeFile=None):
    """Return the custom server-interface handler for a code file, creating it on first use.

    One handler instance is cached per custom code file for the whole session
    (the alternative of one per job may be better for some cases).

    Args:
        jobId: if given, the code file is taken from this job's
            'customCodeFile' server parameter, overriding *customCodeFile*.
        customCodeFile: path of a Python file defining the handler class. A
            'startup.py' in the same directory, if present, is exec'd first.
            The first class from inspect.getmembers (sorted by name) is
            instantiated with self.serverParams.

    Returns:
        The handler instance, or None (after printing an error) if no file is
        given, the file does not exist, or loading/instantiation fails.
    """
    if not hasattr(self, '_customHandler'):
        self._customHandler = {}
    if jobId is not None:
        customCodeFile = self.getServerParam(jobId, 'customCodeFile')
    # Stringify only after the override and only a real value. The old code
    # called str() first: None became 'None' (so this check was skipped), and
    # a non-str job parameter later broke the '+' in the error message.
    if customCodeFile is None:
        print('ERROR attempting to access server custom code when no file provided')
        return None
    customCodeFile = str(customCodeFile)
    if customCodeFile in self._customHandler:
        return self._customHandler[customCodeFile]
    if not os.path.exists(customCodeFile):
        print('ERROR server custom code file does not exist:' + customCodeFile)
        return None
    codeDir = os.path.split(customCodeFile)[0]
    # If a startup.py file exists exec it first
    startupFile = os.path.join(codeDir, 'startup.py')
    if os.path.exists(startupFile):
        try:
            with open(startupFile) as startupStream:
                startupSource = startupStream.read()
            # SECURITY NOTE: executes arbitrary code from the configured
            # custom-code directory; only point this at trusted files.
            exec(compile(startupSource, startupFile, 'exec'))
        except Exception as e:
            print('ERROR execing custom server interface startup code:', startupFile)
            print(e)
            return None
    import inspect
    from core import CCP4Utils
    if codeDir not in sys.path:
        sys.path.append(codeDir)
    module, err = CCP4Utils.importFileModule(customCodeFile)
    if err is not None:
        print('ERROR loading custom code file:' + customCodeFile)
        print(err)
        return None
    clsList = inspect.getmembers(module, inspect.isclass)
    if len(clsList) == 0:
        print('ERROR no classes found in custom code file:' + customCodeFile)
        return None
    if self._diagnostic:
        print('CServerParams.customHandler instantiating customHandler', clsList[0][0], self._serverParams)
    try:
        # Passes the bound serverParams method, presumably as a lookup
        # callable for the handler - confirm against custom handler classes.
        self._customHandler[customCodeFile] = clsList[0][1](self.serverParams)
    except Exception as e:
        print('ERROR instantiating custom handler class')
        print(e)
        return None
    return self._customHandler[customCodeFile]
def listRemoteProcesses(self):
    """Ask each remote machine that has running jobs for its process list.

    Jobs from getJobsToPollFinish(pollFinish=[0, 1]) are grouped by machine
    into {jobId: serverProcessId}. For each machine, one SSH command runs
    listProcesses.py with that machine's ccp4-python.
    handleListRemoteProcesses receives the output, bound to the machine name
    and its job dict.
    """
    import functools
    runningJobs = self.getJobsToPollFinish(pollFinish=[0, 1])
    jobsByMachine = {}
    for jobId in runningJobs:
        sP = self.serverParams(jobId)
        if sP is None:
            continue
        jobsByMachine.setdefault(sP.machine, {})[jobId] = sP.serverProcessId
    for machine, jobDict in list(jobsByMachine.items()):
        # Take ccp4Dir from a job that actually runs on this machine. The old
        # code used the loop variable left over from above (the last running
        # job), so every machine got that job's CCP4 directory.
        firstJobId = list(jobDict.keys())[0]
        ccp4Dir = self.getServerParam(firstJobId, 'ccp4Dir')
        # NOTE(review): hard-coded 'ccp4i2-devel' path; release installs use share/ccp4i2.
        comLine = ccp4Dir + '/bin/ccp4-python ' + ccp4Dir + '/share/ccp4i2-devel/bin/listProcesses.py'
        self.sendSSHCommand(firstJobId, comLine=comLine,
                            retHandler=functools.partial(self.handleListRemoteProcesses, machine, jobDict),
                            emitFail=False)
    # get qsub status (not implemented: the result below is unused)
    runningJobs = self.getJobsToPollFinish(pollFinish=[2])
def handleListRemoteProcesses(self, machine, jobDict, jobId, out, err):
    """Parse listProcesses.py output and emit 'remoteProcessesList'.

    Lines of *out* starting with 'processes' or 'atTime' are executed as
    Python assignment statements. The resulting processes and atTime values
    are emitted with *machine* and *jobDict*. If either value is missing or
    the emit fails, an error is printed instead.
    """
    # exec() inside a function cannot create local variables in Python 3, so
    # 'processes'/'atTime' were never defined at the emit below. Execute into
    # an explicit namespace and read the values back from it.
    namespace = {}
    for line in out.split('\n'):
        if line.startswith('processes') or line.startswith('atTime'):
            try:
                # SECURITY NOTE: executes text received from the remote machine.
                exec(line, globals(), namespace)
            except Exception as e:
                print('Failed evaling the return from listRemoteProcesses')
                print('Error:', e)
    try:
        self.emit(QtCore.SIGNAL('remoteProcessesList'), machine, jobDict,
                  namespace['processes'], namespace['atTime'])
    except Exception:
        print('ERROR in handling listing of remote processes')
        print('execing line', line)