Dear Michael,
   I think that the port number is hard-wired in CCP4i2, on line 439 of the
job-server source file (reproduced below):

        transport = paramiko.Transport((sP.machine, 22))

Also, there are various calls to the "paramiko" library (which handles ssh)
that do not pass a port number and so default to 22.

Both of these need fixing.

I have attached a replacement for this file. Could you try making a backup
of the above file and copying the attached file in its place.
I have put a constant for the port number in the line

        PARAMIKO_PORT = 22

near the top of the file. Then change this port number to whatever is suitable
for you.

If this works, then I will make the port number an option in the program.
If not, then I will try something else.

Best wishes,
Stuart McNicholas

On Mon, 8 Jun 2020 at 17:40, Michael Weyand wrote:

> Dear CCP4I2 experts,
> I'm trying to submit remote jobs via SSH within CCP4i2. Unfortunately,
> we use a non standard SSH port.
> So far, I'm not able to add any option within the CCP4I2 interface. For
> job submission, I need to do something like
> 'ssh -p XXXXX -Y ..... ccp4i-specific-command ...'.
> Is there any chance to enter this '-p' SSH option within CCP4i2? I tried
> already to define a "ssh command" via "Preferences".
> But so far without any success.
> I'm also wondering why there is an ssh-key file option. If a user already
> uses an ssh key for a client/server connection, why is it necessary to
> enter the key file again?
> Any ssh between client and server should work from scratch ...
> Any hints are highly appreciated,
> Michael
from __future__ import print_function

""" CCP4 GUI Project
     Copyright (C) 2016 STFC

     This library is free software: you can redistribute it and/or
     modify it under the terms of the GNU Lesser General Public License
     version 3, modified in accordance with the provisions of the 
     license to address the requirements of UK law.
     You should have received a copy of the modified GNU Lesser General 
     Public License along with this library.  If not, copies may be 
     downloaded from http://www.ccp4.ac.uk/ccp4license.php
     This program is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     GNU Lesser General Public License for more details.

   Liz Potterton Apr 2016 - Separate 'remote' server code out from CCP4JobController
"""
import os
import re
from PyQt4 import QtCore
from core.CCP4ErrorHandling import *


class CServerParams:
    """Settings and bookkeeping for one job run on a remote server or batch queue.

    The attributes named in SAVELIST are the persistent settings: they are
    initialised from keyword arguments, written out by getEtree() and read
    back by setEtree().  jobNumber, projectId, projectName and
    projectDirectory are plain attributes filled in later (for example by
    setDbParams()).
    """

    SAVELIST = ['machine', 'username', 'ccp4Dir', 'tempDir', 'mechanism', 'keyFilename', 'serverProcessId',
                'remotePath', 'validate', 'customCodeFile', 'queueOptionsFile', 'sge_root', 'serverGroup']

    def __init__(self, **kw):
        """Create params from keyword arguments.

        jobId, password and any SAVELIST name may be given; anything not
        supplied is None.  serverProcessId is always reset to None, even if a
        value was passed.
        """
        self.jobId = kw.get('jobId', None)
        self.password = kw.get('password', None)
        self.jobNumber = None
        self.projectId = None
        self.projectName = None
        self.projectDirectory = None
        # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
        #                   3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
        self.pollFinish = 0
        self.pollReport = 0
        for item in CServerParams.SAVELIST:
            setattr(self, item, kw.get(item, None))
        self.serverProcessId = None

    def __str__(self):
        """Return one 'name: value' line per saved or bookkeeping attribute."""
        keys = CServerParams.SAVELIST + ['jobNumber', 'projectName', 'projectDirectory', 'pollFinish', 'pollReport']
        return ''.join(item + ': ' + str(getattr(self, item)) + '\n' for item in keys)

    def remote_finished_tarball(self):
        """Return the path of the finished-job archive on the server."""
        # NOTE(review): the filename appended here is an empty string, so this
        # returns remotePath unchanged - confirm the intended archive name.
        return self.remotePath + ''

    def remote_report_file(self):
        """Return the server path of the job's program.xml report, or None.

        None is returned for ssh_shared, qsub_local and qsub_shared, where the
        job directory is in the local project.  Otherwise the path is prefixed
        with remotePath, or left relative when remotePath is unset.
        """
        if self.mechanism in ['ssh_shared', 'qsub_local', 'qsub_shared']:
            return None
        reportPath = 'work/project/CCP4_JOBS/job_' + self.jobNumber + '/program.xml'
        if self.remotePath is None:
            return reportPath
        return self.remotePath + reportPath

    def local_tarball(self):
        """Return the local CCP4_TMP path used for the job's archive."""
        # NOTE(review): the filename suffix is an empty string, so this is the
        # same path as local_finished_tarball() - confirm the intended names.
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '')

    def local_report_file(self):
        """Return the local path of the job's program.xml report."""
        return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'program.xml')

    def local_finished_tarball(self):
        """Return the local CCP4_TMP path used for the finished-job archive."""
        # NOTE(review): see local_tarball() - the suffix is empty here too.
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '')

    def dbXml(self):
        """Return the local path of the job's database XML export.

        For ssh_shared, test, qsub_local and qsub_shared it is DATABASE.db.xml
        in the job directory; for every other mechanism it is
        CCP4_TMP/DATABASE_job_<jobNumber>.db.xml.
        """
        if self.mechanism not in ['ssh_shared', 'test', 'qsub_local', 'qsub_shared']:
            return os.path.join(self.projectDirectory, 'CCP4_TMP', 'DATABASE_job_' + self.jobNumber + '.db.xml')
        return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'DATABASE.db.xml')

    def finishedFile(self):
        """Return the path of the job's FINISHED marker file.

        ssh_shared, qsub_shared and qsub_local use CCP4_JOBS/job_<n>/FINISHED
        in the local project.  Other mechanisms use remotePath without its
        last character (presumably a trailing separator) plus '.FINISHED'.
        """
        if self.mechanism in ['ssh_shared', 'qsub_shared', 'qsub_local']:
            return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'FINISHED')
        return self.remotePath[0:-1] + '.FINISHED'

    # jobNumber, projectName and projectDirectory are plain instance
    # attributes (set in __init__ and by setDbParams()).  The same-named
    # lazy-lookup methods that used to be here were removed for three
    # reasons: the instance attributes shadowed them; they used Python 2
    # print statements, a SyntaxError together with print_function; and
    # they read self.db/self._jobNumber, which this class never defines.

    def getEtree(self):
        """Return a <serverParams jobId=...> element with one child per set SAVELIST value.

        Values are stored as text (str()); attributes that are None are omitted.
        """
        from lxml import etree
        ele = etree.Element('serverParams')
        ele.set('jobId', str(self.jobId))
        for key in CServerParams.SAVELIST:
            value = getattr(self, key, None)
            if value is not None:
                # Children used to be created but never attached to ele,
                # so nothing was ever saved.
                etree.SubElement(ele, key).text = str(value)
        return ele

    def setEtree(self, ele):
        """Restore attributes from an element produced by getEtree().

        Every child becomes a string attribute; an empty child becomes 'None'.
        """
        self.jobId = ele.get('jobId')
        for e in ele.iterchildren():
            setattr(self, str(e.tag), str(e.text))

    def setDbParams(self):
        """Fill jobNumber, projectId, projectName and projectDirectory from the project database."""
        from core import CCP4Modules
        jobInfo = CCP4Modules.PROJECTSMANAGER().db().getJobInfo(self.jobId, ['jobnumber', 'projectid', 'projectname'])
        self.jobNumber = jobInfo['jobnumber']
        self.projectId = jobInfo['projectid']
        self.projectName = jobInfo['projectname']
        self.projectDirectory = CCP4Modules.PROJECTSMANAGER().db().getProjectInfo(self.projectId, 'projectdirectory')

class CJobServer:
    """Track and drive jobs that are run over ssh, qsub or Slurm.

    Per-job settings are CServerParams objects held in self._serverParams,
    keyed by jobId.  Some methods (e.g. runOnServer, handleFinishedServerJob)
    are placeholders meant to be reimplemented by the application.
    NOTE(review): methods use self.emit(QtCore.SIGNAL(...)) and self.db, which
    are not defined in this class as visible here - presumably supplied by a
    Qt/database-aware subclass or mixin; confirm.
    """
    # Attribute names stored in the server-job database table.  restoreFromDb()
    # assigns the columns of each row to these names in this order, and
    # setServerParam() writes changes to these keys through to the database.
    DB_KEYS = ['jobId', 'machine', 'username', 'mechanism', 'remotePath', 'customCodeFile', 'validate',
               'keyFilename', 'serverProcessId', 'serverGroup']
    # Descriptions for the CException codes raised by this class.  301-304 are
    # raised by openSSHConnection() as 300 + failCode (1 authentication failed,
    # 2 authentication type not supported, 3 socket error, 4 any other error).
    ERROR_CODES = {301 : {'description' : 'Failed opening SSH connection'},
                   302 : {'description' : 'Failed opening SSH connection'},
                   303 : {'description' : 'Failed opening SSH connection'},
                   304 : {'description' : 'Failed opening SSH connection'},
                   320 : {'description' : 'Failed submitting job'},
                   330 : {'description' : 'Failed to open connection to copy files'},
                   331 : {'description' : 'Failed copying file to server'},
                   332 : {'description' : 'Failed copying file from server'},
                   340 : {'description' : 'Failed killing remote job - can not recover job id'},
                   350 : {'description' : 'Failed running job startup script on server'},
                   360 : {'description' : 'Failed to kill job - job id unknown'},
                   361 : {'description' : 'Failed to kill job - possible communication fail'},
                   362 : {'description' : 'Failed to find job status'},
                   363 : {'description' : 'Failed to recover remote process id'},
                   364 : {'description' : 'Failed to recover remote process status'},
                   365 : {'description' : 'Failed killing remote process - no information on the remote process'}}

    def __init__(self):
        """Start with no tracked jobs and send paramiko's log to i2's status directory."""
        from core import CCP4Utils
        import paramiko
        self._diagnostic = False      # True -> print extra tracing
        self._serverParams = {}       # jobId -> CServerParams
        self.runInSSHThreads = []     # ssh worker threads; removed in _sendSSHFinished
        # Lists of outstanding jobs when i2 restarts - check if they are finished and
        # then poll in CJobController.doChecks()
        logFile = os.path.join(CCP4Utils.getDotDirectory(), 'status', 'paramiko.log')
        paramiko.util.log_to_file(logFile)

    def restoreRunningJobs(self):
        """Presumably meant to restore server jobs left running by a previous i2 session.

        NOTE(review): the body of this method is missing in this copy of the
        file, so it currently does nothing.  Judging by the other methods it
        may be meant to call loadParams() and/or restoreFromDb() - restore it
        from the upstream source.
        """

    def getJobsToPollFinish(self, pollFinish=[]):
        ret = []
        for jobId,sP in list(self._serverParams.items()):
            if sP.pollFinish in pollFinish:
        return ret

    def getJobsToPollReport(self):
        jobIdList = []
        for jobId,sP in list(self._serverParams.items()):
            if sP.pollReport > 0:
        return jobIdList

    def Exit(self):
        if len(self._serverParams) > 0:
        elif os.path.exists(self.defaultParamsFileName()):

    def serverParams(self, jobId):
            return self._serverParams[jobId]
            return None

    def saveParams(self, fileName=None):
        """Write the parameters of every tracked server job to an i2 XML file.

        fileName -- destination path; defaults to defaultParamsFileName().
        The body holds one <serverParams> element per job (CServerParams.getEtree()).
        """
        from lxml import etree
        from core import CCP4File
        if fileName is None:
            fileName = self.defaultParamsFileName()
        fileObj = CCP4File.CI2XmlDataFile(fileName)
        fileObj.header.function = 'JOBSERVERSTATUS'
        bodyEtree = etree.Element('serverParamsList')
        for sP in list(self._serverParams.values()):
            bodyEtree.append(sP.getEtree())
        # NOTE(review): the loop body and the final write were missing in this
        # copy; saveFile(bodyEtree=...) is assumed to be CI2XmlDataFile's
        # writer (loadParams() reads the body back with getBodyEtree()) -
        # confirm against core/CCP4File.py.
        fileObj.saveFile(bodyEtree=bodyEtree)

    def defaultParamsFileName(self):
        """Return <i2 dot directory>/status/jobServer.params.xml, the saved-parameters file."""
        from core import CCP4Utils
        statusDir = os.path.join(CCP4Utils.getDotDirectory(), 'status')
        return os.path.join(statusDir, 'jobServer.params.xml')

    def restoreFromDb(self):
        """Recreate CServerParams for every server job recorded in the project database.

        Each row from getServerJobs() is unpacked positionally onto the names
        in DB_KEYS, so its first column (the jobId) also keys self._serverParams.
        """
        from core import CCP4Modules
        rows = CCP4Modules.PROJECTSMANAGER().db().getServerJobs()
        for row in rows:
            params = CServerParams()
            self._serverParams[row[0]] = params
            for column, key in enumerate(self.DB_KEYS):
                setattr(params, key, row[column])

    def setPollStatus(self, sP):
        if sP.mechanism in ['qsub']:
            sP.pollFinish = 2
        elif sP.mechanism in ['qsub_remote', 'ssh', 'ssh_shared', 'slurm_remote']:
            if sP.validate in ['password', 'pass_key_filename']:
                sP.pollFinish = -1
                sP.pollFinish = 1
        elif sP.mechanism in ['custom']:
            sP.pollFinish = 4
        # And set to poll for report update
        if sP.remote_report_file is not None:
            sP.pollReport = 1

    def loadParams(self, fileName=None):
        """Reload server-job parameters written by saveParams().

        fileName -- source path; defaults to defaultParamsFileName().  A
        missing file is ignored.  Each child element that has children of its
        own becomes a CServerParams keyed by its jobId attribute.
        """
        from core import CCP4File
        if fileName is None:
            fileName = self.defaultParamsFileName()
        if not os.path.exists(fileName):
            return
        fileObj = CCP4File.CI2XmlDataFile(fileName)
        body = fileObj.getBodyEtree()
        # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
        #                   3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
        for ele in body.iterchildren():
            jobId = ele.get('jobId')
            if len(ele) > 0:
                sP = self._serverParams[jobId] = CServerParams()
                # NOTE(review): the rest of this branch was missing in this
                # copy; restoring the saved values and the poll flags is
                # reconstructed - confirm against upstream.
                sP.setEtree(ele)
                self.setPollStatus(sP)

    def createServerParams(self, jobId, params):
        """Register *params* for *jobId*, fill in job/project details and record it in the db.

        jobNumber and projectId come from the job record; projectName (made
        one word by CCP4Utils.safeOneWord) and projectDirectory come from the
        project record.
        """
        from core import CCP4Utils
        sP = params
        self._serverParams[jobId] = sP
        sP.jobId = jobId
        jobInfo = self.db.getJobInfo(jobId=jobId, mode=['jobnumber', 'projectid'])
        sP.jobNumber = jobInfo['jobnumber']
        sP.projectId = jobInfo['projectid']
        projectInfo = self.db.getProjectInfo(projectId=sP.projectId)
        sP.projectName = CCP4Utils.safeOneWord(projectInfo['projectname'])
        sP.projectDirectory = projectInfo['projectdirectory']
        self.db.createServerJob(jobId, sP)

    def setServerParam(self, jobId, key, value):
