http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py deleted file mode 100644 index 74e142c..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -import logging -from multiprocessing import Process, Value, Condition - -from TServer import TServer -from ..transport.TTransport import TTransportException - - -class TProcessPoolServer(TServer): - """Server with a fixed size pool of worker subprocesses to service requests - - Note that if you need shared state between the handlers - it's up to you! - Written by Dvir Volk, doat.com - """ - def __init__(self, *args): - TServer.__init__(self, *args) - self.numWorkers = 10 - self.workers = [] - self.isRunning = Value('b', False) - self.stopCondition = Condition() - self.postForkCallback = None - - def setPostForkCallback(self, callback): - if not callable(callback): - raise TypeError("This is not a callback!") - self.postForkCallback = callback - - def setNumWorkers(self, num): - """Set the number of worker threads that should be created""" - self.numWorkers = num - - def workerProcess(self): - """Loop getting clients from the shared queue and process them""" - if self.postForkCallback: - self.postForkCallback() - - while self.isRunning.value: - try: - client = self.serverTransport.accept() - self.serveClient(client) - except (KeyboardInterrupt, SystemExit): - return 0 - except Exception, x: - logging.exception(x) - - def serveClient(self, client): - """Process input/output from a client for as long as possible""" - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - - try: - while True: - self.processor.process(iprot, oprot) - except TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - def serve(self): - """Start workers and put into queue""" - # this is a shared state that can tell the workers to exit when False - self.isRunning.value = True - - # first bind and listen to the port - self.serverTransport.listen() - - # fork the children - for i in range(self.numWorkers): - try: - w = Process(target=self.workerProcess) - w.daemon = True - w.start() - self.workers.append(w) - except Exception, x: - logging.exception(x) - - # wait until the condition is set by stop() - while True: - self.stopCondition.acquire() - try: - self.stopCondition.wait() - break - except (SystemExit, KeyboardInterrupt): - break - except Exception, x: - logging.exception(x) - - self.isRunning.value = False - - def stop(self): - self.isRunning.value = False - self.stopCondition.acquire() - self.stopCondition.notify() - self.stopCondition.release()
http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py deleted file mode 100644 index 3e44107..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py +++ /dev/null @@ -1,269 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import Queue -import logging -import os -import sys -import threading -import traceback - -from ..Thrift import TProcessor -from ..protocol import TBinaryProtocol -from ..transport import TTransport - - -class TServer: - """Base interface for a server, which must have a serve() method. - - Three constructors for all servers: - 1) (processor, serverTransport) - 2) (processor, serverTransport, transportFactory, protocolFactory) - 3) (processor, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) - """ - def __init__(self, *args): - if (len(args) == 2): - self.__initArgs__(args[0], args[1], - TTransport.TTransportFactoryBase(), - TTransport.TTransportFactoryBase(), - TBinaryProtocol.TBinaryProtocolFactory(), - TBinaryProtocol.TBinaryProtocolFactory()) - elif (len(args) == 4): - self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) - elif (len(args) == 6): - self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) - - def __initArgs__(self, processor, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory): - self.processor = processor - self.serverTransport = serverTransport - self.inputTransportFactory = inputTransportFactory - self.outputTransportFactory = outputTransportFactory - self.inputProtocolFactory = inputProtocolFactory - self.outputProtocolFactory = outputProtocolFactory - - def serve(self): - pass - - -class TSimpleServer(TServer): - """Simple single-threaded server that just pumps around one transport.""" - - def __init__(self, *args): - TServer.__init__(self, *args) - - def serve(self): - self.serverTransport.listen() - while True: - client = self.serverTransport.accept() - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - -class TThreadedServer(TServer): - """Threaded server that spawns a new thread per each connection.""" - - def __init__(self, *args, **kwargs): - TServer.__init__(self, *args) - self.daemon = kwargs.get("daemon", False) - - def serve(self): - self.serverTransport.listen() - while True: - try: - client = self.serverTransport.accept() - t = threading.Thread(target=self.handle, args=(client,)) - t.setDaemon(self.daemon) - t.start() - except KeyboardInterrupt: - raise - except Exception, x: - logging.exception(x) - - def handle(self, client): - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - -class TThreadPoolServer(TServer): - """Server with a fixed size pool of threads which service requests.""" - - def __init__(self, *args, **kwargs): - TServer.__init__(self, *args) - self.clients = Queue.Queue() - self.threads = 10 - self.daemon = kwargs.get("daemon", False) - - def setNumThreads(self, num): - """Set the number of worker threads that should be created""" - self.threads = num - - def serveThread(self): - """Loop around getting clients from the shared queue and process them.""" - while True: - try: - client = self.clients.get() - self.serveClient(client) - except Exception, x: - logging.exception(x) - - def serveClient(self, client): - """Process input/output from a client for as long as possible""" - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - def serve(self): - """Start a fixed number of worker threads and put client into a queue""" - for i in range(self.threads): - try: - t = threading.Thread(target=self.serveThread) - t.setDaemon(self.daemon) - t.start() - except Exception, x: - logging.exception(x) - - # Pump the socket for clients - self.serverTransport.listen() - while True: - try: - client = self.serverTransport.accept() - self.clients.put(client) - except Exception, x: - logging.exception(x) - - -class TForkingServer(TServer): - """A Thrift server that forks a new process for each request - - This is more scalable than the threaded server as it does not cause - GIL contention. - - Note that this has different semantics from the threading server. - Specifically, updates to shared variables will no longer be shared. - It will also not work on windows. - - This code is heavily inspired by SocketServer.ForkingMixIn in the - Python stdlib. - """ - def __init__(self, *args): - TServer.__init__(self, *args) - self.children = [] - - def serve(self): - def try_close(file): - try: - file.close() - except IOError, e: - logging.warning(e, exc_info=True) - - self.serverTransport.listen() - while True: - client = self.serverTransport.accept() - try: - pid = os.fork() - - if pid: # parent - # add before collect, otherwise you race w/ waitpid - self.children.append(pid) - self.collect_children() - - # Parent must close socket or the connection may not get - # closed promptly - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - try_close(itrans) - try_close(otrans) - else: - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - - ecode = 0 - try: - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, e: - logging.exception(e) - ecode = 1 - finally: - try_close(itrans) - try_close(otrans) - - os._exit(ecode) - - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - def collect_children(self): - while self.children: - try: - pid, status = os.waitpid(0, os.WNOHANG) - except os.error: - pid = None - - if pid: - self.children.remove(pid) - else: - break http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py deleted file mode 100644 index 1bf6e25..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -__all__ = ['TServer', 'TNonblockingServer'] http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py deleted file mode 100644 index 9ef9535..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py +++ /dev/null @@ -1,147 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import httplib -import os -import socket -import sys -import urllib -import urlparse -import warnings - -from TTransport import * - - -class THttpClient(TTransportBase): - """Http implementation of TTransport base.""" - - def __init__(self, uri_or_host, port=None, path=None): - """THttpClient supports two different types constructor parameters. - - THttpClient(host, port, path) - deprecated - THttpClient(uri) - - Only the second supports https. - """ - if port is not None: - warnings.warn( - "Please use the THttpClient('http://host:port/path') syntax", - DeprecationWarning, - stacklevel=2) - self.host = uri_or_host - self.port = port - assert path - self.path = path - self.scheme = 'http' - else: - parsed = urlparse.urlparse(uri_or_host) - self.scheme = parsed.scheme - assert self.scheme in ('http', 'https') - if self.scheme == 'http': - self.port = parsed.port or httplib.HTTP_PORT - elif self.scheme == 'https': - self.port = parsed.port or httplib.HTTPS_PORT - self.host = parsed.hostname - self.path = parsed.path - if parsed.query: - self.path += '?%s' % parsed.query - self.__wbuf = StringIO() - self.__http = None - self.__timeout = None - self.__custom_headers = None - - def open(self): - if self.scheme == 'http': - self.__http = httplib.HTTP(self.host, self.port) - else: - self.__http = httplib.HTTPS(self.host, self.port) - - def close(self): - self.__http.close() - self.__http = None - - def isOpen(self): - return self.__http is not None - - def setTimeout(self, ms): - if not hasattr(socket, 'getdefaulttimeout'): - raise NotImplementedError - - if ms is None: - self.__timeout = None - else: - self.__timeout = ms / 1000.0 - - def setCustomHeaders(self, headers): - self.__custom_headers = headers - - def read(self, sz): - return self.__http.file.read(sz) - - def write(self, buf): - self.__wbuf.write(buf) - - def __withTimeout(f): - def _f(*args, **kwargs): - orig_timeout = socket.getdefaulttimeout() - socket.setdefaulttimeout(args[0].__timeout) - result = f(*args, **kwargs) - socket.setdefaulttimeout(orig_timeout) - return result - return _f - - def flush(self): - if self.isOpen(): - self.close() - self.open() - - # Pull data out of buffer - data = self.__wbuf.getvalue() - self.__wbuf = StringIO() - - # HTTP request - self.__http.putrequest('POST', self.path) - - # Write headers - self.__http.putheader('Host', self.host) - self.__http.putheader('Content-Type', 'application/x-thrift') - self.__http.putheader('Content-Length', str(len(data))) - - if not self.__custom_headers or 'User-Agent' not in self.__custom_headers: - user_agent = 'Python/THttpClient' - script = os.path.basename(sys.argv[0]) - if script: - user_agent = '%s (%s)' % (user_agent, urllib.quote(script)) - self.__http.putheader('User-Agent', user_agent) - - if self.__custom_headers: - for key, val in self.__custom_headers.iteritems(): - self.__http.putheader(key, val) - - self.__http.endheaders() - - # Write payload - self.__http.send(data) - - # Get reply to flush the request - self.code, self.message, self.headers = self.__http.getreply() - - # Decorate if we know how to timeout - if hasattr(socket, 'getdefaulttimeout'): - flush = __withTimeout(flush) http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py deleted file mode 100644 index df35be4..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py +++ /dev/null @@ -1,214 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import socket -import ssl - -import TSocket -from TTransport import TTransportException - - -class TSSLSocket(TSocket.TSocket): - """ - SSL implementation of client-side TSocket - - This class creates outbound sockets wrapped using the - python standard ssl module for encrypted connections. - - The protocol used is set using the class variable - SSL_VERSION, which must be one of ssl.PROTOCOL_* and - defaults to ssl.PROTOCOL_TLSv1 for greatest security. - """ - SSL_VERSION = ssl.PROTOCOL_TLSv1 - - def __init__(self, - host='localhost', - port=9090, - validate=True, - ca_certs=None, - keyfile=None, - certfile=None, - unix_socket=None): - """Create SSL TSocket - - @param validate: Set to False to disable SSL certificate validation - @type validate: bool - @param ca_certs: Filename to the Certificate Authority pem file, possibly a - file downloaded from: http://curl.haxx.se/ca/cacert.pem This is passed to - the ssl_wrap function as the 'ca_certs' parameter. - @type ca_certs: str - @param keyfile: The private key - @type keyfile: str - @param certfile: The cert file - @type certfile: str - - Raises an IOError exception if validate is True and the ca_certs file is - None, not present or unreadable. - """ - self.validate = validate - self.is_valid = False - self.peercert = None - if not validate: - self.cert_reqs = ssl.CERT_NONE - else: - self.cert_reqs = ssl.CERT_REQUIRED - self.ca_certs = ca_certs - self.keyfile = keyfile - self.certfile = certfile - if validate: - if ca_certs is None or not os.access(ca_certs, os.R_OK): - raise IOError('Certificate Authority ca_certs file "%s" ' - 'is not readable, cannot validate SSL ' - 'certificates.' % (ca_certs)) - TSocket.TSocket.__init__(self, host, port, unix_socket) - - def open(self): - try: - res0 = self._resolveAddr() - for res in res0: - sock_family, sock_type = res[0:2] - ip_port = res[4] - plain_sock = socket.socket(sock_family, sock_type) - self.handle = ssl.wrap_socket(plain_sock, - ssl_version=self.SSL_VERSION, - do_handshake_on_connect=True, - ca_certs=self.ca_certs, - keyfile=self.keyfile, - certfile=self.certfile, - cert_reqs=self.cert_reqs) - self.handle.settimeout(self._timeout) - try: - self.handle.connect(ip_port) - except socket.error, e: - if res is not res0[-1]: - continue - else: - raise e - break - except socket.error, e: - if self._unix_socket: - message = 'Could not connect to secure socket %s: %s' \ - % (self._unix_socket, e) - else: - message = 'Could not connect to %s:%d: %s' % (self.host, self.port, e) - raise TTransportException(type=TTransportException.NOT_OPEN, - message=message) - if self.validate: - self._validate_cert() - - def _validate_cert(self): - """internal method to validate the peer's SSL certificate, and to check the - commonName of the certificate to ensure it matches the hostname we - used to make this connection. Does not support subjectAltName records - in certificates. - - raises TTransportException if the certificate fails validation. - """ - cert = self.handle.getpeercert() - self.peercert = cert - if 'subject' not in cert: - raise TTransportException( - type=TTransportException.NOT_OPEN, - message='No SSL certificate found from %s:%s' % (self.host, self.port)) - fields = cert['subject'] - for field in fields: - # ensure structure we get back is what we expect - if not isinstance(field, tuple): - continue - cert_pair = field[0] - if len(cert_pair) < 2: - continue - cert_key, cert_value = cert_pair[0:2] - if cert_key != 'commonName': - continue - certhost = cert_value - # this check should be performed by some sort of Access Manager - if certhost == self.host: - # success, cert commonName matches desired hostname - self.is_valid = True - return - else: - raise TTransportException( - type=TTransportException.UNKNOWN, - message='Hostname we connected to "%s" doesn\'t match certificate ' - 'provided commonName "%s"' % (self.host, certhost)) - raise TTransportException( - type=TTransportException.UNKNOWN, - message='Could not validate SSL certificate from ' - 'host "%s". Cert=%s' % (self.host, cert)) - - -class TSSLServerSocket(TSocket.TServerSocket): - """SSL implementation of TServerSocket - - This uses the ssl module's wrap_socket() method to provide SSL - negotiated encryption. - """ - SSL_VERSION = ssl.PROTOCOL_TLSv1 - - def __init__(self, - host=None, - port=9090, - certfile='cert.pem', - unix_socket=None): - """Initialize a TSSLServerSocket - - @param certfile: filename of the server certificate, defaults to cert.pem - @type certfile: str - @param host: The hostname or IP to bind the listen socket to, - i.e. 'localhost' for only allowing local network connections. - Pass None to bind to all interfaces. - @type host: str - @param port: The port to listen on for inbound connections. - @type port: int - """ - self.setCertfile(certfile) - TSocket.TServerSocket.__init__(self, host, port) - - def setCertfile(self, certfile): - """Set or change the server certificate file used to wrap new connections. - - @param certfile: The filename of the server certificate, - i.e. '/etc/certs/server.pem' - @type certfile: str - - Raises an IOError exception if the certfile is not present or unreadable. - """ - if not os.access(certfile, os.R_OK): - raise IOError('No such certfile found: %s' % (certfile)) - self.certfile = certfile - - def accept(self): - plain_client, addr = self.handle.accept() - try: - client = ssl.wrap_socket(plain_client, certfile=self.certfile, - server_side=True, ssl_version=self.SSL_VERSION) - except ssl.SSLError, ssl_exc: - # failed handshake/ssl wrap, close socket to client - plain_client.close() - # raise ssl_exc - # We can't raise the exception, because it kills most TServer derived - # serve() methods. - # Instead, return None, and let the TServer instance deal with it in - # other exception handling. (but TSimpleServer dies anyway) - return None - result = TSocket.TSocket() - result.setHandle(client) - return result http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py deleted file mode 100644 index 9e2b384..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py +++ /dev/null @@ -1,176 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import errno -import os -import socket -import sys - -from TTransport import * - - -class TSocketBase(TTransportBase): - def _resolveAddr(self): - if self._unix_socket is not None: - return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, - self._unix_socket)] - else: - return socket.getaddrinfo(self.host, - self.port, - socket.AF_UNSPEC, - socket.SOCK_STREAM, - 0, - socket.AI_PASSIVE | socket.AI_ADDRCONFIG) - - def close(self): - if self.handle: - self.handle.close() - self.handle = None - - -class TSocket(TSocketBase): - """Socket implementation of TTransport base.""" - - def __init__(self, host='localhost', port=9090, unix_socket=None): - """Initialize a TSocket - - @param host(str) The host to connect to. - @param port(int) The (TCP) port to connect to. - @param unix_socket(str) The filename of a unix socket to connect to. - (host and port will be ignored.) - """ - self.host = host - self.port = port - self.handle = None - self._unix_socket = unix_socket - self._timeout = None - - def setHandle(self, h): - self.handle = h - - def isOpen(self): - return self.handle is not None - - def setTimeout(self, ms): - if ms is None: - self._timeout = None - else: - self._timeout = ms / 1000.0 - - if self.handle is not None: - self.handle.settimeout(self._timeout) - - def open(self): - try: - res0 = self._resolveAddr() - for res in res0: - self.handle = socket.socket(res[0], res[1]) - self.handle.settimeout(self._timeout) - try: - self.handle.connect(res[4]) - except socket.error, e: - if res is not res0[-1]: - continue - else: - raise e - break - except socket.error, e: - if self._unix_socket: - message = 'Could not connect to socket %s' % self._unix_socket - else: - message = 'Could not connect to %s:%d' % (self.host, self.port) - raise TTransportException(type=TTransportException.NOT_OPEN, - message=message) - - def read(self, sz): - try: - buff = self.handle.recv(sz) - except socket.error, e: - if (e.args[0] == errno.ECONNRESET and - (sys.platform == 'darwin' or sys.platform.startswith('freebsd'))): - # freebsd and Mach don't follow POSIX semantic of recv - # and fail with ECONNRESET if peer performed shutdown. - # See corresponding comment and code in TSocket::read() - # in lib/cpp/src/transport/TSocket.cpp. - self.close() - # Trigger the check to raise the END_OF_FILE exception below. - buff = '' - else: - raise - if len(buff) == 0: - raise TTransportException(type=TTransportException.END_OF_FILE, - message='TSocket read 0 bytes') - return buff - - def write(self, buff): - if not self.handle: - raise TTransportException(type=TTransportException.NOT_OPEN, - message='Transport not open') - sent = 0 - have = len(buff) - while sent < have: - plus = self.handle.send(buff) - if plus == 0: - raise TTransportException(type=TTransportException.END_OF_FILE, - message='TSocket sent 0 bytes') - sent += plus - buff = buff[plus:] - - def flush(self): - pass - - -class TServerSocket(TSocketBase, TServerTransportBase): - """Socket implementation of TServerTransport base.""" - - def __init__(self, host=None, port=9090, unix_socket=None): - self.host = host - self.port = port - self._unix_socket = unix_socket - self.handle = None - - def listen(self): - res0 = self._resolveAddr() - for res in res0: - if res[0] is socket.AF_INET6 or res is res0[-1]: - break - - # We need remove the old unix socket if the file exists and - # nobody is listening on it. - if self._unix_socket: - tmp = socket.socket(res[0], res[1]) - try: - tmp.connect(res[4]) - except socket.error, err: - eno, message = err.args - if eno == errno.ECONNREFUSED: - os.unlink(res[4]) - - self.handle = socket.socket(res[0], res[1]) - self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - if hasattr(self.handle, 'settimeout'): - self.handle.settimeout(None) - self.handle.bind(res[4]) - self.handle.listen(128) - - def accept(self): - client, addr = self.handle.accept() - result = TSocket() - result.setHandle(client) - return result http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py deleted file mode 100644 index ed023d5..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py +++ /dev/null @@ -1,330 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from cStringIO import StringIO -from struct import pack, unpack -from ..Thrift import TException - - -class TTransportException(TException): - """Custom Transport Exception class""" - - UNKNOWN = 0 - NOT_OPEN = 1 - ALREADY_OPEN = 2 - TIMED_OUT = 3 - END_OF_FILE = 4 - - def __init__(self, type=UNKNOWN, message=None): - TException.__init__(self, message) - self.type = type - - -class TTransportBase: - """Base class for Thrift transport layer.""" - - def isOpen(self): - pass - - def open(self): - pass - - def close(self): - pass - - def read(self, sz): - pass - - def readAll(self, sz): - buff = '' - have = 0 - while (have < sz): - chunk = self.read(sz - have) - have += len(chunk) - buff += chunk - - if len(chunk) == 0: - raise EOFError() - - return buff - - def write(self, buf): - pass - - def flush(self): - pass - - -# This class should be thought of as an interface. -class CReadableTransport: - """base class for transports that are readable from C""" - - # TODO(dreiss): Think about changing this interface to allow us to use - # a (Python, not c) StringIO instead, because it allows - # you to write after reading. - - # NOTE: This is a classic class, so properties will NOT work - # correctly for setting. - @property - def cstringio_buf(self): - """A cStringIO buffer that contains the current chunk we are reading.""" - pass - - def cstringio_refill(self, partialread, reqlen): - """Refills cstringio_buf. - - Returns the currently used buffer (which can but need not be the same as - the old cstringio_buf). partialread is what the C code has read from the - buffer, and should be inserted into the buffer before any more reads. The - return value must be a new, not borrowed reference. Something along the - lines of self._buf should be fine. - - If reqlen bytes can't be read, throw EOFError. - """ - pass - - -class TServerTransportBase: - """Base class for Thrift server transports.""" - - def listen(self): - pass - - def accept(self): - pass - - def close(self): - pass - - -class TTransportFactoryBase: - """Base class for a Transport Factory""" - - def getTransport(self, trans): - return trans - - -class TBufferedTransportFactory: - """Factory transport that builds buffered transports""" - - def getTransport(self, trans): - buffered = TBufferedTransport(trans) - return buffered - - -class TBufferedTransport(TTransportBase, CReadableTransport): - """Class that wraps another transport and buffers its I/O. - - The implementation uses a (configurable) fixed-size read buffer - but buffers all writes until a flush is performed. - """ - DEFAULT_BUFFER = 4096 - - def __init__(self, trans, rbuf_size=DEFAULT_BUFFER): - self.__trans = trans - self.__wbuf = StringIO() - self.__rbuf = StringIO("") - self.__rbuf_size = rbuf_size - - def isOpen(self): - return self.__trans.isOpen() - - def open(self): - return self.__trans.open() - - def close(self): - return self.__trans.close() - - def read(self, sz): - ret = self.__rbuf.read(sz) - if len(ret) != 0: - return ret - - self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size))) - return self.__rbuf.read(sz) - - def write(self, buf): - self.__wbuf.write(buf) - - def flush(self): - out = self.__wbuf.getvalue() - # reset wbuf before write/flush to preserve state on underlying failure - self.__wbuf = StringIO() - self.__trans.write(out) - self.__trans.flush() - - # Implement the CReadableTransport interface. - @property - def cstringio_buf(self): - return self.__rbuf - - def cstringio_refill(self, partialread, reqlen): - retstring = partialread - if reqlen < self.__rbuf_size: - # try to make a read of as much as we can. - retstring += self.__trans.read(self.__rbuf_size) - - # but make sure we do read reqlen bytes. - if len(retstring) < reqlen: - retstring += self.__trans.readAll(reqlen - len(retstring)) - - self.__rbuf = StringIO(retstring) - return self.__rbuf - - -class TMemoryBuffer(TTransportBase, CReadableTransport): - """Wraps a cStringIO object as a TTransport. - - NOTE: Unlike the C++ version of this class, you cannot write to it - then immediately read from it. If you want to read from a - TMemoryBuffer, you must either pass a string to the constructor. - TODO(dreiss): Make this work like the C++ version. - """ - - def __init__(self, value=None): - """value -- a value to read from for stringio - - If value is set, this will be a transport for reading, - otherwise, it is for writing""" - if value is not None: - self._buffer = StringIO(value) - else: - self._buffer = StringIO() - - def isOpen(self): - return not self._buffer.closed - - def open(self): - pass - - def close(self): - self._buffer.close() - - def read(self, sz): - return self._buffer.read(sz) - - def write(self, buf): - self._buffer.write(buf) - - def flush(self): - pass - - def getvalue(self): - return self._buffer.getvalue() - - # Implement the CReadableTransport interface. - @property - def cstringio_buf(self): - return self._buffer - - def cstringio_refill(self, partialread, reqlen): - # only one shot at reading... - raise EOFError() - - -class TFramedTransportFactory: - """Factory transport that builds framed transports""" - - def getTransport(self, trans): - framed = TFramedTransport(trans) - return framed - - -class TFramedTransport(TTransportBase, CReadableTransport): - """Class that wraps another transport and frames its I/O when writing.""" - - def __init__(self, trans,): - self.__trans = trans - self.__rbuf = StringIO() - self.__wbuf = StringIO() - - def isOpen(self): - return self.__trans.isOpen() - - def open(self): - return self.__trans.open() - - def close(self): - return self.__trans.close() - - def read(self, sz): - ret = self.__rbuf.read(sz) - if len(ret) != 0: - return ret - - self.readFrame() - return self.__rbuf.read(sz) - - def readFrame(self): - buff = self.__trans.readAll(4) - sz, = unpack('!i', buff) - self.__rbuf = StringIO(self.__trans.readAll(sz)) - - def write(self, buf): - self.__wbuf.write(buf) - - def flush(self): - wout = self.__wbuf.getvalue() - wsz = len(wout) - # reset wbuf before write/flush to preserve state on underlying failure - self.__wbuf = StringIO() - # N.B.: Doing this string concatenation is WAY cheaper than making - # two separate calls to the underlying socket object. Socket writes in - # Python turn out to be REALLY expensive, but it seems to do a pretty - # good job of managing string buffer operations without excessive copies - buf = pack("!i", wsz) + wout - self.__trans.write(buf) - self.__trans.flush() - - # Implement the CReadableTransport interface. - @property - def cstringio_buf(self): - return self.__rbuf - - def cstringio_refill(self, prefix, reqlen): - # self.__rbuf will already be empty here because fastbinary doesn't - # ask for a refill until the previous buffer is empty. Therefore, - # we can start reading new frames immediately. - while len(prefix) < reqlen: - self.readFrame() - prefix += self.__rbuf.getvalue() - self.__rbuf = StringIO(prefix) - return self.__rbuf - - -class TFileObjectTransport(TTransportBase): - """Wraps a file-like object to make it work as a Thrift transport.""" - - def __init__(self, fileobj): - self.fileobj = fileobj - - def isOpen(self): - return True - - def close(self): - self.fileobj.close() - - def read(self, sz): - return self.fileobj.read(sz) - - def write(self, buf): - self.fileobj.write(buf) - - def flush(self): - self.fileobj.flush() http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py deleted file mode 100644 index 6cdb172..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py +++ /dev/null @@ -1,221 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from cStringIO import StringIO - -from zope.interface import implements, Interface, Attribute -from twisted.internet.protocol import Protocol, ServerFactory, ClientFactory, \ - connectionDone -from twisted.internet import defer -from twisted.protocols import basic -from twisted.python import log -from twisted.web import server, resource, http - -import TTransport - - -class TMessageSenderTransport(TTransport.TTransportBase): - - def __init__(self): - self.__wbuf = StringIO() - - def write(self, buf): - self.__wbuf.write(buf) - - def flush(self): - msg = self.__wbuf.getvalue() - self.__wbuf = StringIO() - self.sendMessage(msg) - - def sendMessage(self, message): - raise NotImplementedError - - -class TCallbackTransport(TMessageSenderTransport): - - def __init__(self, func): - TMessageSenderTransport.__init__(self) - self.func = func - - def sendMessage(self, message): - self.func(message) - - -class ThriftClientProtocol(basic.Int32StringReceiver): - - MAX_LENGTH = 2 ** 31 - 1 - - def __init__(self, client_class, iprot_factory, oprot_factory=None): - self._client_class = client_class - self._iprot_factory = iprot_factory - if oprot_factory is None: - self._oprot_factory = iprot_factory - else: - self._oprot_factory = oprot_factory - - self.recv_map = {} - self.started = defer.Deferred() - - def dispatch(self, msg): - self.sendString(msg) - - def connectionMade(self): - tmo = TCallbackTransport(self.dispatch) - self.client = self._client_class(tmo, self._oprot_factory) - self.started.callback(self.client) - - def connectionLost(self, reason=connectionDone): - for k, v in self.client._reqs.iteritems(): - tex = TTransport.TTransportException( - type=TTransport.TTransportException.END_OF_FILE, - message='Connection closed') - v.errback(tex) - - def stringReceived(self, frame): - tr = TTransport.TMemoryBuffer(frame) - iprot = self._iprot_factory.getProtocol(tr) - (fname, mtype, rseqid) = iprot.readMessageBegin() - - try: - method = self.recv_map[fname] - except KeyError: - method = getattr(self.client, 'recv_' + fname) - self.recv_map[fname] = method - - method(iprot, mtype, rseqid) - - -class ThriftServerProtocol(basic.Int32StringReceiver): - - MAX_LENGTH = 2 ** 31 - 1 - - def dispatch(self, msg): - self.sendString(msg) - - def processError(self, error): - self.transport.loseConnection() - - def processOk(self, _, tmo): - msg = tmo.getvalue() - - if len(msg) > 0: - self.dispatch(msg) - - def stringReceived(self, frame): - tmi = TTransport.TMemoryBuffer(frame) - tmo = TTransport.TMemoryBuffer() - - iprot = self.factory.iprot_factory.getProtocol(tmi) - oprot = self.factory.oprot_factory.getProtocol(tmo) - - d = self.factory.processor.process(iprot, oprot) - d.addCallbacks(self.processOk, self.processError, - callbackArgs=(tmo,)) - - -class IThriftServerFactory(Interface): - - processor = Attribute("Thrift processor") - - iprot_factory = Attribute("Input protocol factory") - - oprot_factory = Attribute("Output protocol factory") - - -class IThriftClientFactory(Interface): - - client_class = Attribute("Thrift client class") - - iprot_factory = Attribute("Input protocol factory") - - oprot_factory = Attribute("Output protocol factory") - - -class ThriftServerFactory(ServerFactory): - - implements(IThriftServerFactory) - - protocol = ThriftServerProtocol - - def __init__(self, processor, iprot_factory, oprot_factory=None): - self.processor = processor - self.iprot_factory = iprot_factory - if oprot_factory is None: - self.oprot_factory = iprot_factory - else: - self.oprot_factory = oprot_factory - - -class ThriftClientFactory(ClientFactory): - - implements(IThriftClientFactory) - - protocol = ThriftClientProtocol - - def __init__(self, client_class, iprot_factory, oprot_factory=None): - self.client_class = client_class - self.iprot_factory = iprot_factory - if oprot_factory is None: - self.oprot_factory = iprot_factory - else: - self.oprot_factory = oprot_factory - - def buildProtocol(self, addr): - p = self.protocol(self.client_class, self.iprot_factory, - self.oprot_factory) - p.factory = self - return p - - -class ThriftResource(resource.Resource): - - allowedMethods = ('POST',) - - def __init__(self, processor, inputProtocolFactory, - outputProtocolFactory=None): - resource.Resource.__init__(self) - self.inputProtocolFactory = inputProtocolFactory - if outputProtocolFactory is None: - self.outputProtocolFactory = inputProtocolFactory - else: - self.outputProtocolFactory = outputProtocolFactory - self.processor = processor - - def getChild(self, path, request): - return self - - def _cbProcess(self, _, request, tmo): - msg = tmo.getvalue() - request.setResponseCode(http.OK) - request.setHeader("content-type", "application/x-thrift") - request.write(msg) - request.finish() - - def render_POST(self, request): - request.content.seek(0, 0) - data = request.content.read() - tmi = TTransport.TMemoryBuffer(data) - tmo = TTransport.TMemoryBuffer() - - iprot = self.inputProtocolFactory.getProtocol(tmi) - oprot = self.outputProtocolFactory.getProtocol(tmo) - - d = self.processor.process(iprot, oprot) - d.addCallback(self._cbProcess, request, tmo) - return server.NOT_DONE_YET http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py deleted file mode 100644 index 2bdfd14..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py +++ /dev/null @@ -1,249 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -"""TZlibTransport provides a compressed transport and transport factory -class, using the python standard library zlib module to implement -data compression. -""" - -from __future__ import division -import zlib -from cStringIO import StringIO - -from TTransport import TTransportBase, CReadableTransport - - -class TZlibTransportFactory(object): - """Factory transport that builds zlib compressed transports. - - This factory caches the last single client/transport that it was passed - and returns the same TZlibTransport object that was created. - - This caching means the TServer class will get the _same_ transport - object for both input and output transports from this factory. - (For non-threaded scenarios only, since the cache only holds one object) - - The purpose of this caching is to allocate only one TZlibTransport where - only one is really needed (since it must have separate read/write buffers), - and makes the statistics from getCompSavings() and getCompRatio() - easier to understand. - """ - # class scoped cache of last transport given and zlibtransport returned - _last_trans = None - _last_z = None - - def getTransport(self, trans, compresslevel=9): - """Wrap a transport, trans, with the TZlibTransport - compressed transport class, returning a new - transport to the caller. - - @param compresslevel: The zlib compression level, ranging - from 0 (no compression) to 9 (best compression). Defaults to 9. - @type compresslevel: int - - This method returns a TZlibTransport which wraps the - passed C{trans} TTransport derived instance. - """ - if trans == self._last_trans: - return self._last_z - ztrans = TZlibTransport(trans, compresslevel) - self._last_trans = trans - self._last_z = ztrans - return ztrans - - -class TZlibTransport(TTransportBase, CReadableTransport): - """Class that wraps a transport with zlib, compressing writes - and decompresses reads, using the python standard - library zlib module. - """ - # Read buffer size for the python fastbinary C extension, - # the TBinaryProtocolAccelerated class. - DEFAULT_BUFFSIZE = 4096 - - def __init__(self, trans, compresslevel=9): - """Create a new TZlibTransport, wrapping C{trans}, another - TTransport derived object. - - @param trans: A thrift transport object, i.e. a TSocket() object. - @type trans: TTransport - @param compresslevel: The zlib compression level, ranging - from 0 (no compression) to 9 (best compression). Default is 9. - @type compresslevel: int - """ - self.__trans = trans - self.compresslevel = compresslevel - self.__rbuf = StringIO() - self.__wbuf = StringIO() - self._init_zlib() - self._init_stats() - - def _reinit_buffers(self): - """Internal method to initialize/reset the internal StringIO objects - for read and write buffers. - """ - self.__rbuf = StringIO() - self.__wbuf = StringIO() - - def _init_stats(self): - """Internal method to reset the internal statistics counters - for compression ratios and bandwidth savings. - """ - self.bytes_in = 0 - self.bytes_out = 0 - self.bytes_in_comp = 0 - self.bytes_out_comp = 0 - - def _init_zlib(self): - """Internal method for setting up the zlib compression and - decompression objects. - """ - self._zcomp_read = zlib.decompressobj() - self._zcomp_write = zlib.compressobj(self.compresslevel) - - def getCompRatio(self): - """Get the current measured compression ratios (in,out) from - this transport. - - Returns a tuple of: - (inbound_compression_ratio, outbound_compression_ratio) - - The compression ratios are computed as: - compressed / uncompressed - - E.g., data that compresses by 10x will have a ratio of: 0.10 - and data that compresses to half of ts original size will - have a ratio of 0.5 - - None is returned if no bytes have yet been processed in - a particular direction. - """ - r_percent, w_percent = (None, None) - if self.bytes_in > 0: - r_percent = self.bytes_in_comp / self.bytes_in - if self.bytes_out > 0: - w_percent = self.bytes_out_comp / self.bytes_out - return (r_percent, w_percent) - - def getCompSavings(self): - """Get the current count of saved bytes due to data - compression. - - Returns a tuple of: - (inbound_saved_bytes, outbound_saved_bytes) - - Note: if compression is actually expanding your - data (only likely with very tiny thrift objects), then - the values returned will be negative. - """ - r_saved = self.bytes_in - self.bytes_in_comp - w_saved = self.bytes_out - self.bytes_out_comp - return (r_saved, w_saved) - - def isOpen(self): - """Return the underlying transport's open status""" - return self.__trans.isOpen() - - def open(self): - """Open the underlying transport""" - self._init_stats() - return self.__trans.open() - - def listen(self): - """Invoke the underlying transport's listen() method""" - self.__trans.listen() - - def accept(self): - """Accept connections on the underlying transport""" - return self.__trans.accept() - - def close(self): - """Close the underlying transport,""" - self._reinit_buffers() - self._init_zlib() - return self.__trans.close() - - def read(self, sz): - """Read up to sz bytes from the decompressed bytes buffer, and - read from the underlying transport if the decompression - buffer is empty. - """ - ret = self.__rbuf.read(sz) - if len(ret) > 0: - return ret - # keep reading from transport until something comes back - while True: - if self.readComp(sz): - break - ret = self.__rbuf.read(sz) - return ret - - def readComp(self, sz): - """Read compressed data from the underlying transport, then - decompress it and append it to the internal StringIO read buffer - """ - zbuf = self.__trans.read(sz) - zbuf = self._zcomp_read.unconsumed_tail + zbuf - buf = self._zcomp_read.decompress(zbuf) - self.bytes_in += len(zbuf) - self.bytes_in_comp += len(buf) - old = self.__rbuf.read() - self.__rbuf = StringIO(old + buf) - if len(old) + len(buf) == 0: - return False - return True - - def write(self, buf): - """Write some bytes, putting them into the internal write - buffer for eventual compression. - """ - self.__wbuf.write(buf) - - def flush(self): - """Flush any queued up data in the write buffer and ensure the - compression buffer is flushed out to the underlying transport - """ - wout = self.__wbuf.getvalue() - if len(wout) > 0: - zbuf = self._zcomp_write.compress(wout) - self.bytes_out += len(wout) - self.bytes_out_comp += len(zbuf) - else: - zbuf = '' - ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) - self.bytes_out_comp += len(ztail) - if (len(zbuf) + len(ztail)) > 0: - self.__wbuf = StringIO() - self.__trans.write(zbuf + ztail) - self.__trans.flush() - - @property - def cstringio_buf(self): - """Implement the CReadableTransport interface""" - return self.__rbuf - - def cstringio_refill(self, partialread, reqlen): - """Implement the CReadableTransport interface for refill""" - retstring = partialread - if reqlen < self.DEFAULT_BUFFSIZE: - retstring += self.read(self.DEFAULT_BUFFSIZE) - while len(retstring) < reqlen: - retstring += self.read(reqlen - len(retstring)) - self.__rbuf = StringIO(retstring) - return self.__rbuf http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py deleted file mode 100644 index c9596d9..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -__all__ = ['TTransport', 'TSocket', 'THttpClient', 'TZlibTransport'] http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py deleted file mode 100644 index a595c84..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py deleted file mode 100644 index 2456923..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py deleted file mode 100644 index fc4bfc9..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -class DataPublisherException(Exception): - """ - Exception to be used during log publishing operations - """ - - def __init__(self, msg): - super(self, msg) - self.message = msg - - def get_message(self): - """ - The message provided when the exception is raised - :return: message - :rtype: str - """ - return self.message http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py deleted file mode 100644 index 050dd9e..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py +++ /dev/null @@ -1,273 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import os -import datetime -from threading import Thread, current_thread - -from ..databridge.agent import * -from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ..util import cartridgeagentutils, cartridgeagentconstants -from exception.datapublisherexception import DataPublisherException - - -class LogPublisher(Thread): - - def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, member_id): - Thread.__init__(self) - - self.log = LogFactory().get_log(__name__) - - self.file_path = file_path - self.thrift_publisher = ThriftPublisher( - DataPublisherConfiguration.get_instance().monitoring_server_ip, - DataPublisherConfiguration.get_instance().monitoring_server_port, - DataPublisherConfiguration.get_instance().admin_username, - DataPublisherConfiguration.get_instance().admin_password, - stream_definition) - self.tenant_id = tenant_id - self.alias = alias - self.datetime = date_time - self.member_id = member_id - - self.terminated = False - - def run(self): - if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK): - self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + current_thread()) - # open file and keep reading for new entries - read_file = open(self.file_path, "r") - read_file.seek(os.stat(self.file_path)[6]) # go to the end of the file - - while not self.terminated: - where = read_file.tell() # where the seeker is in the file - line = read_file.readline() # read the current line - if not line: - # no new line entered - time.sleep(1) - read_file.seek(where) # set seeker - else: - # new line detected, create event object - event = ThriftEvent() - event.metaData.append(self.member_id) - event.payloadData.append(self.tenant_id) - event.payloadData.append(self.alias) - event.payloadData.append("") - event.payloadData.append(self.datetime) - event.payloadData.append("") - event.payloadData.append(line) - event.payloadData.append("") - event.payloadData.append("") - event.payloadData.append(self.member_id) - event.payloadData.append("") - - self.thrift_publisher.publish(event) - - self.thrift_publisher.disconnect() # dicsonnect the publisher upon being terminated - else: - raise DataPublisherException("Unable to read the file at path %r" % self.file_path) - - def terminate(self): - """ - Allows the LogPublisher thread to be terminated to stop publishing to BAM/CEP. Allow a minimum of 1 second delay - to take effect. - """ - self.terminated = True - - -class LogPublisherManager(Thread): - """ - A log publishing thread management thread which maintains a log publisher for each log file. Also defines a stream - definition and the BAM/CEP server information for a single publishing context. - """ - - @staticmethod - def define_stream(): - """ - Creates a stream definition for Log Publishing - :return: A StreamDefinition object with the required attributes added - :rtype : StreamDefinition - """ - # stream definition - stream_definition = StreamDefinition() - valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration().tenant_id) - alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration().cluster_id) - stream_name = "logs." + valid_tenant_id + "." \ - + alias + "." + LogPublisherManager.get_current_date() - stream_version = "1.0.0" - - stream_definition.name = stream_name - stream_definition.version = stream_version - stream_definition.description = "Apache Stratos Instance Log Publisher" - stream_definition.add_metadata_attribute("memberId", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("tenantID", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("serverName", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("appName", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("logTime", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("priority", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("message", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("logger", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("ip", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("instance", StreamDefinition.STRING) - stream_definition.add_payloaddata_attribute("stacktrace", StreamDefinition.STRING) - - return stream_definition - - def __init__(self, logfile_paths): - Thread.__init__(self) - self.logfile_paths = logfile_paths - self.publishers = {} - self.ports = [] - self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) - self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) - - self.cartridge_agent_config = CartridgeAgentConfiguration() - - cartridgeagentutils.wait_until_ports_active( - DataPublisherConfiguration.get_instance().monitoring_server_ip, - self.ports, - int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) - - ports_active = cartridgeagentutils.check_ports_active( - DataPublisherConfiguration.get_instance().monitoring_server_ip, - self.ports) - - if not ports_active: - raise DataPublisherException("Monitoring server not active, data publishing is aborted") - - self.stream_definition = self.define_stream() - - def run(self): - if self.logfile_paths is not None and len(self.logfile_paths): - for log_path in self.logfile_paths: - # thread for each log file - publisher = self.get_publisher(log_path) - publisher.start() - - def get_publisher(self, log_path): - """ - Retrieve the publisher for the specified log file path. Creates a new LogPublisher if one is not available - :return: The LogPublisher object - :rtype : LogPublisher - """ - if log_path not in self.publishers: - self.publishers[log_path] = LogPublisher(log_path, self.stream_definition) - - return self.publishers[log_path] - - def terminate_publisher(self, log_path): - """ - Terminates the LogPublisher thread associated with the specified log file - """ - if log_path in self.publishers: - self.publishers[log_path].terminate() - - def terminate_all_publishers(self): - """ - Terminates all LogPublisher threads - """ - for publisher in self.publishers: - publisher.terminate() - - @staticmethod - def get_valid_tenant_id(tenant_id): - if tenant_id == cartridgeagentconstants.INVALID_TENANT_ID \ - or tenant_id == cartridgeagentconstants.SUPER_TENANT_ID: - return "0" - - return tenant_id - - @staticmethod - def get_alias(cluster_id): - try: - alias = cluster_id.split("\\.")[0] - except: - alias = cluster_id - - return alias - - @staticmethod - def get_current_date(): - """ - Returns the current date formatted as yyyy-MM-dd - :return: Formatted date string - :rtype : str - """ - return datetime.date.today().strftime(cartridgeagentconstants.DATE_FORMAT) - - -class DataPublisherConfiguration: - """ - A singleton implementation to access configuration information for data publishing to BAM/CEP - TODO: perfect singleton impl ex: Borg - """ - - __instance = None - log = LogFactory().get_log(__name__) - - @staticmethod - def get_instance(): - """ - Singleton instance retriever - :return: Instance - :rtype : DataPublisherConfiguration - """ - if DataPublisherConfiguration.__instance is None: - DataPublisherConfiguration.__instance = DataPublisherConfiguration() - - return DataPublisherConfiguration.__instance - - def __init__(self): - self.enabled = False - self.monitoring_server_ip = None - self.monitoring_server_port = None - self.monitoring_server_secure_port = None - self.admin_username = None - self.admin_password = None - self.cartridge_agent_config = CartridgeAgentConfiguration() - - self.read_config() - - def read_config(self): - self.enabled = True if self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_PUBLISHER_ENABLED, False).strip().lower() == "true" else False - if not self.enabled: - DataPublisherConfiguration.log.info("Data Publisher disabled") - return - - DataPublisherConfiguration.log.info("Data Publisher enabled") - - self.monitoring_server_ip = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_RECEIVER_IP, False) - if self.monitoring_server_ip is None or self.monitoring_server_ip.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_RECEIVER_IP) - - self.monitoring_server_port = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_RECEIVER_PORT, False) - if self.monitoring_server_port is None or self.monitoring_server_port.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_RECEIVER_PORT) - - self.monitoring_server_secure_port = self.cartridge_agent_config.read_property("monitoring.server.secure.port", False) - if self.monitoring_server_secure_port is None or self.monitoring_server_secure_port.strip() == "": - raise RuntimeError("System property not found: monitoring.server.secure.port") - - self.admin_username = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_SERVER_ADMIN_USERNAME, False) - if self.admin_username is None or self.admin_username.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_SERVER_ADMIN_USERNAME) - - self.admin_password = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_SERVER_ADMIN_PASSWORD, False) - if self.admin_password is None or self.admin_password.strip() == "": - raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_SERVER_ADMIN_PASSWORD) - - DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/event/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/__init__.py deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/__init__.py deleted file mode 100644 index 2456923..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/events.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/events.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/events.py deleted file mode 100644 index 024e1e4..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/notifier/events.py +++ /dev/null @@ -1,77 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import json - - -class ArtifactUpdatedEvent: - def __init__(self): - self.cluster_id = None - """ :type : str """ - self.status = None - """ :type : str """ - self.repo_username = None - """ :type : str """ - self.repo_password = None - """ :type : str """ - self.repo_url = None - """ :type : str """ - self.tenant_id = None - """ :type : int """ - self.commit_enabled = None - """ :type : bool """ - - @staticmethod - def create_from_json(json_str): - json_obj = json.loads(json_str) - instance = ArtifactUpdatedEvent() - - instance.cluster_id = json_obj["clusterId"] if "clusterId" in json_obj else None - instance.status = json_obj["status"] if "status" in json_obj else None - instance.repo_username = json_obj["repoUserName"] if "repoUserName" in json_obj else None - instance.repo_password = json_obj["repoPassword"] if "repoPassword" in json_obj else None - instance.tenant_id = json_obj["tenantId"] if "tenantId" in json_obj else None - instance.repo_url = json_obj["repoUrl"] if "repoUrl" in json_obj else "" - instance.commit_enabled = json_obj["commitEnabled"] if "commitEnabled" in json_obj else None - - return instance - - -class InstanceCleanupClusterEvent: - def __init__(self, cluster_id): - self.cluster_id = cluster_id - """ :type : str """ - - @staticmethod - def create_from_json(json_str): - json_obj = json.loads(json_str) - c_id = json_obj["clusterId"] if "clusterId" in json_obj else None - - return InstanceCleanupClusterEvent(c_id) - - -class InstanceCleanupMemberEvent: - def __init__(self, member_id): - self.member_id = member_id - """ :type : str """ - - @staticmethod - def create_from_json(json_str): - json_obj = json.loads(json_str) - m_id = json_obj["memberId"] if "memberId" in json_obj else None - - return InstanceCleanupMemberEvent(m_id) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bcddfbad/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/status/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/status/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/status/__init__.py deleted file mode 100644 index 2456923..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/event/instance/status/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -
