+class BkrProxyException(Exception):
+ def __init__(self, text):
+ Exception.__init__(self, text)
+
+class OfflineProxy:
+    '''Locally mimic the beaker server. Useful for debugging and
+    dealing with a flaky server. Sits underneath everything.
+    '''
+
+ OFFLINE_RESULTS = AUTOTEST_CACHE_DIR + '/results'
+
+ def __init__(self):
+ if os.path.isdir(self.OFFLINE_RESULTS):
+ shutil.rmtree(self.OFFLINE_RESULTS)
+ os.makedirs(self.OFFLINE_RESULTS)
+        # unbuffered, so the command log survives an unexpected exit
+        self.cmd_log = open(self.OFFLINE_RESULTS + '/cmd_log', 'a', 0)
+
+    def task_upload_file(self, tid, remotedir, remotename, sz, digest,
+            offset, data):
+        self.cmd_log.write('task_upload_file: tid(%s) remotedir(%s) '
+                'remotename(%s) sz(%d) digest(%s) offset(%d) data(%s)\n' %
+                (tid, remotedir, remotename, sz, digest, offset, data))
+
+    def result_upload_file(self, tid, remotedir, remotename, sz, digest,
+            offset, data):
+        self.cmd_log.write('result_upload_file: tid(%s) remotedir(%s) '
+                'remotename(%s) sz(%d) digest(%s) offset(%d) data(%s)\n' %
+                (tid, remotedir, remotename, sz, digest, offset, data))
+
+    def recipe_upload_file(self, tid, remotedir, remotename, sz, digest,
+            offset, data):
+        self.cmd_log.write('recipe_upload_file: tid(%s) remotedir(%s) '
+                'remotename(%s) sz(%d) digest(%s) offset(%d) data(%s)\n' %
+                (tid, remotedir, remotename, sz, digest, offset, data))
+
+ def get_my_recipe(self, request):
+        '''Return the cached recipe; the request is ignored for now'''
+ self.cmd_log.write('get_my_recipe: request(%s)\n' % request)
+ if not os.path.isfile(BKR_CACHE):
+ raise BkrProxyException("Couldn't find cache %s" % BKR_CACHE)
+ return open(BKR_CACHE, 'r').read()
+
+ def task_result(self, task_id, result_type, result_path,
+ result_score, result_summary):
+ '''No remote server to do anything, so just stub it out'''
+        self.cmd_log.write('task_result: task_id(%s) result_type(%s) '
+                'result_path(%s) result_score(%s) result_summary(%s)\n' %
+                (task_id, result_type, result_path, result_score,
+                result_summary))
+
+    def task_start(self, task_id, kill_time=None):
+        '''No remote server to do anything, so just stub it out'''
+        # str(None) already renders as 'None', so no special-casing needed
+        self.cmd_log.write('task_start: task_id(%s) kill_time(%s)\n' %
+                (task_id, kill_time))
+
+    def task_stop(self, task_id, stop_type, msg):
+        '''No remote server to do anything, so just stub it out'''
+        self.cmd_log.write('task_stop: task_id(%s) stop_type(%s) msg(%s)\n' %
+                (task_id, stop_type, msg))
+
+    def recipe_stop(self, recipeset_id, stop_type, msg):
+        '''No remote server to do anything, so just stub it out'''
+        self.cmd_log.write('recipe_stop: recipeset_id(%s) stop_type(%s) '
+                'msg(%s)\n' % (recipeset_id, stop_type, msg))
+
+    def extend_watchdog(self, task_id, killtime):
+        '''No remote server to kill us, so just stub it out'''
+        self.cmd_log.write('extend_watchdog: task_id(%s) killtime(%d)\n' %
+                (task_id, killtime))
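+
+# A minimal offline sketch (hypothetical task ID; assumes AUTOTEST_CACHE_DIR
+# is writable): each call below is appended to
+# <AUTOTEST_CACHE_DIR>/results/cmd_log for later inspection.
+#
+#   proxy = OfflineProxy()
+#   proxy.task_start('123', kill_time=3600)
+#   proxy.task_stop('123', 'stop', 'done')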
+
+class FailingProxy:
+ def __init__(self, server_proxy, failafter=0, failchance=0):
+ self.count = 0
+ self.failafter = failafter
+ self.failchance = failchance
+ self.server_proxy = server_proxy
+
+    def test_fail(self):
+        # failafter == 0 disables deterministic failures; without this
+        # guard, count >= 0 would make every single call fail
+        if not self.failafter:
+            return
+        self.count = self.count + 1
+        if self.count >= self.failafter:
+            self.count = 0
+            raise xmlrpclib.Fault(404, 'XMLRPC failed')
+
+ def test_randfail(self):
+ if not self.failchance:
+ return
+ if random.randint(1, self.failchance) == 1:
+ raise xmlrpclib.Fault(404, 'XMLRPC failed')
+
+    def __getattr__(self, name):
+        def wrapper(*args, **kwargs):
+            # inject failures when the call is made, not at attribute lookup
+            self.test_fail()
+            self.test_randfail()
+            fn = getattr(self.server_proxy, name)
+            return fn(*args, **kwargs)
+        return wrapper
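+
+# Failure-injection sketch (hypothetical settings): every 5th call fails
+# deterministically and, independently, each call has a 1-in-10 chance of
+# failing, both via xmlrpclib.Fault.
+#
+#   flaky = FailingProxy(OfflineProxy(), failafter=5, failchance=10)
+#   flaky.task_start('123')   # may raise xmlrpclib.Fault(404, ...)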
+
+class SimpleRepeatingProxy:
+    def __init__(self, server_proxy, delay, retries=0):
+        self.delay = delay
+        self.max_retries = retries
+        self.server_proxy = server_proxy
+
+    def __getattr__(self, name):
+        def wrapper(*args, **kwargs):
+            # local counter, so concurrent calls don't share retry state
+            attempt = self.max_retries + 1
+            while attempt:
+                try:
+                    attempt = attempt - 1
+                    fn = getattr(self.server_proxy, name)
+                    return fn(*args, **kwargs)
+                except xmlrpclib.Fault, flt:
+                    logException(flt)
+                    if not self.max_retries:
+                        raise BkrProxyException(flt.faultString)
+                    if attempt == 0:
+                        raise BkrProxyException(
+                                'Failed to deliver message after %s retries'
+                                % self.max_retries)
+                    log.info('Failed to deliver message: %s', name)
+                    log.info('Sleeping for %s', self.delay)
+                    time.sleep(self.delay)
+                    log.info('Trying again')
+        return wrapper
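+
+# Retry sketch (hypothetical settings): one initial try plus up to 3
+# retries, 5 seconds apart; with retries=0 the first Fault becomes a
+# BkrProxyException immediately.
+#
+#   robust = SimpleRepeatingProxy(flaky, delay=5, retries=3)
+#   robust.task_stop('123', 'stop', 'done')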
+
+class DebuggingProxy:
+ def __init__(self, server_proxy):
+ self.server_proxy = server_proxy
+
+ def __getattr__(self, name):
+ def wrapper(*args, **kwargs):
+ def print_args(args):
+ #TODO: hoping to avoid call if not-debug
+ max_param_len = 15
+ short_args = []
+ for arg in args:
+ if (len(str(arg)) > max_param_len):
+ short_args.append(str(arg)[:max_param_len] + '..')
+ else:
+ short_args.append(arg)
+ return short_args
+
+ log.debug('XML: %s(%s)', name, print_args(args))
+ fn = getattr(self.server_proxy, name)
+ ret = fn(*args, **kwargs)
+ log.debug(' --> %s', ret)
+ return ret
+ return wrapper
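+
+# DebuggingProxy is transparent: it only emits log.debug() lines around
+# each call, truncating any argument longer than 15 characters, e.g.
+# (hypothetical call):
+#   XML: task_stop(['123', 'stop', 'done'])
+#    --> <whatever the wrapped proxy returned>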
+
+class BkrProxy:
+    def __init__(self, labc_url='', delay=60, retries=0, failafter=0,
+                 failchance=0):
+
+        # Build the server proxy as a stack of wrappers, innermost first.
+
+        #labc_url determines local or remote functionality
+        if labc_url:
+            self.labc_url = "http://" + labc_url + ":8000"
+            self.server_proxy = xmlrpclib.Server(self.labc_url)
+        else:
+            log.info("Using OfflineProxy() as remote host")
+            self.server_proxy = OfflineProxy()
+            self.labc_url = ''
+
+        #emulate failures
+        if failafter or failchance:
+            self.server_proxy = FailingProxy(self.server_proxy,
+                    failafter=failafter, failchance=failchance)
+
+        #deal with retries
+        self.server_proxy = SimpleRepeatingProxy(self.server_proxy,
+                delay=delay, retries=retries)
+
+        #debugging
+        self.server_proxy = DebuggingProxy(self.server_proxy)
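+
+        # With every option enabled the resulting call path is:
+        #   DebuggingProxy -> SimpleRepeatingProxy -> FailingProxy -> backend
+        # where the backend is xmlrpclib.Server or OfflineProxy.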
+
+ def task_upload_file(self, localfile, id, remotename, remotedir):
+ return self._uploadWrapper('task_upload_file', localfile, id,
+ remotename, remotedir)
+
+ def result_upload_file(self, localfile, id, remotename, remotedir):
+ return self._uploadWrapper('result_upload_file', localfile, id,
+ remotename, remotedir)
+
+ def recipe_upload_file(self, localfile, id, remotename, remotedir):
+ return self._uploadWrapper('recipe_upload_file', localfile, id,
+ remotename, remotedir)
+
+ def get_recipe(self, hostname):
+ ret = self.server_proxy.get_my_recipe({'system_name': hostname})
+ #cache this locally, in case the server goes down
+ if not os.path.isdir(os.path.dirname(BKR_CACHE)):
+ os.makedirs(os.path.dirname(BKR_CACHE))
+ open(BKR_CACHE, 'w').write("%s" % ret)
+ return ret
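+
+    # Note: BKR_CACHE written here is exactly what
+    # OfflineProxy.get_my_recipe() replays, so a run that has fetched its
+    # recipe once can finish even if the lab controller later disappears.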
+
+ def task_result(self, task_id, result_type, result_path,
+ result_score, result_summary):
+ return self.server_proxy.task_result(task_id, result_type,
+ result_path, result_score, result_summary)
+
+ def task_start(self, task_id, kill_time=None):
+ if (kill_time):
+ return self.server_proxy.task_start(task_id, kill_time)
+ else:
+ return self.server_proxy.task_start(task_id)
+
+ def task_stop(self, task_id, stop_type, msg):
+ return self.server_proxy.task_stop(task_id, stop_type, msg)
+
+ def recipe_stop(self, recipeset_id, stop_type, msg):
+ return self.server_proxy.recipe_stop(recipeset_id, stop_type, msg)
+
+ def extend_watchdog(self, task_id, killtime):
+ return self.server_proxy.extend_watchdog(task_id, killtime)
+
+    def _uploadWrapper(self, rpc_method, localfile, tid, remotename=None,
+                       remotedir=None, callback=None, blocksize=262144,
+                       start=0):
+        """
+        Upload a file in chunks - taken over from rhts-test-env and modified.
+
+        rpc_method - *_upload_file, e.g. task_upload_file
+        localfile  - filepath to file on local filesystem
+        tid        - ID of task, result or recipe
+        remotename - name of file on remote side
+        remotedir  - relative path on remote side
+        """
+ started = time.time()
+ ret = False
+ hashlibMD5 = getMD5Constructor()
+ fn = getattr(self.server_proxy, rpc_method)
+
+        if remotename is None:
+            remotename = os.path.basename(localfile)
+
+        if remotedir is None:
+            remotedir = '.'
+
+        # binary mode, so sizes and digests are stable on every platform
+        fo = open(localfile, 'rb')  #specify bufsize?
+ totalsize = os.path.getsize(localfile)
+ ofs = start
+ if ofs != 0:
+ fo.seek(ofs)
+
+ digestor = hashlibMD5()
+
+ debug = False
+ if callback:
+ callback(0, totalsize, 0, 0, 0)
+ while ofs <= totalsize:
+ if (totalsize - ofs) < blocksize:
+ blocksize = totalsize - ofs
+ lap = time.time()
+ contents = fo.read(blocksize)
+
+ digestor.update(contents)
+ size = len(contents)
+ data = base64.encodestring(contents)
+ if size == 0:
+ # end of file, use offset = -1 to finalize upload
+ offset = -1
+ digest = digestor.hexdigest()
+ sz = ofs
+ else:
+ offset = ofs
+ digest = hashlibMD5(contents).hexdigest()
+ sz = size
+ del contents
+
+            if debug:
+                log.info('uploadFile(%r,%r,%r,%r,%r,...)',
+                         tid, remotename, sz, digest, offset)
+
+            ret = fn(tid, remotedir, remotename, sz, digest, offset, data)
+
+            if debug:
+                log.info('ret: %s', ret)
+
+ if size == 0:
+ break
+
+ ofs += size
+ now = time.time()
+ t1 = now - lap
+ if t1 <= 0:
+ t1 = 1
+ t2 = now - started
+ if t2 <= 0:
+ t2 = 1
+ if debug:
+ log.info('Uploaded %d bytes in %f seconds (%f kbytes/sec)',
+ size, t1, size / t1 / 1024)
+ log.info('Total: %d bytes in %f seconds (%f kbytes/sec)',
+ ofs, t2, ofs / t2 / 1024)
+ if callback:
+ callback(ofs, totalsize, size, t1, t2)
+ fo.close()
+ return ret
+
+
+if __name__ == '__main__':
+ pass
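+    # A minimal offline smoke test might look like (hypothetical ID):
+    #   proxy = BkrProxy()          # no labc_url -> OfflineProxy backend
+    #   proxy.task_start('123')
+    #   proxy.task_stop('123', 'stop', 'ok')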
+