#Trying to set the keys below is bad on Python3 and I guess not neccessary as surely property method should override?
        if jobId not in self._serverParams or key in ("remote_finished_tarball","remote_report_file","local_tarball","local_report_file","local_finished_tarball","dbXml","finishedFile"):
            setattr(self._serverParams[jobId] ,key, value)
            if key in self.DB_KEYS:
                self.db.updateServerJob(jobId, key, value)

    def getServerParam(self, jobId, key):
        """Return attribute *key* of the job's server params, or None.

        If the attribute is unset and *key* is 'ccp4Dir', the value is taken
        from the server-setup container (CCP4Modules.SERVERSETUP(), created on
        first use and kept as self.setupContainer).  The entry whose name
        matches the job's serverGroup supplies it, and the value is cached on
        the job's params.
        """
        from core import CCP4Modules
        if jobId not in self._serverParams:
            return None
        sP = self._serverParams[jobId]
        found = getattr(sP, key, None)
        if found is not None:
            return found
        if key not in ['ccp4Dir']:
            return None
        # Setup data comes from a def file (.CCP4I2/configs/serverSetup.def.xml)
        # loaded into a container with one SERVERGROUPn entry per server group.
        serverGroup = getattr(sP, 'serverGroup', None)
        if serverGroup is None:
            return None
        if getattr(self, 'setupContainer', None) is None:
            self.setupContainer = CCP4Modules.SERVERSETUP()
        for dataObjName in self.setupContainer.dataOrder():
            group = self.setupContainer.get(dataObjName)
            if group.name != serverGroup:
                continue
            value = group.get(key)
            setattr(sP, key, value)
            return value
        return None
    def deleteServerParams(self, jobId):
        except Exception as e:
            print('ERROR removing server job record from database\n',e)
        if jobId in self._serverParams:
            del self._serverParams[jobId]
    def deleteServerParam(self, jobId, key):
            delattr(self._serverParams[jobId], key)
    def patchRemotePasswords(self, jobId,password):
        sP = self.serverParams(jobId)
        count = 0
        for jI in list(self._serverParams.keys()):
            if jI == jobId or (self._serverParams[jI].pollFinish < 0 and \
                               self._serverParams[jI].machine == sP.machine and self._serverParams[jI].username == sP.username):
                count += 1
                self._serverParams[jI].password = password
                self._serverParams[jI].pollFinish = 1
                self._serverParams[jI].pollReport = 1
        return count

    def checkForRemotePasswords(self, projectId):
        requirePass = []
        for jobId in self.getJobsToPollFinish([-1, -2, -3, -4]):
            sP = self.serverParams(jobId)
            if sP.pollFinish < 0 and sP.projectId == projectId:
                requirePass.append({'jobId' : jobId , 'machine' : sP.machine, 'username' : sP.username})
        return requirePass

    def getPKey(self, jobId):
        """Load the RSA private key named by the job's keyFilename.

        '$HOME' in the filename is replaced by the user's home directory.  With
        validate == 'pass_key_filename' the job's password decrypts the key;
        otherwise the key file is read without a passphrase.
        Returns a paramiko.RSAKey.
        """
        import paramiko
        from core import CCP4Utils
        sP = self.serverParams(jobId)
        # Plain replacement: re.sub would treat backslashes in the home path
        # (e.g. Windows paths) as escape sequences in the replacement.
        keyFilename = sP.keyFilename.replace('$HOME', CCP4Utils.getHOME())
        if sP.validate == 'pass_key_filename':
            return paramiko.RSAKey.from_private_key_file(keyFilename, password=sP.password)
        # The else was missing, so the passphrase-protected key loaded above
        # was immediately replaced by an unencrypted load that would fail.
        return paramiko.RSAKey.from_private_key_file(keyFilename)

    def pollQsubStat(self, mode='finished'):
        """Run ``qstat -xml`` locally and act on, or return, the result.

        mode 'finished': every job with pollFinish == 2 whose serverProcessId
        qstat no longer lists is passed to handleFinishedServerJob().
        mode 'status': return parseQsubStat(output, 'status').
        """
        import subprocess
        procCheck = subprocess.Popen(["qstat", "-xml"], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
        outCheck, errCheck = procCheck.communicate()
        if self._diagnostic:
            print('pollQsubStat', outCheck, errCheck)
        if mode == 'finished':
            qsubIdDict = self.parseQsubStat(outCheck)
            for jobId in self.getJobsToPollFinish([2]):
                if self.serverParams(jobId).serverProcessId not in qsubIdDict:
                    # NOTE(review): this branch body was missing in this copy;
                    # a job that qstat no longer lists is treated as finished.
                    self.handleFinishedServerJob(jobId)
        elif mode == 'status':
            return self.parseQsubStat(outCheck, mode)

    def handleFinishedServerJob(self, jobId):
        # Should be reimplemented in application
        # Do whatever necessary to retrieve and use job
        # Remember to delete from self.serverParams
        self.setServerParam(jobId, 'pollFinish', 0)
        self.setServerParam(jobId, 'pollReport', 0)

    def parseQsubStat(self, returnString, mode='finished'):
        """Parse the XML printed by ``qstat -xml``.

        mode 'finished' -> dict mapping each JB_job_number (as str) to the
        'state' attribute of its job_list element, taken from both job_info
        and queue_info sections.
        mode 'status' -> currently always an empty dict.
        Any other mode -> None.
        """
        from lxml import etree
        root = etree.fromstring(returnString, etree.XMLParser())
        if mode == 'status':
            return {}
        if mode != 'finished':
            return None
        states = {}
        for section in root.xpath("job_info|queue_info"):
            for entry in section.xpath("job_list"):
                states[str(entry.xpath("JB_job_number")[0].text)] = str(entry.attrib["state"])
        return states

    def pidFile(self, jobId, local=False):
        sP = self.serverParams(jobId)
        if local or sP.mechanism == 'ssh_shared':
            return  os.path.join(sP.projectDirectory, 'CCP4_JOBS', 'job_' + sP.jobNumber, 'pidfile.txt')
        elif sP.mechanism == 'ssh':
            return sP.remotePath + 'pidfile.txt'
            return None

    def transportFiles(self, jobId, copyList=None, mode='put', finishHandler=None, failSignal=True, diagnostic=True):
        """Copy files to or from the job's server in a background thread.

        copyList -- (localName, remoteName) pairs; mode 'put' uploads, any
                    other mode downloads (see _transportFiles).
        finishHandler -- called with jobId when the thread finishes; defaults
                         to _transportFilesFinished.
        Returns False, doing nothing, while a transfer thread is still recorded
        for this job; otherwise starts the thread and returns True.
        """
        import functools
        import UtilityThread
        if copyList is None:
            copyList = []
        sP = self.serverParams(jobId)
        if getattr(sP, 'sshThreadTransport', None) is not None:
            return False
        if finishHandler is None:
            finishHandler = self._transportFilesFinished
        sP.sshThreadTransport = UtilityThread.UtilityThread(functools.partial(self._transportFiles, jobId, copyList, mode, failSignal, diagnostic))
        sP.sshThreadTransport.finished.connect(functools.partial(finishHandler, jobId))
        # The thread was created but never started, so nothing was copied.
        # Assumes UtilityThread is a QThread-style class started with start().
        sP.sshThreadTransport.start()
        return True

    def _transportFiles(self, jobId, copyList=None, mode='put', failSignal=True, diagnostic=True):
        """Worker for transportFiles: copy files over SFTP.

        copyList -- (localName, remoteName) pairs.  mode 'put' uploads each
                    local file; any other mode downloads each remote file.
        failSignal -- emit 'failedOpenConnection' on failure.
        diagnostic -- print a line for every copy and every failure.
        Raises CException 330 if the connection cannot be opened.  Per-file
        failures (331 upload, 332 download) are collected and signalled but
        not raised, so the remaining files are still attempted.
        """
        import paramiko
        if copyList is None:
            copyList = []
        sP = self.serverParams(jobId)
        # PARAMIKO_PORT is meant to be a module-level setting; fall back to the
        # standard ssh port when it is not defined instead of raising NameError.
        port = globals().get('PARAMIKO_PORT', 22)
        transport = None
        try:
            transport = paramiko.Transport((sP.machine, port))
            if sP.validate in ['key_filename', 'pass_key_filename'] and sP.keyFilename is not None and len(sP.keyFilename) > 0:
                transport.connect(username=sP.username, password=sP.password, pkey=self.getPKey(jobId))
            else:
                transport.connect(username=sP.username, password=sP.password)
            sftp = paramiko.SFTPClient.from_transport(transport)
        except Exception as e:
            print('ERROR setting up paramiko FTP file transfer')
            if transport is not None:
                transport.close()
            err = CException(self.__class__, 330, 'Machine:' + str(self.getServerParam(jobId, 'machine')) + '\n' + str(e))
            if failSignal:
                self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
            raise err
        err = CException()
        try:
            for localName, remoteName in copyList:
                try:
                    if mode == 'put':
                        sftp.put(localName, remoteName)
                        if diagnostic:
                            print("Copied", localName, 'to', remoteName)
                    else:
                        sftp.get(remoteName, localName)
                        if diagnostic:
                            print("Copied", remoteName, 'to', localName)
                except Exception as e:
                    if mode == 'put':
                        if diagnostic:
                            print('ERROR copying files', str(localName), 'to', str(remoteName))
                        err.append(self.__class__, 331, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
                    else:
                        if diagnostic:
                            print('ERROR copying files', str(localName), 'from', str(remoteName))
                        err.append(self.__class__, 332, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
        finally:
            # The SFTP session and transport were never closed, leaking a
            # connection per transfer.
            sftp.close()
            transport.close()
        if len(err) > 0:
            if failSignal:
                self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)

    def _transportFilesFinished(self,jobId):
        #print '_transportFilesFinished',jobId

    def openSSHConnection(self, jobId, machine, username, password, keyFilename=None, timeout=None, maxTries=2, emitFail=True, port=None):
        """Open and return a connected paramiko SSHClient for *machine*.

        Up to *maxTries* attempts are made; an authentication failure stops
        retrying at once.  Progress is reported through 'testMessage' signals.

        jobId -- passed on with the 'failedOpenConnection' signal.
        keyFilename -- private key file passed as key_filename; None, '' or
                       the string 'None' means no key file.
        timeout -- connect timeout passed to SSHClient.connect().
        emitFail -- emit 'failedOpenConnection' before raising.
        port -- ssh port; defaults to the module-level PARAMIKO_PORT if it is
                defined, otherwise 22.
        Raises CException 300 + failCode: 301 authentication failed, 302
        authentication type not supported, 303 socket error, 304 other.
        """
        import paramiko
        import socket
        if port is None:
            port = globals().get('PARAMIKO_PORT', 22)
        client = paramiko.SSHClient()
        # With no host keys loaded, SSHClient's default RejectPolicy refuses
        # every server; trust the hosts the user's ssh already knows.
        client.load_system_host_keys()
        connected = False
        ntries = 0
        failCode = 4
        mess = 'no connection attempt made'
        lastExc = None
        self.emit(QtCore.SIGNAL('testMessage'), 'Connecting to ' + machine + ' with timeout ' + str(timeout))
        while ntries < maxTries and not connected:
            failCode = 0
            try:
                if self._diagnostic:
                    print('TO client.connect', ntries)
                if keyFilename is not None and keyFilename != 'None' and len(keyFilename) > 0:
                    client.connect(machine, port=port, username=username, password=password, key_filename=keyFilename, timeout=timeout)
                else:
                    client.connect(machine, port=port, username=username, password=password, timeout=timeout)
                # Without this flag every attempt was reported as a failure.
                connected = True
            # BadAuthenticationType subclasses AuthenticationException, so it
            # must be caught first or this handler is unreachable.
            except paramiko.ssh_exception.BadAuthenticationType as e:
                failCode = 2
                mess = 'Authentication type not supported'
                lastExc = e
            except paramiko.ssh_exception.AuthenticationException as e:
                failCode = 1
                ntries = maxTries  # wrong credentials: retrying will not help
                mess = 'Authentication failed'
                lastExc = e
            except socket.gaierror as e:
                failCode = 3
                mess = 'Socket error'
                lastExc = e
            except Exception as e:
                failCode = 4
                mess = str(e)
                lastExc = e
            if not connected:
                self.emit(QtCore.SIGNAL('testMessage'), '   Try: ' + str(ntries) + ' failed: ' + str(mess))
            else:
                self.emit(QtCore.SIGNAL('testMessage'), '   Try: ' + str(ntries) + ' succeeded')
            ntries = ntries + 1
        if connected:
            return client
        client.close()
        # sys.exc_info() was used here outside any except block (and without
        # importing sys); report the last caught exception instead.
        excType = type(lastExc) if lastExc is not None else None
        print("Some error submitting job")
        print("Error running job using ssh" + str(excType) + "\n" + str(lastExc))
        err = CException(self.__class__, 300 + failCode, 'Machine:' + machine + '\n' + str(excType) + "\n" + str(lastExc) + '\n')
        if emitFail:
            self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
        raise err

    def testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=None, timeout=None, maxTries=2):
        """Check in a background thread whether *remoteFileList* exist on a server.

        Does nothing while a check thread is already recorded for this job.
        The result is emitted by _testRemoteFiles as the 'testRemoteFiles' signal.
        """
        import UtilityThread
        import functools
        if remoteFileList is None:
            remoteFileList = []
        if self.getServerParam(jobId, 'sshThreadRemoteFiles') is not None:
            return
        sP = self.serverParams(jobId)
        thread = UtilityThread.UtilityThread(functools.partial(self._testRemoteFiles, jobId, machine, username, password,
                                                               keyFilename, remoteFileList, timeout, maxTries))
        sP.sshThreadRemoteFiles = thread
        thread.finished.connect(functools.partial(self._sendSSHFinished, jobId, None, 'sshThreadRemoteFiles'))
        # _sendSSHFinished removes the thread from runInSSHThreads (and would
        # fail if it were absent).  The list also keeps the thread referenced
        # after _testRemoteFiles deletes sP.sshThreadRemoteFiles.
        self.runInSSHThreads.append(thread)
        # The thread was never started.  Assumes UtilityThread is a
        # QThread-style class started with start().
        thread.start()

    def _testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=None, timeout=None, maxTries=2):
        """Worker for testRemoteFiles: test each remote path with ``[ -e path ]``.

        On success a list of booleans parallel to remoteFileList is emitted as
        the 'testRemoteFiles' signal.  If the connection fails, a list of
        False values is returned instead.  jobId may be a list, in which case
        its first element identifies the job for the connection.
        """
        if remoteFileList is None:
            remoteFileList = []
        jI = jobId[0] if isinstance(jobId, list) else jobId
        try:
            client = self.openSSHConnection(jobId=jI, machine=machine, username=username, password=password,
                                            keyFilename=keyFilename, timeout=timeout, maxTries=maxTries)
        except Exception as e:
            print('testRemoteFiles fail\n', e)
            return [False for item in remoteFileList]
        ret = []
        try:
            for remoteFile in remoteFileList:
                # NOTE(review): the path is not shell-quoted, so names with
                # spaces or shell metacharacters are not tested correctly.
                stdin, stdout, stderr = client.exec_command('[ -e ' + remoteFile + ' ] && echo "Found" || echo "Lost"')
                out = ''.join(stdout)
                err = ''.join(stderr)
                if self._diagnostic:
                    print('testRemoteFiles', remoteFile, out, '*', err)
                found = out.count('Found') > 0
                if found:
                    self.emit(QtCore.SIGNAL('testMessage'), '   File: ' + str(remoteFile) + ' exists')
                else:
                    self.emit(QtCore.SIGNAL('testMessage'), '   File: ' + str(remoteFile) + ' not found')
                ret.append(found)
        finally:
            # The ssh client was never closed.
            client.close()
        self.deleteServerParam(jobId, 'sshThreadRemoteFiles')
        self.emit(QtCore.SIGNAL('testRemoteFiles'), jobId, ret)

    def runOnServer(self, jobId=None):
        '''Run ssh or qsub with or without shared file system
         reimplement in application'''

    def runInQsub(self, jobId, remoteSh, optionsFile=None, mechanism='qsub_local'):
        """Submit the script *remoteSh* to a Grid Engine queue with qsub.

        qsub's output goes to <name>qsub_stdout.txt / <name>qsub_stderr.txt
        next to the script, where <name> is the script's basename without its
        last 9 characters.  optionsFile, if given, is passed with qsub's -@
        option.
        mechanism 'qsub_local' runs qsub here and records the queue job id.
        Any other mechanism runs qsub on the server via sendSSHCommand
        (prefixed with SGE_ROOT=... when sge_root is set); the id is then
        recorded by handleRemoteQsubStart.  Both paths enable polling.
        Raises CException 320 if local submission fails.
        """
        import subprocess
        path, name = os.path.split(remoteSh)
        name = name[0:-9]
        stdout = os.path.join(path, name + 'qsub_stdout.txt')
        stderr = os.path.join(path, name + 'qsub_stderr.txt')
        sP = self.serverParams(jobId)
        comList = ["qsub", '-o', stdout, '-e', stderr, '-S', '/bin/sh', remoteSh]
        if optionsFile is not None:
            # Grid Engine's option-file flag is '-@ file'; a bare '@' was
            # passed through as an ordinary argument.
            comList.insert(1, '-@')
            comList.insert(2, optionsFile)
        if mechanism == 'qsub_local':
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out, err = proc.communicate()
            except Exception as e:
                # sys.exc_info() was used here without importing sys.
                print("Error running job using remote queue command 'qsub'\n\n" + str(type(e)) + "\n\n" + str(e))
                raise CException(self.__class__, 320)
            if self._diagnostic:
                print('qsub start', out, err)
            # qsub prints e.g. 'Your job 123 ("name") has been submitted':
            # the id is the last word before the first '('.
            words = out[:out.find('(')].rstrip().split()
            if not words:
                raise CException(self.__class__, 320, out)
            self.setServerParam(jobId, 'serverProcessId', words[-1])
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)
        else:
            # Use ssh to submit qsub on a different machine
            if sP.sge_root is not None:
                comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comList[0]
            else:
                comLine = comList[0]
            comLine = ' '.join([comLine] + comList[1:])
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStart)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)

    def runInSlurm(self, jobId, remoteSh, optionsFile=None, mechanism='slurm_remote'):
        """Submit the script *remoteSh* to Slurm with sbatch.

        sbatch's output goes to <name>slurm_stdout.txt / <name>slurm_stderr.txt
        next to the script (<name> is the basename minus its last 9 characters).
        mechanism 'slurm_local' runs sbatch here and records the first number
        in its output as serverProcessId (-999 if none).  Any other mechanism
        runs sbatch on the server via sendSSHCommand; the id is then recorded
        by handleRemoteSbatchStart.  Both paths enable polling.
        Raises CException 320 if local submission fails.
        """
        import subprocess
        path, name = os.path.split(remoteSh)
        name = name[0:-9]
        stdout = os.path.join(path, name + 'slurm_stdout.txt')
        stderr = os.path.join(path, name + 'slurm_stderr.txt')
        comList = ["sbatch", '-o', stdout, '-e', stderr, remoteSh]
        if optionsFile is not None:
            # NOTE(review): '@ file' is not sbatch syntax (it mirrors qsub's
            # '-@'); kept as-is - confirm how Slurm option files should be passed.
            comList.insert(1, '@')
            comList.insert(2, optionsFile)
        if mechanism == 'slurm_local':
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out, err = proc.communicate()
            except Exception as e:
                # sys.exc_info() was used here without importing sys.
                print("Error running job using command 'sbatch'\n\n" + str(type(e)) + "\n\n" + str(e))
                raise CException(self.__class__, 320)
            if self._diagnostic:
                print('slurm start', out, err)
            # sbatch normally prints 'Submitted batch job <id>'.
            # NOTE(review): the search/group calls were missing in this copy and
            # are reconstructed; the id is stored as an int (-999 if not found).
            match = re.search(r'[0-9]+', out)
            pid = int(match.group(0)) if match else -999
            self.setServerParam(jobId, 'serverProcessId', pid)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)
        else:
            # Use ssh to submit to Slurm on a different machine
            comLine = ' '.join(comList)
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSbatchStart)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)

    def handleRemoteQsubStart(self, jobId, out, err):
        self.setServerParam(jobId,'serverProcessId', out[:out.find('(')].rstrip().split()[-1])
        if self._diagnostic:
            print('handleRemoteQsubStart', self.serverParams(jobId).serverProcessId) 

    def handleRemoteSbatchStart(self, jobId, out, err):
        # parse the output from the sbatch command
        pid = -999
        match ='[0-9]+',out)
        if match:
            pid =
        # TODO: add some error handling
        self.setServerParam(jobId,'serverProcessId', pid)
        if self._diagnostic:
            print('handleRemoteSbatchStart', self.serverParams(jobId).serverProcessId)

    def sendSSHCommand(self, jobId, comLine, finishHandler=None, retHandler=None, emitFail=True):
        """Run *comLine* on the job's server over ssh in a background thread.

        finishHandler -- called with jobId when the thread finishes.
        retHandler -- called as retHandler(jobId, out, err) by _sendSSHCommand.
        The thread is stored as sP.sshThread and kept in self.runInSSHThreads.
        It is removed from that list by _sendSSHFinished, or by
        _discardSSHThread when no finishHandler is given.
        """
        import functools
        import UtilityThread
        if self._diagnostic:
            print('Remote server command:', comLine)
        sP = self.serverParams(jobId)
        thread = UtilityThread.UtilityThread(functools.partial(self._sendSSHCommand, jobId, comLine, emitFail, retHandler))
        sP.sshThread = thread
        self.runInSSHThreads.append(thread)
        if finishHandler is not None:
            thread.finished.connect(functools.partial(finishHandler, jobId))
        else:
            thread.finished.connect(functools.partial(self._discardSSHThread, thread))
        # The thread was created but never started, so the command never ran.
        # Assumes UtilityThread is a QThread-style class started with start().
        thread.start()

    def _discardSSHThread(self, thread):
        """Drop a finished ssh thread that had no finish handler from runInSSHThreads."""
        if thread in self.runInSSHThreads:
            self.runInSSHThreads.remove(thread)

    def _sendSSHCommand(self, jobId, comLine, emitFail=True, retHandler=None):
        """Worker for sendSSHCommand: run *comLine* over ssh and collect its output.

        '$HOME' in the job's keyFilename is replaced by the local home
        directory.  Non-empty stderr is reported with the 'failedRemoteCommand'
        signal (when emitFail).  retHandler, if given, is called as
        retHandler(jobId, out, err); errors it raises are printed, not raised.
        Returns (out, err) as strings.
        """
        from core import CCP4Utils
        sP = self.serverParams(jobId)
        keyFilename = None
        if sP.keyFilename is not None and len(sP.keyFilename) > 0:
            keyFilename = sP.keyFilename
            if '$HOME' in keyFilename:
                # Plain replacement: re.sub would treat backslashes in the home
                # path as escape sequences.
                keyFilename = keyFilename.replace('$HOME', CCP4Utils.getHOME())
        sP.sshClient = self.openSSHConnection(jobId, sP.machine, sP.username, sP.password, keyFilename, emitFail=emitFail)
        try:
            stdin, stdout, stderr = sP.sshClient.exec_command(str(comLine))
            out = ''.join(stdout)
            err = ''.join(stderr)
        finally:
            # Callers that do not use _sendSSHFinished as finish handler never
            # closed the client; closing it again there is harmless.
            sP.sshClient.close()
        if self._diagnostic:
            print('Remote start stdout:', out)
            print('Remote start stderr:', err)
        if len(err) > 0:
            exc = CException(self.__class__, 350, err)
            if emitFail:
                self.emit(QtCore.SIGNAL('failedRemoteCommand'), jobId, exc)
        if retHandler:
            try:
                retHandler(jobId, out, err)
            except Exception as e:
                print('ERROR handling return output from SSH command', e)
        return out, err

    def _sendSSHFinished(self, jobId, clientParam='sshClient', threadParam='sshThread'):
        #print "Finished remote job",jobId,threadParam
        #print '_runInSSHFinished',self.serverParams[jobId].mechanism
        sP = self.serverParams(jobId)
        if sP is None:
        if clientParam is not None and hasattr(sP, clientParam):
            getattr(sP, clientParam).close()
            #getattr(sP, clientParam).__del__() - daft but how do we be show it is zapped?
        if hasattr(sP, threadParam):
            if getattr(sP, threadParam).retval is not None:
                stdout, stderr = getattr(sP,threadParam).retval
                if self._diagnostic:
                    print('Remote finish stdout:', stdout)
                    print('Remote finish stderr:', stderr)
                print('Failed analysing return from remote command')
        if hasattr(sP, threadParam):
            self.runInSSHThreads.remove(getattr(sP, threadParam))
            #getattr(sP,threadParam).__del__() -- this is nonsense for a thread

    def runInSSH(self, jobId, remoteSh):
        """Start the job script ``remoteSh`` on the remote server with ``sh``.

        The command output goes to handleRunInSSHDone and the worker thread is
        cleaned up by _sendSSHFinished. Afterwards the job is flagged for both
        report polling and finish polling.
        """
        self.sendSSHCommand(jobId, 'sh ' + remoteSh,
                            retHandler=self.handleRunInSSHDone,
                            finishHandler=self._sendSSHFinished)
        for pollFlag in ('pollReport', 'pollFinish'):
            self.setServerParam(jobId, pollFlag, 1)
    def _runInSSHFinished(self, jobId):
        """Finish handler named after runInSSH; currently a no-op.

        NOTE(review): the body of this method is missing from this copy of the
        file. runInSSH registers _sendSSHFinished as its finish handler, so
        confirm whether this method should delegate to it or can be removed.
        """

    def handleRunInSSHDone(self, jobId, out, err):
        """Record the remote process id reported by a runInSSH job script.

        Every stdout line of the form ``PID=<n>`` stores ``int(n)`` as the job's
        'serverProcessId' server parameter, in order of appearance.
        """
        if self._diagnostic: 
            print('handleRunInSSHDone', jobId, 'out', out, 'err', err)
        marker = 'PID='
        pidLines = [text for text in out.split('\n') if text.startswith(marker)]
        for text in pidLines:
            self.setServerParam(jobId, 'serverProcessId', int(text[len(marker):]))

    def checkRemoteStatus(self, jobId):
        """Request a status report for jobId from its server.

        Dispatches on the server mechanism to the matching status check
        (checkRemoteQsubStatus, checkRemoteSlurmStatus, checkLocalQsubStatus or
        checkSSHJobStatus); those methods report results through their own
        handlers.

        :returns: True if a status check was started, False if the job has no
            server parameters or an unsupported mechanism.
        """
        sP = self.serverParams(jobId)
        if sP is None:
            return False
        # Previously each branch just returned True without checking anything.
        if sP.mechanism in ['qsub_remote', 'qsub_shared']:
            self.checkRemoteQsubStatus(jobId)
        elif sP.mechanism in ['slurm_remote']:
            self.checkRemoteSlurmStatus(jobId)
        elif sP.mechanism in ['qsub_local']:
            self.checkLocalQsubStatus(jobId)
        elif sP.mechanism in ['ssh', 'ssh_shared']:
            self.checkSSHJobStatus(jobId)
        else:
            return False
        return True

    def checkSSHJobStatus(self,jobId):
        """Report the status of a job started over SSH.

        If the remote process id is not yet known, it is requested first with
        getPidFileContent (which only acts for the 'ssh' mechanism). Otherwise
        the user's processes on the remote machine are listed with ``ps -fu``
        and passed to handleRemoteSSHStatus.
        """
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            # handleRemoteSSHStatus reports sP.serverProcessId alongside the ps
            # output, so fetch the pid first instead of reporting "None".
            self.getPidFileContent(jobId)
            return
        comLine = 'ps -fu ' + sP.username
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSSHStatus, emitFail=False)

    def handleRemoteSSHStatus(self, jobId, out, err):
        """Emit 'remoteJobMessage' carrying the remote ``ps`` output for jobId.

        The message title states the job's recorded server process id; the
        machine name is passed along as the last signal argument.
        """
        if self._diagnostic:
            print('handleRemoteSSHStatus', jobId, out, err)
        params = self.serverParams(jobId)
        title = 'The process id for this job: ' + str(params.serverProcessId)
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, title, out, params.machine)

    def checkLocalQsubStatus(self, jobId):
        """Run ``qstat`` on the local machine and pass its output on.

        The combined stdout/stderr text is handed to handleRemoteQsubStatus.

        :raises CException: code 362 if qstat cannot be run.
        """
        try:
            # Popen takes the command as its first argument; the old
            # 'comList=' keyword does not exist and raised TypeError every call.
            proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
            out, err = proc.communicate()
        except Exception:
            exc_type, exc_value = sys.exc_info()[:2]
            print("Error checking qsub status\n" + str(exc_type) + "\n" + str(exc_value))
            raise CException(self.__class__, 362)
        if self._diagnostic:
            print('checkLocalQsubStatus', out, err)
        self.handleRemoteQsubStatus(jobId, out, err)

    def checkRemoteQsubStatus(self, jobId):
        """Run ``qstat`` on the job's remote server over SSH.

        The output is handled by handleRemoteQsubStatus. If the server
        parameters define ``sge_root``, it is set as SGE_ROOT for the command.

        :returns: False if jobId has no server parameters, otherwise None.
        """
        sP = self.serverParams(jobId)
        if sP is None:
            return False
        # One name for the command: 'comline' used to be assigned while
        # 'comLine' was sent, raising NameError whenever sge_root was None.
        comLine = 'qstat'
        if sP.sge_root is not None:
            comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comLine
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStatus, emitFail=False)

    def checkRemoteSlurmStatus(self, jobId):
        """Query ``squeue`` on the job's remote server over SSH.

        The output is handled by handleRemoteSlurmStatus.

        :returns: False if jobId has no server parameters, otherwise None.
        """
        sP = self.serverParams(jobId)
        if sP is None:
            return False
        # 'comline' was assigned but 'comLine' was sent: NameError on every call.
        # NOTE(review): squeue --job expects a Slurm job id; this passes the
        # CCP4i2 jobId (killSlurmJob uses sP.serverProcessId) - confirm.
        comLine = 'squeue -h -o "%A" --job ' + str(jobId)
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSlurmStatus, emitFail=False)

    def handleRemoteQsubStatus(self, jobId, out, err):
        """Emit 'remoteJobMessage' with the qstat output for jobId's server.

        Result handler of checkRemoteQsubStatus and checkLocalQsubStatus. Output
        shorter than 10 characters is replaced by a "no jobs in queue" message.
        """
        sP = self.serverParams(jobId)
        # NOTE(review): short output only says the queue is (nearly) empty; it
        # says nothing about this particular job if the cluster runs other jobs.
        if len(out) < 10:
            out = 'No jobs in queue (implies all finished/failed)'
        # str(): sP.machine may presumably be None for the local qsub mechanism,
        # which made the string concatenation raise TypeError.
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'QSub status on ' + str(sP.machine), out, sP.machine)

    def handleRemoteSlurmStatus(self, jobId, out, err):
        """Emit 'remoteJobMessage' with the squeue result for jobId.

        When str(jobId) does not occur in the squeue output, a message saying
        the job is no longer queued is sent instead of the raw output.
        """
        params = self.serverParams(jobId)
        stillQueued = str(jobId) in out
        message = out if stillQueued else 'The job is not in queue (implies it has finished or failed).'
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'Squeue status on ' + params.machine, message, params.machine)

    def getPidFileContent(self, jobId):
        """Start copying the remote pid file of an 'ssh' job to its local path.

        Uses transportFiles in 'get' mode with failSignal=False and
        getPidFileContent2 as the finish handler. Jobs with any other mechanism
        are left untouched.
        """
        from core import CCP4Utils
        if self.serverParams(jobId).mechanism != 'ssh':
            return
        localPidFile = self.pidFile(jobId, local=True)
        remotePidFile = self.pidFile(jobId)
        self.transportFiles(jobId, [[localPidFile, remotePidFile]], 'get', finishHandler=self.getPidFileContent2, failSignal=False)

    def getPidFileContent2(self,jobId):
        """Read the locally fetched pid file of jobId and record the remote pid.

        Finish handler of the transfer started by getPidFileContent. When the
        file holds an integer, it is stored as the job's 'serverProcessId' and a
        ``ps`` listing of the user's remote processes is requested, reported by
        handleRemoteSSHStatus.

        :returns: the process id as an int, or None if it could not be read.
        """
        from core import CCP4Utils
        try:
            ret = int(CCP4Utils.readFile(self.pidFile(jobId, local=True)).strip())
        except Exception:
            # Missing, empty or garbled pid file (e.g. the transfer failed).
            return None
        self.setServerParam(jobId, 'serverProcessId', ret)
        # 'ps -fu' as in checkSSHJobStatus: ps selection options are additive,
        # so '-e' selected every process and the '-u user' filter had no effect.
        comLine = 'ps -fu ' + self.serverParams(jobId).username
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSSHStatus, emitFail=False)
        return ret

    def killSSHJob(self, jobId):
        """Kill an SSH-launched job with ``pkill -F <remote pid file>``.

        The result is handled by handleRemoteDel.

        :raises CException: code 340 if the remote process id is still unknown
            after asking getPidFileContent for it.
        """
        params = self.serverParams(jobId)
        if params.serverProcessId is None:
            self.getPidFileContent(jobId)
            # NOTE(review): getPidFileContent fetches the file via
            # transportFiles with a finish handler, so the pid is presumably
            # still unset here on a first attempt.
            if params.serverProcessId is None:
                raise CException(self.__class__, 340)
        killCommand = 'pkill -F ' + self.pidFile(jobId)
        self.sendSSHCommand(jobId, comLine=killCommand, retHandler=self.handleRemoteDel)

    def killQsubJob(self, jobId):
        """Delete a qsub job with ``qdel``.

        'qsub_local': qdel runs locally and the job is then marked failed in the
        project database. 'qsub_remote': qdel runs over SSH and
        handleRemoteDel marks the job failed.

        :raises CException: code 360 if the queue job id is unknown, code 361
            if the local qdel cannot be run.
        """
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            raise CException(self.__class__, 360)
        # str(): other code paths in this class store serverProcessId as an
        # int, which broke both the argv list and the string concatenation.
        queueId = str(sP.serverProcessId)
        if sP.mechanism in ['qsub_local']:
            try:
                proc = subprocess.Popen(['qdel', queueId], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out, err = proc.communicate()
                print('kill qsub', out, err)
            except Exception:
                exc_type, exc_value = sys.exc_info()[:2]
                print("Error killing job\n" + str(exc_type) + "\n" + str(exc_value))
                raise CException(self.__class__, 361)
            from dbapi import CCP4DbApi
            from core import CCP4Modules
            CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
        elif sP.mechanism in ['qsub_remote']:
            # NOTE(review): 'qsub_shared' jobs reach neither branch - confirm
            # whether they should also be deleted over SSH.
            self.sendSSHCommand(jobId, comLine='qdel ' + queueId, retHandler=self.handleRemoteDel)

    def killSlurmJob(self, jobId):
        """Cancel a Slurm job with ``scancel``.

        'slurm_local': scancel runs locally and the job is then marked failed in
        the project database. 'slurm_remote': scancel runs over SSH and
        handleRemoteDel marks the job failed.

        :raises CException: code 360 if the Slurm job id is unknown, code 361
            if the local scancel cannot be run.
        """
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            raise CException(self.__class__, 360)
        # str(): serverProcessId may be stored as an int, which broke both the
        # argv list and the string concatenation.
        slurmId = str(sP.serverProcessId)
        if sP.mechanism in ['slurm_local']:
            try:
                proc = subprocess.Popen(['scancel', slurmId], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out, err = proc.communicate()
                print('kill Slurm job', out, err)
            except Exception:
                exc_type, exc_value = sys.exc_info()[:2]
                print("Error killing job\n" + str(exc_type) + "\n" + str(exc_value))
                raise CException(self.__class__, 361)
            from dbapi import CCP4DbApi
            from core import CCP4Modules
            CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
        elif sP.mechanism in ['slurm_remote']:
            self.sendSSHCommand(jobId, comLine='scancel ' + slurmId, retHandler=self.handleRemoteDel)

    def handleRemoteDel(self, jobId, out, err):
        """Mark jobId as failed in the project database after a remote kill.

        Result handler for the pkill/qdel/scancel commands sent over SSH. The
        command's output is only printed in diagnostic mode; the status update
        happens regardless of it.
        """
        from dbapi import CCP4DbApi
        from core import CCP4Modules
        if self._diagnostic:
            print('handleRemoteQsubDel', jobId, out, err)
        projectDb = CCP4Modules.PROJECTSMANAGER().db()
        projectDb.updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)

    def killRemoteJob(self, jobId):
        """Kill jobId with the method matching its server mechanism.

        :raises CException: code 365 if jobId has no server parameters; the
            per-mechanism kill methods may raise their own CExceptions.
        """
        sP = self.serverParams(jobId)
        if sP is None:
            raise CException(self.__class__, 365)
        # The branch bodies were missing (a SyntaxError); dispatch to the
        # per-mechanism kill methods of this class.
        if sP.mechanism in ['ssh', 'ssh_shared']:
            self.killSSHJob(jobId)
        elif sP.mechanism in ['qsub_local', 'qsub_shared', 'qsub_remote']:
            self.killQsubJob(jobId)
        elif sP.mechanism in ['slurm_remote']:
            self.killSlurmJob(jobId)
        elif sP.mechanism in ['custom']:
            # NOTE(review): the kill call for custom handlers is missing from
            # this copy of the file and the custom-handler API is not visible
            # here; restore it from the upstream source.
            pass

    def runLocalTest(self, jobId, remoteSh):
        """Run the job script remoteSh locally with ``sh`` (asynchronously).

        localTestFinished is registered as the completion handler, with jobId
        supplied to it as a keyword argument.
        """
        from core import CCP4Modules
        processManager = CCP4Modules.PROCESSMANAGER()
        completion = [self.localTestFinished, {'jobId': jobId}]
        processManager.startProcess('sh', [remoteSh], handler=completion, ifAsync=True)

    def localTestFinished(self, pid, jobId=None):
        """Completion handler registered by runLocalTest; currently a no-op.

        :param pid: value passed by the process manager (presumably the process
            id - TODO confirm).
        :param jobId: job id bound by runLocalTest's handler specification.

        NOTE(review): the body of this method is missing from this copy of the
        file; confirm what should happen when a local test run finishes.
        """
        #print 'localTestFinished',pid,args

    def pollForFinishedFlagFile(self):
        """Check whether polled jobs have written their finished flag file.

        Jobs whose files are visible locally ('ssh_shared', 'qsub_shared',
        'qsub_local') are checked with os.path.exists. Remote jobs ('ssh',
        'qsub_remote', 'slurm_remote') are grouped by machine and checked with
        one testRemoteFiles call per machine; its 'testRemoteFiles' signal is
        handled by pollForFinishedFlagFile1.
        """
        if getattr(self, 'pollFinishConnected', None) is None:
            self.connect(self, QtCore.SIGNAL('testRemoteFiles'), self.pollForFinishedFlagFile1)
            # Remember the connection; otherwise the slot was connected again on
            # every poll and called once per poll made so far.
            self.pollFinishConnected = True
        localMechanisms = ['ssh_shared', 'qsub_shared', 'qsub_local']
        remoteMechanisms = ['ssh', 'qsub_remote', 'slurm_remote']
        pollByMachine = {}
        for jobId in self.getJobsToPollFinish([1]):
            sP = self.serverParams(jobId)
            if sP is None:
                continue
            if sP.mechanism in localMechanisms:
                if os.path.exists(sP.finishedFile):
                    # NOTE(review): the original action here is missing from this
                    # copy of the file; route it to the same handler that
                    # processes remote flag-file results.
                    self.pollForFinishedFlagFile1(jobId, [True])
            elif sP.mechanism in remoteMechanisms:
                pollByMachine.setdefault(sP.machine, []).append(jobId)
        for machine, jobIdList in list(pollByMachine.items()):
            finfileList = []
            refJobId, refParams = None, None
            for jobId in jobIdList:
                sP = self.serverParams(jobId)
                if sP is not None:
                    finfileList.append(sP.finishedFile)
                    refJobId, refParams = jobId, sP
            if len(finfileList) > 0:
                # NOTE(review): only one jobId is sent, while finfileList covers
                # every job on this machine; pollForFinishedFlagFile1 pairs
                # statuses with job ids by position - confirm against
                # testRemoteFiles.
                self.testRemoteFiles(jobId=refJobId, machine=refParams.machine, username=refParams.username, password=refParams.password, remoteFileList=finfileList)
    def pollForFinishedFlagFile1(self, jobId, fileStatus):
        """Handle finished-flag-file results for one or more jobs.

        Connected to the 'testRemoteFiles' signal by pollForFinishedFlagFile.

        :param jobId: a job id or a list of job ids.
        :param fileStatus: sequence of flags, presumably one per entry of jobId
            (truthy when that job's finished flag file exists).
        """
        if self._diagnostic:
            print('pollForFinishedFlagFile', jobId, fileStatus)
        jobIds = jobId if isinstance(jobId, list) else [jobId]
        for thisJobId, finished in zip(jobIds, fileStatus):
            if finished:
                # NOTE(review): the action for a finished job is missing from
                # this copy of the file (the branch had no body, which is a
                # SyntaxError); restore it from the upstream CCP4i2 source.
                if self._diagnostic:
                    print('pollForFinishedFlagFile finished', thisJobId)

    def customHandler(self, jobId=None, customCodeFile=None):
        """Return the cached custom server handler for a custom code file.

        One handler instance is created per custom code file per session and
        cached in self._customHandler (one instance per job might suit some
        cases better).

        :param jobId: if given, the job's 'customCodeFile' server parameter is
            used instead of the customCodeFile argument.
        :param customCodeFile: path of a Python file defining the handler class;
            the first class found in it is instantiated with self.serverParams.
        :returns: the handler instance, or None after printing an error.
        """
        if not hasattr(self, '_customHandler'):
            self._customHandler = {}
        if jobId is not None:
            customCodeFile = self.getServerParam(jobId, 'customCodeFile')
        # Test for None before str(): the old unconditional str() turned None
        # into 'None', so this error branch was unreachable.
        if customCodeFile is None:
            print('ERROR attempting to access server custom code when no file provided')
            return None
        customCodeFile = str(customCodeFile)
        if customCodeFile in self._customHandler:
            return self._customHandler[customCodeFile]
        if not os.path.exists(customCodeFile):
            print('ERROR server custom code file does not exist:' + customCodeFile)
            return None
        # Optional startup code next to the custom code file.
        # NOTE(review): the startup file name appears to be missing (it is '',
        # i.e. the directory itself). isfile() skips a directory instead of
        # failing to open it and abandoning the handler.
        startupFile = os.path.join(os.path.split(customCodeFile)[0], '')
        if os.path.isfile(startupFile):
            try:
                # SECURITY(review): executes arbitrary code from a
                # user-configured file.
                with open(startupFile) as startupStream:
                    exec(compile(startupStream.read(), startupFile, 'exec'))
            except Exception as e:
                print('ERROR execing custom server interface startup code:', startupFile)
                print('Error:', e)
                return None
        import inspect
        from core import CCP4Utils
        module, err = CCP4Utils.importFileModule(customCodeFile)
        if err is not None:
            print('ERROR loading custom code file:' + customCodeFile)
            return None
        # NOTE(review): getmembers() sorts by name and also lists classes the
        # module imports, so clsList[0] may not be the intended handler class.
        clsList = inspect.getmembers(module, inspect.isclass)
        if len(clsList) == 0:
            print('ERROR no classes found in custom code file:' + customCodeFile)
            return None
        if self._diagnostic:
            print('CServerParams.customHandler instantiating customHandler', clsList[0][0], self._serverParams)
        try:
            self._customHandler[customCodeFile] = clsList[0][1](self.serverParams)
        except Exception as e:
            print('ERROR instantiating custom handler class')
            print('Error:', e)
            return None
        return self._customHandler[customCodeFile]

    def listRemoteProcesses(self):
        """Ask each machine running polled jobs for its remote process list.

        Jobs with pollFinish 0 or 1 are grouped by machine. For each machine, one
        ccp4-python command is sent over SSH. Its output goes to
        handleListRemoteProcesses together with that machine's
        {jobId: serverProcessId} mapping.
        """
        import functools
        runningJobs = self.getJobsToPollFinish(pollFinish=[0, 1])
        jobsByMachine = {}
        for jobId in runningJobs:
            sP = self.serverParams(jobId)
            if sP is None:
                continue
            jobsByMachine.setdefault(sP.machine, {})[jobId] = sP.serverProcessId
        for machine, jobDict in list(jobsByMachine.items()):
            # Take ccp4Dir from a job on *this* machine. The old code used the
            # last jobId of the loop above, which may be on another machine.
            machineJobId = list(jobDict.keys())[0]
            ccp4Dir = self.getServerParam(machineJobId, 'ccp4Dir')
            # NOTE(review): the script name after .../bin/ appears to be missing.
            comLine = ccp4Dir + '/bin/ccp4-python ' + ccp4Dir + '/share/ccp4i2-devel/bin/'
            self.sendSSHCommand(machineJobId, comLine=comLine,
                                retHandler=functools.partial(self.handleListRemoteProcesses, machine, jobDict), emitFail=False)
        # get qsub status
        # NOTE(review): this result is unused; the qsub follow-up seems missing.
        runningJobs = self.getJobsToPollFinish(pollFinish=[2])

    def handleListRemoteProcesses(self, machine, jobDict, jobId, out, err):
        """Parse a remote process listing and emit 'remoteProcessesList'.

        Result handler for listRemoteProcesses, which binds machine and jobDict
        with functools.partial. Output lines starting with 'processes' or
        'atTime' are executed as Python assignments. The resulting values are
        emitted together with machine and jobDict.
        """
        # SECURITY(review): these lines come from a remote command's stdout and
        # are executed as Python code; consider ast.literal_eval on the
        # right-hand side if the values are plain literals.
        parsed = {}
        for line in out.split('\n'):
            if line.startswith(('processes', 'atTime')):
                try:
                    # Execute into an explicit namespace. Under Python 3, exec()
                    # cannot create the function locals that the emit below
                    # used to read, so processes/atTime were never defined.
                    exec(line, {}, parsed)
                except Exception as e:
                    print('Failed evaling the return from listRemoteProcesses')
                    print('Error:', e)
        try:
            self.emit(QtCore.SIGNAL('remoteProcessesList'), machine, jobDict, parsed['processes'], parsed['atTime'])
        except Exception:
            print('ERROR in handling listing of remote processes')
            print('execing line', line)

Reply via email to