Dear Michael,
   I think that the port number is hardwired in CCP4i2 on line 439 of
$CCP4/share/ccp4i2/core/CCP4JobServer.py

        transport = paramiko.Transport((sP.machine, 22))

There are also various calls to the "paramiko" library, which handles SSH,
that do not pass a port number and so default to 22.
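
For example (just to show the pattern, not the exact lines in the file),
paramiko accepts a non-default port when it is passed explicitly:

        transport = paramiko.Transport((sP.machine, 2222))
        client.connect(sP.machine, port=2222, username=sP.username)

where 2222 stands in for whatever port you actually use; any call that
omits the port falls back to 22.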

This is all bad.

I have attached a replacement for this file. Could you try making a backup
of the above file and copying the attached file in its place?
I have put a constant for the port number in the line

PARAMIKO_PORT=22

near the top of the file. Then change this port number to whatever is
suitable for your setup.

If this works, then I will make the port number an option in the program.
If not, then I will try something else.
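
(For that option, one possibility would be to read the port from an
environment variable. The name CCP4I2_SSH_PORT below is only illustrative;
nothing reads it yet:

        import os
        PARAMIKO_PORT = int(os.environ.get('CCP4I2_SSH_PORT', 22))

so that the port could be set per site without editing the file.)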


Best wishes,
Stuart McNicholas

On Mon, 8 Jun 2020 at 17:40, Michael Weyand <michael.wey...@uni-bayreuth.de>
wrote:

> Dear CCP4I2 experts,
>
> I'm trying to submit remote jobs via SSH within CCP4i2. Unfortunately,
> we use a non-standard SSH port.
> So far, I'm not able to add any option within the CCP4I2 interface. For
> job submission, I need to do something like
>
> 'ssh -p XXXXX -Y ..... ccp4i-specific-command ...'.
>
> Is there any chance to enter this '-p' SSH option within CCP4i2? I have
> already tried to define an "ssh command" via "Preferences",
> but so far without any success.
>
> I'm also wondering why there is an SSH key file option. If a user already
> uses an SSH key for a client/server connection, why is it necessary to
> enter the key file again?
> Any SSH between client and server should work from scratch ...
>
> Any hints are highly appreciated,
> Michael
>

from __future__ import print_function


"""
     CCP4JobServer.py: CCP4 GUI Project
     Copyright (C) 2016 STFC

     This library is free software: you can redistribute it and/or
     modify it under the terms of the GNU Lesser General Public License
     version 3, modified in accordance with the provisions of the 
     license to address the requirements of UK law.
 
     You should have received a copy of the modified GNU Lesser General 
     Public License along with this library.  If not, copies may be 
     downloaded from http://www.ccp4.ac.uk/ccp4license.php
 
     This program is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     GNU Lesser General Public License for more details.
"""

"""
   Liz Potterton Apr 2016 - Separate 'remote' server code out from CCP4JobController
"""
import os
import re
import sys
import subprocess
from PyQt4 import QtCore
from core.CCP4ErrorHandling import *

PARAMIKO_PORT=22
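# Used for every paramiko Transport/connect call below; change it if your server uses a non-standard SSH port.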

class CServerParams:
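    """Per-job parameters for a job submitted to a remote server (machine, user, paths, polling state)."""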
    SAVELIST = ['machine', 'username', 'ccp4Dir', 'tempDir', 'mechanism', 'keyFilename', 'serverProcessId',
                'remotePath', 'validate', 'customCodeFile', 'queueOptionsFile', 'sge_root', 'serverGroup']

    def __init__(self, **kw):
        self.jobId = kw.get('jobId', None)
        self.password = kw.get('password', None)
        self.jobNumber = None
        self.projectId = None
        self.projectName = None
        self.projectDirectory = None
        # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
        #                   3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
        self.pollFinish = 0
        self.pollReport = 0
        for item in CServerParams.SAVELIST:
            setattr(self, item, kw.get(item, None))
        self.serverProcessId = None

    def __str__(self):
        tx = ''
        for item in CServerParams.SAVELIST + ['jobNumber', 'projectName', 'projectDirectory', 'pollFinish', 'pollReport']:
            tx = tx + item + ': ' + str(getattr(self, item)) + '\n'
        return tx

    @property
    def remote_finished_tarball(self):
        return self.remotePath + 'finished.ccp4db.zip'

    @property
    def remote_report_file(self):
        if self.mechanism in ['ssh_shared', 'qsub_local', 'qsub_shared']:
            return None
        else:
            if self.remotePath is None:
                return 'work/project/CCP4_JOBS/job_' + self.jobNumber + '/program.xml'
            else:
                return self.remotePath + 'work/project/CCP4_JOBS/job_' + self.jobNumber + '/program.xml'

    @property
    def local_tarball(self):
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '_setup.ccp4db.zip')

    @property
    def local_report_file(self):
        return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'program.xml')

    @property
    def local_finished_tarball(self):
        return os.path.join(self.projectDirectory, 'CCP4_TMP', 'job_' + self.jobNumber + '_finished.ccp4db.zip')

    @property
    def dbXml(self):
        if self.mechanism not in ['ssh_shared', 'test', 'qsub_local', 'qsub_shared']:
            return os.path.join(self.projectDirectory, 'CCP4_TMP', 'DATABASE_job_' + self.jobNumber + '.db.xml')
        else:
            return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'DATABASE.db.xml' )

    @property
    def finishedFile(self):
        if self.mechanism in ['ssh_shared', 'qsub_shared', 'qsub_local']:
            return os.path.join(self.projectDirectory, 'CCP4_JOBS', 'job_' + self.jobNumber, 'FINISHED')
        else:
            return self.remotePath[0:-1]+'.FINISHED'

    '''
    @property
    def jobNumber(self):
        """Get the jobNumber"""
        print 'jobNumber',self.jobId
        if self._jobNumber is None:
            jobInfo = self.db.getJobInfo(jobId=self.jobId,mode=['jobnumber','projectid'])
            self._jobNumber = jobInfo['jobnumber']
            self._projectId = jobInfo['projectid']
        return self._jobNumber

    @property
    def projectName(self):
        """Get the projectName"""
        if self._projectName is None:
            if self._projectId is None: self.jobNumber()
            projectInfo = self.db.getProjectInfo(projectId=self._projectId)
            self._projectName = projectInfo['projectname']
            self._projectDirectory = projectInfo['projectdirectory']
        return self._projectName

    @property
    def projectDirectory(self):
        """Get the projectDirectory"""
        print 'projectDirectory',self._projectId
        if self._projectDirectory is None:
            if self._projectId is None: self.jobNumber()
            projectInfo = self.db.getProjectInfo(projectId=self._projectId)
            self._projectName = projectInfo['projectname']
            self._projectDirectory = projectInfo['projectdirectory']
        return self._projectDirectory
    '''

    def getEtree(self):
        from lxml import etree
        ele = etree.Element('serverParams')
        ele.set('jobId', str(self.jobId))
        for key in CServerParams.SAVELIST:
            if getattr(self, key, None) is not None:
                e = etree.Element(key)
                e.text = str(getattr(self,key))
                ele.append(e)
        return ele

    def setEtree(self,ele):
        self.jobId = ele.get('jobId')
        for e in ele.iterchildren():
            setattr(self, str(e.tag), str(e.text))
        self.setDbParams()

    def setDbParams(self):
        from core import CCP4Modules
        jobInfo = CCP4Modules.PROJECTSMANAGER().db().getJobInfo(self.jobId, ['jobnumber', 'projectid', 'projectname'])
        self.jobNumber = jobInfo['jobnumber']
        self.projectId = jobInfo['projectid']
        self.projectName = jobInfo['projectname']
        self.projectDirectory = CCP4Modules.PROJECTSMANAGER().db().getProjectInfo(self.projectId, 'projectdirectory')


class CJobServer:
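    """Manages jobs running on remote servers via ssh, qsub or Slurm; paramiko provides the SSH/SFTP transport."""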
    DB_KEYS = ['jobId', 'machine', 'username', 'mechanism', 'remotePath', 'customCodeFile', 'validate',
               'keyFilename', 'serverProcessId', 'serverGroup']
    ERROR_CODES = {301 : {'description' : 'Failed opening SSH connection'},
                   302 : {'description' : 'Failed opening SSH connection'},
                   303 : {'description' : 'Failed opening SSH connection'},
                   304 : {'description' : 'Failed opening SSH connection'},
                   320 : {'description' : 'Failed submitting job'},
                   330 : {'description' : 'Failed to open connection to copy files'},
                   331 : {'description' : 'Failed copying file to server'},
                   332 : {'description' : 'Failed copying file from server'},
                   340 : {'description' : 'Failed killing remote job - can not recover job id'},
                   350 : {'description' : 'Failed running job startup script on server'},
                   360 : {'description' : 'Failed to kill job - job id unknown'},
                   361 : {'description' : 'Failed to kill job - possible communication fail'},
                   362 : {'description' : 'Failed to find job status'},
                   363 : {'description' : 'Failed to recover remote process id'},
                   364 : {'description' : 'Failed to recover remote process status'},
                   365 : {'description' : 'Failed killing remote process - no information on the remote process'}}

    def __init__(self):
        from core import CCP4Utils
        import paramiko
        self._diagnostic = False
        self._serverParams = {}
        self.runInSSHThreads = []
        # Lists of outstanding jobs when i2 restarts - check if they are finished and
        # then poll in CJobController.doChecks()
        paramiko.util.log_to_file(os.path.join(CCP4Utils.getDotDirectory(), 'status', 'paramiko.log'))

    def restoreRunningJobs(self):
        #self.loadParams()
        self.restoreFromDb()

    def getJobsToPollFinish(self, pollFinish=[]):
        ret = []
        for jobId,sP in list(self._serverParams.items()):
            if sP.pollFinish in pollFinish:
                ret.append(jobId)
        return ret

    def getJobsToPollReport(self):
        jobIdList = []
        for jobId,sP in list(self._serverParams.items()):
            if sP.pollReport > 0:
                jobIdList.append(jobId)
        return jobIdList

    def Exit(self):
        if len(self._serverParams) > 0:
            self.saveParams()
        elif os.path.exists(self.defaultParamsFileName()):
            os.remove(self.defaultParamsFileName())

    def serverParams(self, jobId):
        try:
            return self._serverParams[jobId]
        except:
            return None

    def saveParams(self, fileName=None):
        from lxml import etree
        from core import CCP4File
        if fileName is None:
            fileName = self.defaultParamsFileName()
        fileObj = CCP4File.CI2XmlDataFile(fileName)
        fileObj.header.setCurrent()
        fileObj.header.function = 'JOBSERVERSTATUS'
        bodyEtree = etree.Element('serverParamsList')
        for jobId, sP in list(self._serverParams.items()):
            bodyEtree.append(sP.getEtree())
        fileObj.saveFile(bodyEtree)

    def defaultParamsFileName(self):
        from core import CCP4Utils
        return os.path.join(CCP4Utils.getDotDirectory(), 'status', 'jobServer.params.xml')

    def restoreFromDb(self):
        from core import CCP4Modules
        jobInfoList = CCP4Modules.PROJECTSMANAGER().db().getServerJobs()
        #print 'restoreFromDb',jobInfoList
        for jobInfo in jobInfoList:
            sP = self._serverParams[jobInfo[0]] = CServerParams()
            ii = 0
            for key in self.DB_KEYS:
                setattr(sP, key, jobInfo[ii])
                ii += 1
            sP.setDbParams()
            self.setPollStatus(sP)

    def setPollStatus(self, sP):
        if sP.mechanism in ['qsub']:
            sP.pollFinish = 2
        elif sP.mechanism in ['qsub_remote', 'ssh', 'ssh_shared', 'slurm_remote']:
            if sP.validate in ['password', 'pass_key_filename']:
                sP.pollFinish = -1
            else:
                sP.pollFinish = 1
        elif sP.mechanism in ['custom']:
            sP.pollFinish = 4
        # And set to poll for report update
        if sP.remote_report_file is not None:
            sP.pollReport = 1

    def loadParams(self, fileName=None):
        from core import CCP4File
        if fileName is None:
            fileName = self.defaultParamsFileName()
        if not os.path.exists(fileName):
            return
        fileObj = CCP4File.CI2XmlDataFile(fileName)
        body = fileObj.getBodyEtree()
        #print 'CJobServer.loadParams body',body.tag
        # pollFinish flags: 0=no poll (eg ssh expecting a signal) 1=poll for FINISHED file 2=poll qsub
        #                   3=custom poll (started in current i2 session) 4 = custom poll (from previous i2 session)
        for ele in body.iterchildren():
            jobId = ele.get('jobId')
            if len(ele) > 0:
                sP = self._serverParams[jobId] = CServerParams()
                sP.setEtree(ele)
                sP.setDbParams()
                self.setPollStatus(sP)

    def createServerParams(self, jobId, params):
        from core import CCP4Utils
        self._serverParams[jobId] = params
        self._serverParams[jobId].jobId = jobId
        jobInfo = self.db.getJobInfo(jobId=jobId, mode=['jobnumber', 'projectid'])
        self._serverParams[jobId].jobNumber = jobInfo['jobnumber']
        self._serverParams[jobId].projectId = jobInfo['projectid']
        projectInfo = self.db.getProjectInfo(projectId=self._serverParams[jobId].projectId)
        self._serverParams[jobId].projectName = CCP4Utils.safeOneWord(projectInfo['projectname'])
        self._serverParams[jobId].projectDirectory = projectInfo['projectdirectory']
        #print 'setServerParams',jobId, self._serverParams[jobId]
        self.db.createServerJob(jobId, self._serverParams[jobId])

    def setServerParam(self, jobId, key, value):
        # Trying to set the keys below is bad on Python 3 and presumably not necessary, since the property methods should override them.
        if jobId not in self._serverParams or key in ("remote_finished_tarball","remote_report_file","local_tarball","local_report_file","local_finished_tarball","dbXml","finishedFile"):
            pass
        else:
            setattr(self._serverParams[jobId] ,key, value)
            if key in self.DB_KEYS:
                self.db.updateServerJob(jobId, key, value)

    def getServerParam(self, jobId, key):
        from core import CCP4Modules
        if jobId not in self._serverParams:
            return None
        rv = getattr(self._serverParams[jobId], key, None)
        if rv is not None:
            return rv
        if key in ['ccp4Dir']:
            #Try recovering info from the server setup data
            #This is in a def file (.CCP4I2/configs/serverSetup.def.xml) and loaded into
            # a container with each serverGroup labelled SERVERGROUPn
            serverGroup = getattr(self._serverParams[jobId], 'serverGroup', None)
            if serverGroup is None:
                return None
            if getattr(self, 'setupContainer', None) is None:
                self.setupContainer = CCP4Modules.SERVERSETUP()
                self.setupContainer.load()
            for dataObjName in self.setupContainer.dataOrder():
                if self.setupContainer.get(dataObjName).name == serverGroup:
                    value = self.setupContainer.get(dataObjName).get(key)
                    #print 'getServerParam from SERVERSETUP',key,dataObjName,value
                    setattr(self._serverParams[jobId], key, value)
                    return value
        return None
 
    def deleteServerParams(self, jobId):
        try:
            self.db.deleteServerJob(jobId)
        except Exception as e:
            print('ERROR removing server job record from database\n',e)
        if jobId in self._serverParams:
            del self._serverParams[jobId]
        else:
            return
  
    def deleteServerParam(self, jobId, key):
        try:
            delattr(self._serverParams[jobId], key)
        except:
            pass
  
    def patchRemotePasswords(self, jobId,password):
        sP = self.serverParams(jobId)
        count = 0
        for jI in list(self._serverParams.keys()):
            if jI == jobId or (self._serverParams[jI].pollFinish < 0 and \
                               self._serverParams[jI].machine == sP.machine and self._serverParams[jI].username == sP.username):
                count += 1
                self._serverParams[jI].password = password
                self._serverParams[jI].pollFinish = 1
                self._serverParams[jI].pollReport = 1
        return count

    def checkForRemotePasswords(self, projectId):
        requirePass = []
        for jobId in self.getJobsToPollFinish([-1, -2, -3, -4]):
            sP = self.serverParams(jobId)
            if sP.pollFinish < 0 and sP.projectId == projectId:
                requirePass.append({'jobId' : jobId , 'machine' : sP.machine, 'username' : sP.username})
        return requirePass

    def getPKey(self,jobId):
        import paramiko
        from core import CCP4Utils
        sP = self.serverParams(jobId)
        keyFilename = re.sub(r'\$HOME', CCP4Utils.getHOME(), sP.keyFilename)
        if sP.validate == 'pass_key_filename': 
            pkey = paramiko.RSAKey.from_private_key_file(keyFilename, password=sP.password)
        else:
            pkey = paramiko.RSAKey.from_private_key_file(keyFilename)
        #print 'CJobServer.getPKey', keyFilename
        return pkey

    def pollQsubStat(self, mode='finished'):
        import subprocess
        procCheck = subprocess.Popen(["qstat", "-xml"], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
        outCheck,errCheck = procCheck.communicate()
        if self._diagnostic:
            print('pollQsubStat', outCheck, errCheck)
        if mode == 'finished':
            qsubIdDict= self.parseQsubStat(outCheck)
            for jobId in self.getJobsToPollFinish([2]):
                if self.serverParams(jobId).serverProcessId not in qsubIdDict:
                    self.handleFinishedServerJob(jobId)
        elif mode == 'status':
            return self.parseQsubStat(outCheck, mode)

    def handleFinishedServerJob(self, jobId):
        # Should be reimplemented in application
        # Do whatever necessary to retrieve and use job
        # Remember to delete from self.serverParams
        self.setServerParam(jobId, 'pollFinish', 0)
        self.setServerParam(jobId, 'pollReport', 0)

    def parseQsubStat(self, returnString, mode='finished'):
        from lxml import etree
        parser = etree.XMLParser()
        tree = etree.fromstring(returnString, parser)
        if mode == 'finished':
            qsubIdDict = {}
            jobs = tree.xpath("job_info|queue_info")
            for job in jobs:
                for jobl in job.xpath("job_list"):
                    qsubIdDict[str(jobl.xpath("JB_job_number")[0].text)] = str(jobl.attrib["state"])
            #print 'parseQsubStat',qsubIdDict
            return qsubIdDict
        elif mode == 'status':
            return {}


    def pidFile(self, jobId, local=False):
        sP = self.serverParams(jobId)
        if local or sP.mechanism == 'ssh_shared':
            return  os.path.join(sP.projectDirectory, 'CCP4_JOBS', 'job_' + sP.jobNumber, 'pidfile.txt')
        elif sP.mechanism == 'ssh':
            return sP.remotePath + 'pidfile.txt'
        else:
            return None

    def transportFiles(self, jobId, copyList=[], mode='put', finishHandler=None, failSignal=True, diagnostic=True):
        import functools
        import UtilityThread
        sP = self.serverParams(jobId)
        #print  'transport files',mode,copyList
        if getattr(sP, 'sshThreadTransport', None) is not None:
            return False
        if finishHandler is None:
            finishHandler = self._transportFilesFinished
        sP.sshThreadTransport = UtilityThread.UtilityThread(functools.partial(self._transportFiles, jobId, copyList, mode, failSignal, diagnostic))
        sP.sshThreadTransport.finished.connect(functools.partial(finishHandler, jobId))
        self.runInSSHThreads.append(sP.sshThreadTransport)
        sP.sshThreadTransport.start()
        return True

    def _transportFiles(self, jobId, copyList=[], mode='put', failSignal=True, diagnostic=True):
        import paramiko
        sP = self.serverParams(jobId)
        try:
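            # Honour the configured SSH port (PARAMIKO_PORT) rather than paramiko's default of 22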
            transport = paramiko.Transport((sP.machine, PARAMIKO_PORT))
            if sP.validate in ['key_filename', 'pass_key_filename'] and sP.keyFilename is not None and len(sP.keyFilename) > 0:
                transport.connect(username=sP.username, password=sP.password, pkey=self.getPKey(jobId))
            else:
                transport.connect(username=sP.username, password=sP.password)
            sftp = paramiko.SFTPClient.from_transport(transport)
        except Exception as e:
            print('ERROR setting up paramiko FTP file transfer')
            print(e)
            err = CException(self.__class__, 330, 'Machine:' + str(self.getServerParam(jobId, 'machine')) + '\n' + str(e))
            if failSignal:
                self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
            raise err
        err = CException()
        for localName,remoteName in copyList:
            try:
                if mode == 'put':
                    sftp.put(localName, remoteName)
                    if diagnostic:
                        print("Copied", localName, 'to', remoteName)
                else:
                    sftp.get(remoteName, localName )
                    if diagnostic:
                        print("Copied", remoteName, 'to', localName)
            except Exception as e:
                if mode == 'put':
                    if diagnostic:
                        print('ERROR copying files', str(localName), 'to', str(remoteName))
                    err.append(self.__class__, 331, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
                else:
                    if diagnostic:
                        print('ERROR copying files',str(localName),'from',str(remoteName))
                    err.append(self.__class__, 332, 'Local:' + str(localName) + ' Remote:' + str(remoteName) + '\n' + str(e))
        sftp.close()
        transport.close()
        if len(err) > 0:
            if failSignal:
                self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
            #raise err

    def _transportFilesFinished(self,jobId):
        #print '_transportFilesFinished',jobId
        self.deleteServerParam(jobId,'sshThreadTransport')

    def openSSHConnection(self, jobId, machine, username, password, keyFilename=None, timeout=None, maxTries=2, emitFail=True):
        import paramiko,socket
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        connected= False
        ntries = 0
        #print 'openSSHConnection',machine,username,keyFilename,timeout,maxTries
        self.emit(QtCore.SIGNAL('testMessage'), 'Connecting to ' + machine + ' with timeout ' + str(timeout))
        while ntries < maxTries and not connected:
            failCode = 0
            try:
                if self._diagnostic:
                    print('TO client.connect', ntries)
                if keyFilename is not None and keyFilename != 'None' and len(keyFilename) > 0: 
                    client.connect(machine, port=PARAMIKO_PORT, username=username, password=password, key_filename=keyFilename, timeout=timeout)
                else:
                    client.connect(machine, port=PARAMIKO_PORT, username=username, password=password, timeout=timeout)
                #print 'DONE client.connect'
                connected=True
            except paramiko.ssh_exception.AuthenticationException:
                failCode = 1
                ntries = maxTries
                mess = 'Authentication failed'
            except paramiko.ssh_exception.BadAuthenticationType:
                failCode = 2
                mess = 'Authentication type not supported'
            except socket.gaierror:
                failCode = 3
                mess = 'Socket error'
            except Exception as e:
                failCode = 4
                mess = str(e)
                print(e)
            #print 'runOnServer try', ntries, 'failCode', failCode
            if not connected:
                self.emit(QtCore.SIGNAL('testMessage'), '   Try: ' + str(ntries) + ' failed: ' + str(mess))
            else:
                self.emit(QtCore.SIGNAL('testMessage'), '   Try: ' + str(ntries) + ' succeeded')
            ntries = ntries + 1
        if not connected:
            print("Some error submitting job")
            exc_type, exc_value, exc_tb = sys.exc_info()[:3]
            print("Error running job using ssh" + str(exc_type) + "\n" + str(exc_value))
            err = CException(self.__class__, 300 + failCode, 'Machine:' + machine + '\n' + str(exc_type) + "\n" + str(exc_value) + '\n')
            if emitFail:
                self.emit(QtCore.SIGNAL('failedOpenConnection'), jobId, err)
            raise err
        else:
            return client

    def testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=[], timeout=None, maxTries=2):
        import UtilityThread
        import functools
        if self.getServerParam(jobId, 'sshThreadRemoteFiles') is not None:
            return
        sP = self.serverParams(jobId)
        sP.sshThreadRemoteFiles = UtilityThread.UtilityThread(functools.partial(self._testRemoteFiles, jobId, machine, username, password,
                                                                                keyFilename, remoteFileList, timeout, maxTries))
        sP.sshThreadRemoteFiles.finished.connect(functools.partial(self._sendSSHFinished, jobId, None, 'sshThreadRemoteFiles'))
        self.runInSSHThreads.append(sP.sshThreadRemoteFiles)
        sP.sshThreadRemoteFiles.start()

    def _testRemoteFiles(self, jobId=None, machine=None, username=None, password=None, keyFilename=None, remoteFileList=[], timeout=None, maxTries=2):
        #print 'CJobServer._testRemoteFiles',jobId,machine,username,remoteFileList
        ret = []
        if isinstance(jobId, list):
            jI = jobId[0]
        else:
            jI = jobId
        try:
            client = self.openSSHConnection(jobId=jI, machine=machine, username=username, password=password,
                                            keyFilename=keyFilename, timeout=timeout, maxTries=maxTries)
        except Exception as e:
            print('testRemoteFiles fail\n',e)
            for item in remoteFileList:
                ret.append(False)
            return ret
        for remoteFile in remoteFileList:
            stdin, stdout, stderr = client.exec_command('[ -e ' + remoteFile + ' ] && echo "Found" || echo "Lost"')
            out = ""
            err = ""
            for line in stdout:
                out += line
            for line in stderr:
                err += line
            if self._diagnostic:
                print('testRemoteFiles', remoteFile, out, '*', err)
            if out.count('Found') > 0:
                self.emit(QtCore.SIGNAL('testMessage'), '   File: ' + str(remoteFile) + ' exists')
            else:
                self.emit(QtCore.SIGNAL('testMessage'), '   File: ' + str(remoteFile) + ' not found')
            ret.append(out.count('Found') > 0)
        client.close()
        self.deleteServerParam(jobId, 'sshThreadRemoteFiles')
        self.emit(QtCore.SIGNAL('testRemoteFiles'), jobId, ret)

    def runOnServer(self, jobId=None):
        '''Run ssh or qsub with or without shared file system
         reimplement in application'''
        pass

    def runInQsub(self, jobId, remoteSh, optionsFile=None, mechanism='qsub_local'):
        import subprocess
        path,name = os.path.split(remoteSh)
        name = name[0:-9]
        stdout = os.path.join(path, name + 'qsub_stdout.txt')
        stderr = os.path.join(path, name + 'qsub_stderr.txt')
        sP = self.serverParams(jobId)
        comList = ["qsub", '-o', stdout, '-e', stderr, '-S', '/bin/sh', remoteSh]
        if optionsFile is not None:
            comList.insert(1, '@')
            comList.insert(2, optionsFile)
        if mechanism == 'qsub_local':
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out,err = proc.communicate()
                if self._diagnostic:
                    print('qsub start', out, err)
            except:
                exc_type, exc_value,exc_tb = sys.exc_info()[:3]
                print("Error running job using remote queue command 'qsub'\n\n" + str(exc_type) + "\n\n" + str(exc_value))
                raise CException(self.__class__, 320)
            else:
                try:
                    self.setServerParam(jobId, 'serverProcessId', out[:out.find('(')].rstrip().split()[-1])
                    self.setServerParam(jobId, 'pollFinish',1)
                    self.setServerParam(jobId, 'pollReport',1)
                except:
                    pass
        else:
            # Use ssh to submit qsub on a different machine
            if sP.sge_root is not None:
                comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comList[0]
            else:
                comLine = comList[0]
            for item in comList[1:]:
                comLine = comLine + ' ' + item
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStart)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)

    def runInSlurm(self, jobId, remoteSh, optionsFile=None, mechanism='slurm_remote'):
        import subprocess
        path,name = os.path.split(remoteSh)
        name = name[0:-9]
        stdout = os.path.join(path, name + 'slurm_stdout.txt')
        stderr = os.path.join(path, name + 'slurm_stderr.txt')
        sP = self.serverParams(jobId)
        comList = ["sbatch", '-o', stdout, '-e', stderr, remoteSh]
        if optionsFile is not None:
            comList.insert(1, '@')
            comList.insert(2, optionsFile)
        if mechanism == 'slurm_local':
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
                out,err = proc.communicate()
                if self._diagnostic:
                    print('slurm start', out, err)
            except:
                exc_type, exc_value,exc_tb = sys.exc_info()[:3]
                print("Error running job using command 'sbatch'\n\n" + str(exc_type) + "\n\n" + str(exc_value))
                raise CException(self.__class__, 320)
            else:
                try:
                    match = re.search(r'[0-9]+',out)
                    pid = -999
                    if match:
                        pid = match.group(0)
                    self.setServerParam(jobId, 'serverProcessId', pid)
                    self.setServerParam(jobId, 'pollFinish',1)
                    self.setServerParam(jobId, 'pollReport',1)
                except:
                    pass
        else:
            # Use ssh to submit to Slurm on a different machine
            comLine = comList[0]
            for item in comList[1:]:
                comLine = comLine + ' ' + item
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSbatchStart)
            self.setServerParam(jobId, 'pollFinish', 1)
            self.setServerParam(jobId, 'pollReport', 1)
    

    def handleRemoteQsubStart(self, jobId, out, err):
        self.setServerParam(jobId,'serverProcessId', out[:out.find('(')].rstrip().split()[-1])
        if self._diagnostic:
            print('handleRemoteQsubStart', self.serverParams(jobId).serverProcessId) 

    def handleRemoteSbatchStart(self, jobId, out, err):
        # parse the output from the sbatch command
        pid = -999
        match = re.search(r'[0-9]+',out)
        if match:
            pid = match.group(0)
        # TODO: add some error handling
        self.setServerParam(jobId,'serverProcessId', pid)
        if self._diagnostic:
            print('handleRemoteSbatchStart', self.serverParams(jobId).serverProcessId)

    def sendSSHCommand(self, jobId, comLine, finishHandler=None, retHandler=None, emitFail=True):
        import functools
        import UtilityThread
        from core import CCP4Utils
        if self._diagnostic:
            print('Remote server command:', comLine)
        sP = self.serverParams(jobId)
    
        sP.sshThread = UtilityThread.UtilityThread(functools.partial(self._sendSSHCommand, jobId, comLine, emitFail, retHandler))
        if finishHandler is not None:
            sP.sshThread.finished.connect(functools.partial(finishHandler, jobId))
        self.runInSSHThreads.append(sP.sshThread)
        sP.sshThread.start()
        #print 'runInSSH DONE'

    def _sendSSHCommand(self, jobId, comLine, emitFail=True, retHandler=None):
        from core import CCP4Utils
        sP = self.serverParams(jobId)
        if sP.keyFilename is not None and len(sP.keyFilename) > 0:
            if sP.keyFilename.count('HOME'):
                keyFilename = re.sub(r'\$HOME', CCP4Utils.getHOME(), sP.keyFilename)
            else:
                keyFilename = sP.keyFilename
        else:
            keyFilename = None
        sP.sshClient = self.openSSHConnection(jobId, sP.machine, sP.username, sP.password, keyFilename, emitFail=emitFail)
        stdin, stdout, stderr = sP.sshClient.exec_command(str(comLine))
        out = ""
        err = ""
        for line in stdout:
            out += line
        for line in stderr:
            err += line
        if self._diagnostic:
            print('Remote start stdout:', out)
            print('Remote start stderr:', err)
        if len(err) > 0:
            exc = CException(self.__class__, 350, err)
            if emitFail:
                self.emit(QtCore.SIGNAL('failedRemoteCommand'), jobId, exc)
        if retHandler:
            try:
                retHandler(jobId, out, err)
            except Exception as e:
                print('ERROR handling return output from SSH command')
                print(e)
        return out, err

    def _sendSSHFinished(self, jobId, clientParam='sshClient', threadParam='sshThread'):
        #print "Finished remote job",jobId,threadParam
        #print '_runInSSHFinished',self.serverParams[jobId].mechanism
        sP = self.serverParams(jobId)
        if sP is None:
            return
        if clientParam is not None and hasattr(sP, clientParam):
            getattr(sP, clientParam).close()
            #getattr(sP, clientParam).__del__() - daft but how do we be show it is zapped?
        if hasattr(sP, threadParam):
            if getattr(sP, threadParam).retval is not None:
                stdout, stderr = getattr(sP,threadParam).retval
                if self._diagnostic:
                    print('Remote finish stdout:', stdout)
                    print('Remote finish stderr:', stderr)
            else:
                print('Failed analysing return from remote command')
        if hasattr(sP, threadParam):
            self.runInSSHThreads.remove(getattr(sP, threadParam))
            #getattr(sP,threadParam).__del__() -- this is nonsense for a thread

    def runInSSH(self, jobId, remoteSh):
        comLine = 'sh ' + remoteSh
        self.sendSSHCommand(jobId, comLine, finishHandler=self._sendSSHFinished, retHandler=self.handleRunInSSHDone)
        #retHandler=self.handleRunInSSHDone)
        self.setServerParam(jobId, 'pollReport', 1)
        self.setServerParam(jobId, 'pollFinish', 1)
        
    def _runInSSHFinished(self, jobId):
        self._sendSSHFinished(jobId)
        self.handleFinishedServerJob(jobId)

    def handleRunInSSHDone(self, jobId, out, err):
        if self._diagnostic: 
            print('handleRunInSSHDone', jobId, 'out', out, 'err', err)
        for line in out.split('\n'):
            if line.startswith('PID='):
                self.setServerParam(jobId, 'serverProcessId', int(line[4:]))
                break

    def checkRemoteStatus(self, jobId):
        sP = self.serverParams(jobId)
        if sP is None:
            return
        if sP.mechanism in ['qsub_remote', 'qsub_shared']:
            self.checkRemoteQsubStatus(jobId)
            return True
        elif sP.mechanism in ['slurm_remote']:
            self.checkRemoteSlurmStatus(jobId)
            return True
        elif sP.mechanism in ['qsub_local']:
            self.checkLocalQsubStatus(jobId)
            return True
        elif sP.mechanism in ['ssh', 'ssh_shared']:
            self.checkSSHJobStatus(jobId)
            return True

    def checkSSHJobStatus(self,jobId):
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            self.getPidFileContent(jobId)
        else:
            #comLine = 'pgrep -P '+str(sP.serverProcessId)+';ps -fu '+sP.username
            comLine = 'ps -fu ' + sP.username
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSSHStatus, emitFail=False)

    def handleRemoteSSHStatus(self, jobId, out, err):
        if self._diagnostic:
            print('handleRemoteSSHStatus', jobId, out, err)
        sP = self.serverParams(jobId)
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'The process id for this job: ' + str(sP.serverProcessId), out, sP.machine)

    def checkLocalQsubStatus(self, jobId):
        try:
            proc = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.STDOUT)
            out,err = proc.communicate()
            if self._diagnostic:
                print('checkLocalQsubStatus', out, err)
        except:
            exc_type, exc_value,exc_tb = sys.exc_info()[:3]
            print("Error checking qsub status\n" + str(exc_type) + "\n" + str(exc_value))
            raise CException(self.__class__, 362)
        else:
            self.handleRemoteQsubStatus(jobId, out, err)

    def checkRemoteQsubStatus(self, jobId):
        sP = self.serverParams(jobId)
        if sP is None:
            return False
        comLine = 'qstat'
        if sP.sge_root is not None:
            comLine = 'SGE_ROOT=' + str(sP.sge_root) + ' ' + comLine
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteQsubStatus, emitFail=False)

    def checkRemoteSlurmStatus(self, jobId):
        sP = self.serverParams(jobId)
        if sP is None:
            return False
        comLine = 'squeue -h -o "%A" --job ' + str(jobId)
        self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSlurmStatus, emitFail=False)

    def handleRemoteQsubStatus(self, jobId, out, err):
        # VU: It seems this function is not used at all.
        sP = self.serverParams(jobId)
        # VU: this check makes no sense, if the cluster runs other jobs, too
        if len(out) < 10:
            out = 'No jobs in queue (implies all finished/failed)'
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'QSub status on ' + sP.machine, out, sP.machine)
        #print('handleRemoteQsubStatus: '+out)

    def handleRemoteSlurmStatus(self, jobId, out, err):
        sP = self.serverParams(jobId)
        if str(jobId) not in out:
            out = 'The job is not in queue (implies it has finished or failed).'
        self.emit(QtCore.SIGNAL('remoteJobMessage'), jobId, 'Squeue status on ' + sP.machine, out, sP.machine)

    def getPidFileContent(self, jobId):
        from core import CCP4Utils
        sP = self.serverParams(jobId)
        if sP.mechanism == 'ssh':
            self.transportFiles(jobId,[[self.pidFile(jobId, local=True), self.pidFile(jobId)]], 'get', finishHandler=self.getPidFileContent2, failSignal=False)
        else:
            self.getPidFileContent2(jobId)

    def getPidFileContent2(self,jobId):
        self._transportFilesFinished(jobId)
        from core import CCP4Utils
        ret = CCP4Utils.readFile(self.pidFile(jobId, local=True))
        try:
            ret= int(ret.strip())
        except:
            ret = None
        else:
            self.setServerParam(jobId, 'serverProcessId', ret)
            #comLine = 'pgrep -P ' + str(self.serverParams[jobId].serverProcessId)+';ps -ef'
            comLine = 'ps -efu ' + self.serverParams(jobId).username
            self.sendSSHCommand(jobId, comLine=comLine, retHandler=self.handleRemoteSSHStatus, emitFail=False)
        return ret

    def killSSHJob(self, jobId):
        sP =  self.serverParams(jobId)
        '''
        if sP.serverProcessId is None: self.getPidFileContent(jobId)
        if sP.serverProcessId is None:
           raise CException(self.__class__,340)
        '''
        self.sendSSHCommand(jobId, comLine='pkill -F ' + self.pidFile(jobId), retHandler=self.handleRemoteDel)

    def killQsubJob(self, jobId):
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            raise CException(self.__class__, 360)
        comList = ['qdel', sP.serverProcessId]
        if sP.mechanism in ['qsub_local']:
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE,universal_newlines=True, stderr=subprocess.STDOUT)
                out,err = proc.communicate()
                print('kill qsub',out,err)
            except:
                exc_type, exc_value,exc_tb = sys.exc_info()[:3]
                print("Error killing job\n"+str(exc_type)+"\n"+str(exc_value))
                raise CException(self.__class__,361)
            else:
                from dbapi import CCP4DbApi
                from core import CCP4Modules
                self.deleteServerParams(jobId)
                CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
        elif sP.mechanism in ['qsub_remote']:
            self.sendSSHCommand(jobId, comLine='qdel ' + sP.serverProcessId, retHandler=self.handleRemoteDel)

    def killSlurmJob(self, jobId):
        sP = self.serverParams(jobId)
        if sP.serverProcessId is None:
            raise CException(self.__class__, 360)
        comList = ['scancel', sP.serverProcessId]
        if sP.mechanism in ['slurm_local']:
            try:
                proc = subprocess.Popen(comList, stdout=subprocess.PIPE,universal_newlines=True, stderr=subprocess.STDOUT)
                out,err = proc.communicate()
                print('kill Slurm job',out,err)
            except:
                exc_type, exc_value,exc_tb = sys.exc_info()[:3]
                print("Error killing job\n"+str(exc_type)+"\n"+str(exc_value))
                raise CException(self.__class__,361)
            else:
                from dbapi import CCP4DbApi
                from core import CCP4Modules
                self.deleteServerParams(jobId)
                CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)
        elif sP.mechanism in ['slurm_remote']:
            self.sendSSHCommand(jobId, comLine='scancel ' + sP.serverProcessId, retHandler=self.handleRemoteDel)

    def handleRemoteDel(self, jobId, out, err):
        from dbapi import CCP4DbApi
        from core import CCP4Modules
        if self._diagnostic:
            print('handleRemoteQsubDel', jobId, out, err)
        self.deleteServerParams(jobId)
        CCP4Modules.PROJECTSMANAGER().db().updateJobStatus(jobId=jobId, status=CCP4DbApi.JOB_STATUS_FAILED)

          
    def killRemoteJob(self, jobId):
        sP = self.serverParams(jobId)
        if sP is None:
            raise CException(self.__class__, 365)
        if sP.mechanism in ['ssh', 'ssh_shared']:
            self.killSSHJob(jobId)
        elif sP.mechanism in ['qsub_local', 'qsub_shared', 'qsub_remote']:
            self.killQsubJob(jobId)
        elif sP.mechanism in ['slurm_remote']:
            self.killSlurmJob(jobId)
        elif sP.mechanism in ['custom']:
            self.customHandler(jobId).killJob(jobId)

    def runLocalTest(self, jobId, remoteSh):
        from core import CCP4Modules
        CCP4Modules.PROCESSMANAGER().startProcess('sh', [remoteSh], handler=[self.localTestFinished, {'jobId':jobId}], ifAsync=True)

    def localTestFinished(self, pid, jobId=None):
        #print 'localTestFinished',pid,args
        self.handleFinishedServerJob(jobId)

    def pollForFinishedFlagFile(self):
        if getattr(self,'pollFinishConnected',None) is None:
            self.connect(self,QtCore.SIGNAL('testRemoteFiles'), self.pollForFinishedFlagFile1)
            self.pollFinishConnected = True
        pollByMachine = {}
        for jobId in self.getJobsToPollFinish([1]):
            sP = self.serverParams(jobId)
            if sP.mechanism in ['ssh', 'ssh_shared', 'qsub_remote', 'qsub_shared', 'qsub_local', 'slurm_remote']:
                if sP.mechanism in ['ssh_shared', 'qsub_shared', 'qsub_local']:
                    if os.path.exists(sP.finishedFile):
                        self.handleFinishedServerJob(jobId)
                else:
                    if sP.machine not in pollByMachine:
                        pollByMachine[sP.machine] = []
                    pollByMachine[sP.machine].append(jobId)
        #print 'pollForFinishedFlagFile pollByMachine',pollByMachine
        for machine, jobIdList in list(pollByMachine.items()):
            finfileList = []
            for jobId in jobIdList:
                sP = self.serverParams(jobId)
                if sP is not None:
                    finfileList.append (sP.finishedFile)
                #print 'pollForFinishedFlagFile finfileList',finfileList
            if len(finfileList) > 0:
                self.testRemoteFiles(jobId=jobId, machine=sP.machine, username=sP.username, password=sP.password, remoteFileList=finfileList)
            
              
    def pollForFinishedFlagFile1(self, jobId, fileStatus):
        if self._diagnostic:
            print('pollForFinishedFlagFile', jobId, fileStatus)
        if not isinstance(jobId,list):
            jobId = [jobId]
        for ii in range(len(jobId)):
            if fileStatus[ii]:
                self.handleFinishedServerJob(jobId[ii])

    def customHandler(self, jobId=None, customCodeFile=None):
        # This is creating one instance of the custom handler for each session
        # The alternative of one per job may be better for some cases???
        customCodeFile = str(customCodeFile)
        if not hasattr(self,'_customHandler'):
            self._customHandler = {}
        if jobId is not None:
            customCodeFile  = self.getServerParam(jobId, 'customCodeFile')
        if customCodeFile in self._customHandler:
            return self._customHandler[customCodeFile]
        if customCodeFile is None:
            print('ERROR attempting to access server custom code when no file provided')
            return None
        elif not os.path.exists(str(customCodeFile) ):
            print('ERROR server custom code file does not exist:' + customCodeFile)
            return None
        else:
            # If a startup.py file exists exec it first
            startupFile = os.path.join(os.path.split(customCodeFile)[0], 'startup.py')
            if os.path.exists(startupFile):
                try:
                    exec(compile(open(startupFile).read(), startupFile, 'exec'))
                except Exception as e:
                    print('ERROR execing custom server interface startup code:', startupFile)
                    print(e)
                    return None
            import inspect
            from core import CCP4Utils
            sys.path.append(os.path.split(customCodeFile)[0])
            module, err = CCP4Utils.importFileModule(customCodeFile)
            if err is not None:
                print('ERROR loading custom code file:' + customCodeFile)
                print(err)
                return None
            clsList = inspect.getmembers(module, inspect.isclass)
            if len(clsList) == 0:
                print('ERROR no classes found in custom code file:' + customCodeFile)
                return None
            if self._diagnostic:
                print('CServerParams.customHandler instantiating customHandler', clsList[0][0], self._serverParams)
            try:
                self._customHandler[customCodeFile] = clsList[0][1](self.serverParams)
            except Exception as e:
                print('ERROR instantiating custom handler class')
                print(e)
                return None
            return self._customHandler[customCodeFile]

    def listRemoteProcesses(self):
        import functools
        runningJobs = self.getJobsToPollFinish(pollFinish=[0, 1])
        jobsByMachine = {}
        for jobId in runningJobs:
            sP = self.serverParams(jobId)
            if sP.machine not in jobsByMachine:
                jobsByMachine[sP.machine] = {}
            jobsByMachine[sP.machine][jobId] = sP.serverProcessId
        for machine, jobDict in list(jobsByMachine.items()):
            firstJobId = list(jobDict.keys())[0]
            comLine = self.getServerParam(firstJobId, 'ccp4Dir') + '/bin/ccp4-python ' + \
                      self.getServerParam(firstJobId, 'ccp4Dir') + '/share/ccp4i2-devel/bin/listProcesses.py'
            self.sendSSHCommand(firstJobId, comLine=comLine,
                                retHandler=functools.partial(self.handleListRemoteProcesses, machine, jobDict), emitFail=False)
        # get qsub status
        runningJobs = self.getJobsToPollFinish(pollFinish=[2])

    def handleListRemoteProcesses(self, machine, jobDict, jobId, out, err):
        #print 'handleListRemoteProcesses out',out
        #print 'handleListRemoteProcesses err',err
        # Extract the processes and time from the remote job stdout.
        # exec into an explicit namespace so the assignments are visible afterwards
        # (a bare exec(line) does not create local variables on Python 3).
        ns = {}
        for line in out.split('\n'):
            if line.startswith('processes') or line.startswith('atTime'):
                try:
                    exec(line, ns)
                except Exception as e:
                    print('Failed evaling the return from listRemoteProcesses')
                    print('Error:', e)
        try:
            self.emit(QtCore.SIGNAL('remoteProcessesList'), machine, jobDict, ns['processes'], ns['atTime'])
        except:
            print('ERROR in handling listing of remote processes')
