http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TNonblockingServer.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TNonblockingServer.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TNonblockingServer.py new file mode 100644 index 0000000..fa478d0 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TNonblockingServer.py @@ -0,0 +1,346 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +"""Implementation of non-blocking server. + +The main idea of the server is to receive and send requests +only from the main thread. + +The thread poool should be sized for concurrent tasks, not +maximum connections +""" +import threading +import socket +import Queue +import select +import struct +import logging + +from thrift.transport import TTransport +from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory + +__all__ = ['TNonblockingServer'] + + +class Worker(threading.Thread): + """Worker is a small helper to process incoming connection.""" + + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + + def run(self): + """Process queries from task queue, stop if processor is None.""" + while True: + try: + processor, iprot, oprot, otrans, callback = self.queue.get() + if processor is None: + break + processor.process(iprot, oprot) + callback(True, otrans.getvalue()) + except Exception: + logging.exception("Exception while processing request") + callback(False, '') + +WAIT_LEN = 0 +WAIT_MESSAGE = 1 +WAIT_PROCESS = 2 +SEND_ANSWER = 3 +CLOSED = 4 + + +def locked(func): + """Decorator which locks self.lock.""" + def nested(self, *args, **kwargs): + self.lock.acquire() + try: + return func(self, *args, **kwargs) + finally: + self.lock.release() + return nested + + +def socket_exception(func): + """Decorator close object on socket.error.""" + def read(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except socket.error: + self.close() + return read + + +class Connection: + """Basic class is represented connection. + + It can be in state: + WAIT_LEN --- connection is reading request len. + WAIT_MESSAGE --- connection is reading request. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. + SEND_ANSWER --- connection is sending answer string (including length + of answer). + CLOSED --- socket was closed and connection should be deleted. + """ + def __init__(self, new_socket, wake_up): + self.socket = new_socket + self.socket.setblocking(False) + self.status = WAIT_LEN + self.len = 0 + self.message = '' + self.lock = threading.Lock() + self.wake_up = wake_up + + def _read_len(self): + """Reads length of request. + + It's a safer alternative to self.socket.recv(4) + """ + read = self.socket.recv(4 - len(self.message)) + if len(read) == 0: + # if we read 0 bytes and self.message is empty, then + # the client closed the connection + if len(self.message) != 0: + logging.error("can't read frame size from socket") + self.close() + return + self.message += read + if len(self.message) == 4: + self.len, = struct.unpack('!i', self.message) + if self.len < 0: + logging.error("negative frame size, it seems client " + "doesn't use FramedTransport") + self.close() + elif self.len == 0: + logging.error("empty frame, it's really strange") + self.close() + else: + self.message = '' + self.status = WAIT_MESSAGE + + @socket_exception + def read(self): + """Reads data from stream and switch state.""" + assert self.status in (WAIT_LEN, WAIT_MESSAGE) + if self.status == WAIT_LEN: + self._read_len() + # go back to the main loop here for simplicity instead of + # falling through, even though there is a good chance that + # the message is already available + elif self.status == WAIT_MESSAGE: + read = self.socket.recv(self.len - len(self.message)) + if len(read) == 0: + logging.error("can't read frame from socket (get %d of " + "%d bytes)" % (len(self.message), self.len)) + self.close() + return + self.message += read + if len(self.message) == self.len: + self.status = WAIT_PROCESS + + @socket_exception + def write(self): + """Writes data from socket and switch state.""" + assert self.status == SEND_ANSWER + sent = self.socket.send(self.message) + if sent == len(self.message): + self.status = WAIT_LEN + self.message = '' + self.len = 0 + else: + self.message = self.message[sent:] + + @locked + def ready(self, all_ok, message): + """Callback function for switching state and waking up main thread. + + This function is the only function witch can be called asynchronous. + + The ready can switch Connection to three states: + WAIT_LEN if request was oneway. + SEND_ANSWER if request was processed in normal way. + CLOSED if request throws unexpected exception. + + The one wakes up main thread. + """ + assert self.status == WAIT_PROCESS + if not all_ok: + self.close() + self.wake_up() + return + self.len = '' + if len(message) == 0: + # it was a oneway request, do not write answer + self.message = '' + self.status = WAIT_LEN + else: + self.message = struct.pack('!i', len(message)) + message + self.status = SEND_ANSWER + self.wake_up() + + @locked + def is_writeable(self): + """Return True if connection should be added to write list of select""" + return self.status == SEND_ANSWER + + # it's not necessary, but... + @locked + def is_readable(self): + """Return True if connection should be added to read list of select""" + return self.status in (WAIT_LEN, WAIT_MESSAGE) + + @locked + def is_closed(self): + """Returns True if connection is closed.""" + return self.status == CLOSED + + def fileno(self): + """Returns the file descriptor of the associated socket.""" + return self.socket.fileno() + + def close(self): + """Closes connection""" + self.status = CLOSED + self.socket.close() + + +class TNonblockingServer: + """Non-blocking server.""" + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): + self.processor = processor + self.socket = lsocket + self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() + self.out_protocol = outputProtocolFactory or self.in_protocol + self.threads = int(threads) + self.clients = {} + self.tasks = Queue.Queue() + self._read, self._write = socket.socketpair() + self.prepared = False + self._stop = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created.""" + # implement ThreadPool interface + assert not self.prepared, "Can't change number of threads after start" + self.threads = num + + def prepare(self): + """Prepares server for serve requests.""" + if self.prepared: + return + self.socket.listen() + for _ in xrange(self.threads): + thread = Worker(self.tasks) + thread.setDaemon(True) + thread.start() + self.prepared = True + + def wake_up(self): + """Wake up main thread. + + The server usualy waits in select call in we should terminate one. + The simplest way is using socketpair. + + Select always wait to read from the first socket of socketpair. + + In this case, we can just write anything to the second socket from + socketpair. + """ + self._write.send('1') + + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + + def _select(self): + """Does select on open connections.""" + readable = [self.socket.handle.fileno(), self._read.fileno()] + writable = [] + for i, connection in self.clients.items(): + if connection.is_readable(): + readable.append(connection.fileno()) + if connection.is_writeable(): + writable.append(connection.fileno()) + if connection.is_closed(): + del self.clients[i] + return select.select(readable, writable, readable) + + def handle(self): + """Handle requests. + + WARNING! You must call prepare() BEFORE calling handle() + """ + assert self.prepared, "You have to call prepare before handle" + rset, wset, xset = self._select() + for readable in rset: + if readable == self._read.fileno(): + # don't care i just need to clean readable flag + self._read.recv(1024) + elif readable == self.socket.handle.fileno(): + client = self.socket.accept().handle + self.clients[client.fileno()] = Connection(client, + self.wake_up) + else: + connection = self.clients[readable] + connection.read() + if connection.status == WAIT_PROCESS: + itransport = TTransport.TMemoryBuffer(connection.message) + otransport = TTransport.TMemoryBuffer() + iprot = self.in_protocol.getProtocol(itransport) + oprot = self.out_protocol.getProtocol(otransport) + self.tasks.put([self.processor, iprot, oprot, + otransport, connection.ready]) + for writeable in wset: + self.clients[writeable].write() + for oob in xset: + self.clients[oob].close() + del self.clients[oob] + + def close(self): + """Closes the server.""" + for _ in xrange(self.threads): + self.tasks.put([None, None, None, None, None]) + self.socket.close() + self.prepared = False + + def serve(self): + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False + self.prepare() + while not self._stop: + self.handle()
http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py new file mode 100644 index 0000000..2cd2189 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TProcessPoolServer.py @@ -0,0 +1,118 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +import logging +from multiprocessing import Process, Value, Condition + +from TServer import TServer +from thrift.transport.TTransport import TTransportException + + +class TProcessPoolServer(TServer): + """Server with a fixed size pool of worker subprocesses to service requests + + Note that if you need shared state between the handlers - it's up to you! + Written by Dvir Volk, doat.com + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.numWorkers = 10 + self.workers = [] + self.isRunning = Value('b', False) + self.stopCondition = Condition() + self.postForkCallback = None + + def setPostForkCallback(self, callback): + if not callable(callback): + raise TypeError("This is not a callback!") + self.postForkCallback = callback + + def setNumWorkers(self, num): + """Set the number of worker threads that should be created""" + self.numWorkers = num + + def workerProcess(self): + """Loop getting clients from the shared queue and process them""" + if self.postForkCallback: + self.postForkCallback() + + while self.isRunning.value: + try: + client = self.serverTransport.accept() + self.serveClient(client) + except (KeyboardInterrupt, SystemExit): + return 0 + except Exception, x: + logging.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransportException, tx: + pass + except Exception, x: + logging.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start workers and put into queue""" + # this is a shared state that can tell the workers to exit when False + self.isRunning.value = True + + # first bind and listen to the port + self.serverTransport.listen() + + # fork the children + for i in range(self.numWorkers): + try: + w = Process(target=self.workerProcess) + w.daemon = True + w.start() + self.workers.append(w) + except Exception, x: + logging.exception(x) + + # wait until the condition is set by stop() + while True: + self.stopCondition.acquire() + try: + self.stopCondition.wait() + break + except (SystemExit, KeyboardInterrupt): + break + except Exception, x: + logging.exception(x) + + self.isRunning.value = False + + def stop(self): + self.isRunning.value = False + self.stopCondition.acquire() + self.stopCondition.notify() + self.stopCondition.release() http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py new file mode 100644 index 0000000..2f24842 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/TServer.py @@ -0,0 +1,269 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import Queue +import logging +import os +import sys +import threading +import traceback + +from thrift.Thrift import TProcessor +from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport + + +class TServer: + """Base interface for a server, which must have a serve() method. + + Three constructors for all servers: + 1) (processor, serverTransport) + 2) (processor, serverTransport, transportFactory, protocolFactory) + 3) (processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) + """ + def __init__(self, *args): + if (len(args) == 2): + self.__initArgs__(args[0], args[1], + TTransport.TTransportFactoryBase(), + TTransport.TTransportFactoryBase(), + TBinaryProtocol.TBinaryProtocolFactory(), + TBinaryProtocol.TBinaryProtocolFactory()) + elif (len(args) == 4): + self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) + elif (len(args) == 6): + self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) + + def __initArgs__(self, processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory): + self.processor = processor + self.serverTransport = serverTransport + self.inputTransportFactory = inputTransportFactory + self.outputTransportFactory = outputTransportFactory + self.inputProtocolFactory = inputProtocolFactory + self.outputProtocolFactory = outputProtocolFactory + + def serve(self): + pass + + +class TSimpleServer(TServer): + """Simple single-threaded server that just pumps around one transport.""" + + def __init__(self, *args): + TServer.__init__(self, *args) + + def serve(self): + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException, tx: + pass + except Exception, x: + logging.exception(x) + + itrans.close() + otrans.close() + + +class TThreadedServer(TServer): + """Threaded server that spawns a new thread per each connection.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.daemon = kwargs.get("daemon", False) + + def serve(self): + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + t = threading.Thread(target=self.handle, args=(client,)) + t.setDaemon(self.daemon) + t.start() + except KeyboardInterrupt: + raise + except Exception, x: + logging.exception(x) + + def handle(self, client): + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException, tx: + pass + except Exception, x: + logging.exception(x) + + itrans.close() + otrans.close() + + +class TThreadPoolServer(TServer): + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.clients = Queue.Queue() + self.threads = 10 + self.daemon = kwargs.get("daemon", False) + + def setNumThreads(self, num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the shared queue and process them.""" + while True: + try: + client = self.clients.get() + self.serveClient(client) + except Exception, x: + logging.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException, tx: + pass + except Exception, x: + logging.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start a fixed number of worker threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.setDaemon(self.daemon) + t.start() + except Exception, x: + logging.exception(x) + + # Pump the socket for clients + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + self.clients.put(client) + except Exception, x: + logging.exception(x) + + +class TForkingServer(TServer): + """A Thrift server that forks a new process for each request + + This is more scalable than the threaded server as it does not cause + GIL contention. + + Note that this has different semantics from the threading server. + Specifically, updates to shared variables will no longer be shared. + It will also not work on windows. + + This code is heavily inspired by SocketServer.ForkingMixIn in the + Python stdlib. + """ + def __init__(self, *args): + TServer.__init__(self, *args) + self.children = [] + + def serve(self): + def try_close(file): + try: + file.close() + except IOError, e: + logging.warning(e, exc_info=True) + + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + try: + pid = os.fork() + + if pid: # parent + # add before collect, otherwise you race w/ waitpid + self.children.append(pid) + self.collect_children() + + # Parent must close socket or the connection may not get + # closed promptly + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + try_close(itrans) + try_close(otrans) + else: + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + ecode = 0 + try: + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException, tx: + pass + except Exception, e: + logging.exception(e) + ecode = 1 + finally: + try_close(itrans) + try_close(otrans) + + os._exit(ecode) + + except TTransport.TTransportException, tx: + pass + except Exception, x: + logging.exception(x) + + def collect_children(self): + while self.children: + try: + pid, status = os.waitpid(0, os.WNOHANG) + except os.error: + pid = None + + if pid: + self.children.remove(pid) + else: + break http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py new file mode 100644 index 0000000..1bf6e25 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/server/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['TServer', 'TNonblockingServer'] http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py new file mode 100644 index 0000000..9ef9535 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/THttpClient.py @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import httplib +import os +import socket +import sys +import urllib +import urlparse +import warnings + +from TTransport import * + + +class THttpClient(TTransportBase): + """Http implementation of TTransport base.""" + + def __init__(self, uri_or_host, port=None, path=None): + """THttpClient supports two different types constructor parameters. + + THttpClient(host, port, path) - deprecated + THttpClient(uri) + + Only the second supports https. + """ + if port is not None: + warnings.warn( + "Please use the THttpClient('http://host:port/path') syntax", + DeprecationWarning, + stacklevel=2) + self.host = uri_or_host + self.port = port + assert path + self.path = path + self.scheme = 'http' + else: + parsed = urlparse.urlparse(uri_or_host) + self.scheme = parsed.scheme + assert self.scheme in ('http', 'https') + if self.scheme == 'http': + self.port = parsed.port or httplib.HTTP_PORT + elif self.scheme == 'https': + self.port = parsed.port or httplib.HTTPS_PORT + self.host = parsed.hostname + self.path = parsed.path + if parsed.query: + self.path += '?%s' % parsed.query + self.__wbuf = StringIO() + self.__http = None + self.__timeout = None + self.__custom_headers = None + + def open(self): + if self.scheme == 'http': + self.__http = httplib.HTTP(self.host, self.port) + else: + self.__http = httplib.HTTPS(self.host, self.port) + + def close(self): + self.__http.close() + self.__http = None + + def isOpen(self): + return self.__http is not None + + def setTimeout(self, ms): + if not hasattr(socket, 'getdefaulttimeout'): + raise NotImplementedError + + if ms is None: + self.__timeout = None + else: + self.__timeout = ms / 1000.0 + + def setCustomHeaders(self, headers): + self.__custom_headers = headers + + def read(self, sz): + return self.__http.file.read(sz) + + def write(self, buf): + self.__wbuf.write(buf) + + def __withTimeout(f): + def _f(*args, **kwargs): + orig_timeout = socket.getdefaulttimeout() + socket.setdefaulttimeout(args[0].__timeout) + result = f(*args, **kwargs) + socket.setdefaulttimeout(orig_timeout) + return result + return _f + + def flush(self): + if self.isOpen(): + self.close() + self.open() + + # Pull data out of buffer + data = self.__wbuf.getvalue() + self.__wbuf = StringIO() + + # HTTP request + self.__http.putrequest('POST', self.path) + + # Write headers + self.__http.putheader('Host', self.host) + self.__http.putheader('Content-Type', 'application/x-thrift') + self.__http.putheader('Content-Length', str(len(data))) + + if not self.__custom_headers or 'User-Agent' not in self.__custom_headers: + user_agent = 'Python/THttpClient' + script = os.path.basename(sys.argv[0]) + if script: + user_agent = '%s (%s)' % (user_agent, urllib.quote(script)) + self.__http.putheader('User-Agent', user_agent) + + if self.__custom_headers: + for key, val in self.__custom_headers.iteritems(): + self.__http.putheader(key, val) + + self.__http.endheaders() + + # Write payload + self.__http.send(data) + + # Get reply to flush the request + self.code, self.message, self.headers = self.__http.getreply() + + # Decorate if we know how to timeout + if hasattr(socket, 'getdefaulttimeout'): + flush = __withTimeout(flush) http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py new file mode 100644 index 0000000..81e0984 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSSLSocket.py @@ -0,0 +1,214 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import socket +import ssl + +from thrift.transport import TSocket +from thrift.transport.TTransport import TTransportException + + +class TSSLSocket(TSocket.TSocket): + """ + SSL implementation of client-side TSocket + + This class creates outbound sockets wrapped using the + python standard ssl module for encrypted connections. + + The protocol used is set using the class variable + SSL_VERSION, which must be one of ssl.PROTOCOL_* and + defaults to ssl.PROTOCOL_TLSv1 for greatest security. + """ + SSL_VERSION = ssl.PROTOCOL_TLSv1 + + def __init__(self, + host='localhost', + port=9090, + validate=True, + ca_certs=None, + keyfile=None, + certfile=None, + unix_socket=None): + """Create SSL TSocket + + @param validate: Set to False to disable SSL certificate validation + @type validate: bool + @param ca_certs: Filename to the Certificate Authority pem file, possibly a + file downloaded from: http://curl.haxx.se/ca/cacert.pem This is passed to + the ssl_wrap function as the 'ca_certs' parameter. + @type ca_certs: str + @param keyfile: The private key + @type keyfile: str + @param certfile: The cert file + @type certfile: str + + Raises an IOError exception if validate is True and the ca_certs file is + None, not present or unreadable. + """ + self.validate = validate + self.is_valid = False + self.peercert = None + if not validate: + self.cert_reqs = ssl.CERT_NONE + else: + self.cert_reqs = ssl.CERT_REQUIRED + self.ca_certs = ca_certs + self.keyfile = keyfile + self.certfile = certfile + if validate: + if ca_certs is None or not os.access(ca_certs, os.R_OK): + raise IOError('Certificate Authority ca_certs file "%s" ' + 'is not readable, cannot validate SSL ' + 'certificates.' % (ca_certs)) + TSocket.TSocket.__init__(self, host, port, unix_socket) + + def open(self): + try: + res0 = self._resolveAddr() + for res in res0: + sock_family, sock_type = res[0:2] + ip_port = res[4] + plain_sock = socket.socket(sock_family, sock_type) + self.handle = ssl.wrap_socket(plain_sock, + ssl_version=self.SSL_VERSION, + do_handshake_on_connect=True, + ca_certs=self.ca_certs, + keyfile=self.keyfile, + certfile=self.certfile, + cert_reqs=self.cert_reqs) + self.handle.settimeout(self._timeout) + try: + self.handle.connect(ip_port) + except socket.error, e: + if res is not res0[-1]: + continue + else: + raise e + break + except socket.error, e: + if self._unix_socket: + message = 'Could not connect to secure socket %s: %s' \ + % (self._unix_socket, e) + else: + message = 'Could not connect to %s:%d: %s' % (self.host, self.port, e) + raise TTransportException(type=TTransportException.NOT_OPEN, + message=message) + if self.validate: + self._validate_cert() + + def _validate_cert(self): + """internal method to validate the peer's SSL certificate, and to check the + commonName of the certificate to ensure it matches the hostname we + used to make this connection. Does not support subjectAltName records + in certificates. + + raises TTransportException if the certificate fails validation. + """ + cert = self.handle.getpeercert() + self.peercert = cert + if 'subject' not in cert: + raise TTransportException( + type=TTransportException.NOT_OPEN, + message='No SSL certificate found from %s:%s' % (self.host, self.port)) + fields = cert['subject'] + for field in fields: + # ensure structure we get back is what we expect + if not isinstance(field, tuple): + continue + cert_pair = field[0] + if len(cert_pair) < 2: + continue + cert_key, cert_value = cert_pair[0:2] + if cert_key != 'commonName': + continue + certhost = cert_value + # this check should be performed by some sort of Access Manager + if certhost == self.host: + # success, cert commonName matches desired hostname + self.is_valid = True + return + else: + raise TTransportException( + type=TTransportException.UNKNOWN, + message='Hostname we connected to "%s" doesn\'t match certificate ' + 'provided commonName "%s"' % (self.host, certhost)) + raise TTransportException( + type=TTransportException.UNKNOWN, + message='Could not validate SSL certificate from ' + 'host "%s". Cert=%s' % (self.host, cert)) + + +class TSSLServerSocket(TSocket.TServerSocket): + """SSL implementation of TServerSocket + + This uses the ssl module's wrap_socket() method to provide SSL + negotiated encryption. + """ + SSL_VERSION = ssl.PROTOCOL_TLSv1 + + def __init__(self, + host=None, + port=9090, + certfile='cert.pem', + unix_socket=None): + """Initialize a TSSLServerSocket + + @param certfile: filename of the server certificate, defaults to cert.pem + @type certfile: str + @param host: The hostname or IP to bind the listen socket to, + i.e. 'localhost' for only allowing local network connections. + Pass None to bind to all interfaces. + @type host: str + @param port: The port to listen on for inbound connections. + @type port: int + """ + self.setCertfile(certfile) + TSocket.TServerSocket.__init__(self, host, port) + + def setCertfile(self, certfile): + """Set or change the server certificate file used to wrap new connections. + + @param certfile: The filename of the server certificate, + i.e. '/etc/certs/server.pem' + @type certfile: str + + Raises an IOError exception if the certfile is not present or unreadable. + """ + if not os.access(certfile, os.R_OK): + raise IOError('No such certfile found: %s' % (certfile)) + self.certfile = certfile + + def accept(self): + plain_client, addr = self.handle.accept() + try: + client = ssl.wrap_socket(plain_client, certfile=self.certfile, + server_side=True, ssl_version=self.SSL_VERSION) + except ssl.SSLError, ssl_exc: + # failed handshake/ssl wrap, close socket to client + plain_client.close() + # raise ssl_exc + # We can't raise the exception, because it kills most TServer derived + # serve() methods. + # Instead, return None, and let the TServer instance deal with it in + # other exception handling. (but TSimpleServer dies anyway) + return None + result = TSocket.TSocket() + result.setHandle(client) + return result http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py new file mode 100644 index 0000000..9e2b384 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TSocket.py @@ -0,0 +1,176 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import errno +import os +import socket +import sys + +from TTransport import * + + +class TSocketBase(TTransportBase): + def _resolveAddr(self): + if self._unix_socket is not None: + return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, + self._unix_socket)] + else: + return socket.getaddrinfo(self.host, + self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM, + 0, + socket.AI_PASSIVE | socket.AI_ADDRCONFIG) + + def close(self): + if self.handle: + self.handle.close() + self.handle = None + + +class TSocket(TSocketBase): + """Socket implementation of TTransport base.""" + + def __init__(self, host='localhost', port=9090, unix_socket=None): + """Initialize a TSocket + + @param host(str) The host to connect to. + @param port(int) The (TCP) port to connect to. + @param unix_socket(str) The filename of a unix socket to connect to. + (host and port will be ignored.) + """ + self.host = host + self.port = port + self.handle = None + self._unix_socket = unix_socket + self._timeout = None + + def setHandle(self, h): + self.handle = h + + def isOpen(self): + return self.handle is not None + + def setTimeout(self, ms): + if ms is None: + self._timeout = None + else: + self._timeout = ms / 1000.0 + + if self.handle is not None: + self.handle.settimeout(self._timeout) + + def open(self): + try: + res0 = self._resolveAddr() + for res in res0: + self.handle = socket.socket(res[0], res[1]) + self.handle.settimeout(self._timeout) + try: + self.handle.connect(res[4]) + except socket.error, e: + if res is not res0[-1]: + continue + else: + raise e + break + except socket.error, e: + if self._unix_socket: + message = 'Could not connect to socket %s' % self._unix_socket + else: + message = 'Could not connect to %s:%d' % (self.host, self.port) + raise TTransportException(type=TTransportException.NOT_OPEN, + message=message) + + def read(self, sz): + try: + buff = self.handle.recv(sz) + except socket.error, e: + if (e.args[0] == errno.ECONNRESET and + (sys.platform == 'darwin' or sys.platform.startswith('freebsd'))): + # freebsd and Mach don't follow POSIX semantic of recv + # and fail with ECONNRESET if peer performed shutdown. + # See corresponding comment and code in TSocket::read() + # in lib/cpp/src/transport/TSocket.cpp. + self.close() + # Trigger the check to raise the END_OF_FILE exception below. + buff = '' + else: + raise + if len(buff) == 0: + raise TTransportException(type=TTransportException.END_OF_FILE, + message='TSocket read 0 bytes') + return buff + + def write(self, buff): + if not self.handle: + raise TTransportException(type=TTransportException.NOT_OPEN, + message='Transport not open') + sent = 0 + have = len(buff) + while sent < have: + plus = self.handle.send(buff) + if plus == 0: + raise TTransportException(type=TTransportException.END_OF_FILE, + message='TSocket sent 0 bytes') + sent += plus + buff = buff[plus:] + + def flush(self): + pass + + +class TServerSocket(TSocketBase, TServerTransportBase): + """Socket implementation of TServerTransport base.""" + + def __init__(self, host=None, port=9090, unix_socket=None): + self.host = host + self.port = port + self._unix_socket = unix_socket + self.handle = None + + def listen(self): + res0 = self._resolveAddr() + for res in res0: + if res[0] is socket.AF_INET6 or res is res0[-1]: + break + + # We need remove the old unix socket if the file exists and + # nobody is listening on it. + if self._unix_socket: + tmp = socket.socket(res[0], res[1]) + try: + tmp.connect(res[4]) + except socket.error, err: + eno, message = err.args + if eno == errno.ECONNREFUSED: + os.unlink(res[4]) + + self.handle = socket.socket(res[0], res[1]) + self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(self.handle, 'settimeout'): + self.handle.settimeout(None) + self.handle.bind(res[4]) + self.handle.listen(128) + + def accept(self): + client, addr = self.handle.accept() + result = TSocket() + result.setHandle(client) + return result http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py new file mode 100644 index 0000000..4481371 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTransport.py @@ -0,0 +1,330 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from cStringIO import StringIO +from struct import pack, unpack +from thrift.Thrift import TException + + +class TTransportException(TException): + """Custom Transport Exception class""" + + UNKNOWN = 0 + NOT_OPEN = 1 + ALREADY_OPEN = 2 + TIMED_OUT = 3 + END_OF_FILE = 4 + + def __init__(self, type=UNKNOWN, message=None): + TException.__init__(self, message) + self.type = type + + +class TTransportBase: + """Base class for Thrift transport layer.""" + + def isOpen(self): + pass + + def open(self): + pass + + def close(self): + pass + + def read(self, sz): + pass + + def readAll(self, sz): + buff = '' + have = 0 + while (have < sz): + chunk = self.read(sz - have) + have += len(chunk) + buff += chunk + + if len(chunk) == 0: + raise EOFError() + + return buff + + def write(self, buf): + pass + + def flush(self): + pass + + +# This class should be thought of as an interface. +class CReadableTransport: + """base class for transports that are readable from C""" + + # TODO(dreiss): Think about changing this interface to allow us to use + # a (Python, not c) StringIO instead, because it allows + # you to write after reading. + + # NOTE: This is a classic class, so properties will NOT work + # correctly for setting. + @property + def cstringio_buf(self): + """A cStringIO buffer that contains the current chunk we are reading.""" + pass + + def cstringio_refill(self, partialread, reqlen): + """Refills cstringio_buf. + + Returns the currently used buffer (which can but need not be the same as + the old cstringio_buf). partialread is what the C code has read from the + buffer, and should be inserted into the buffer before any more reads. The + return value must be a new, not borrowed reference. Something along the + lines of self._buf should be fine. + + If reqlen bytes can't be read, throw EOFError. + """ + pass + + +class TServerTransportBase: + """Base class for Thrift server transports.""" + + def listen(self): + pass + + def accept(self): + pass + + def close(self): + pass + + +class TTransportFactoryBase: + """Base class for a Transport Factory""" + + def getTransport(self, trans): + return trans + + +class TBufferedTransportFactory: + """Factory transport that builds buffered transports""" + + def getTransport(self, trans): + buffered = TBufferedTransport(trans) + return buffered + + +class TBufferedTransport(TTransportBase, CReadableTransport): + """Class that wraps another transport and buffers its I/O. + + The implementation uses a (configurable) fixed-size read buffer + but buffers all writes until a flush is performed. + """ + DEFAULT_BUFFER = 4096 + + def __init__(self, trans, rbuf_size=DEFAULT_BUFFER): + self.__trans = trans + self.__wbuf = StringIO() + self.__rbuf = StringIO("") + self.__rbuf_size = rbuf_size + + def isOpen(self): + return self.__trans.isOpen() + + def open(self): + return self.__trans.open() + + def close(self): + return self.__trans.close() + + def read(self, sz): + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size))) + return self.__rbuf.read(sz) + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + out = self.__wbuf.getvalue() + # reset wbuf before write/flush to preserve state on underlying failure + self.__wbuf = StringIO() + self.__trans.write(out) + self.__trans.flush() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + retstring = partialread + if reqlen < self.__rbuf_size: + # try to make a read of as much as we can. + retstring += self.__trans.read(self.__rbuf_size) + + # but make sure we do read reqlen bytes. + if len(retstring) < reqlen: + retstring += self.__trans.readAll(reqlen - len(retstring)) + + self.__rbuf = StringIO(retstring) + return self.__rbuf + + +class TMemoryBuffer(TTransportBase, CReadableTransport): + """Wraps a cStringIO object as a TTransport. + + NOTE: Unlike the C++ version of this class, you cannot write to it + then immediately read from it. If you want to read from a + TMemoryBuffer, you must either pass a string to the constructor. + TODO(dreiss): Make this work like the C++ version. + """ + + def __init__(self, value=None): + """value -- a value to read from for stringio + + If value is set, this will be a transport for reading, + otherwise, it is for writing""" + if value is not None: + self._buffer = StringIO(value) + else: + self._buffer = StringIO() + + def isOpen(self): + return not self._buffer.closed + + def open(self): + pass + + def close(self): + self._buffer.close() + + def read(self, sz): + return self._buffer.read(sz) + + def write(self, buf): + self._buffer.write(buf) + + def flush(self): + pass + + def getvalue(self): + return self._buffer.getvalue() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self._buffer + + def cstringio_refill(self, partialread, reqlen): + # only one shot at reading... + raise EOFError() + + +class TFramedTransportFactory: + """Factory transport that builds framed transports""" + + def getTransport(self, trans): + framed = TFramedTransport(trans) + return framed + + +class TFramedTransport(TTransportBase, CReadableTransport): + """Class that wraps another transport and frames its I/O when writing.""" + + def __init__(self, trans,): + self.__trans = trans + self.__rbuf = StringIO() + self.__wbuf = StringIO() + + def isOpen(self): + return self.__trans.isOpen() + + def open(self): + return self.__trans.open() + + def close(self): + return self.__trans.close() + + def read(self, sz): + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self.readFrame() + return self.__rbuf.read(sz) + + def readFrame(self): + buff = self.__trans.readAll(4) + sz, = unpack('!i', buff) + self.__rbuf = StringIO(self.__trans.readAll(sz)) + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + wout = self.__wbuf.getvalue() + wsz = len(wout) + # reset wbuf before write/flush to preserve state on underlying failure + self.__wbuf = StringIO() + # N.B.: Doing this string concatenation is WAY cheaper than making + # two separate calls to the underlying socket object. Socket writes in + # Python turn out to be REALLY expensive, but it seems to do a pretty + # good job of managing string buffer operations without excessive copies + buf = pack("!i", wsz) + wout + self.__trans.write(buf) + self.__trans.flush() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, prefix, reqlen): + # self.__rbuf will already be empty here because fastbinary doesn't + # ask for a refill until the previous buffer is empty. Therefore, + # we can start reading new frames immediately. + while len(prefix) < reqlen: + self.readFrame() + prefix += self.__rbuf.getvalue() + self.__rbuf = StringIO(prefix) + return self.__rbuf + + +class TFileObjectTransport(TTransportBase): + """Wraps a file-like object to make it work as a Thrift transport.""" + + def __init__(self, fileobj): + self.fileobj = fileobj + + def isOpen(self): + return True + + def close(self): + self.fileobj.close() + + def read(self, sz): + return self.fileobj.read(sz) + + def write(self, buf): + self.fileobj.write(buf) + + def flush(self): + self.fileobj.flush() http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py new file mode 100644 index 0000000..3ce3eb2 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TTwisted.py @@ -0,0 +1,221 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from cStringIO import StringIO + +from zope.interface import implements, Interface, Attribute +from twisted.internet.protocol import Protocol, ServerFactory, ClientFactory, \ + connectionDone +from twisted.internet import defer +from twisted.protocols import basic +from twisted.python import log +from twisted.web import server, resource, http + +from thrift.transport import TTransport + + +class TMessageSenderTransport(TTransport.TTransportBase): + + def __init__(self): + self.__wbuf = StringIO() + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self): + msg = self.__wbuf.getvalue() + self.__wbuf = StringIO() + self.sendMessage(msg) + + def sendMessage(self, message): + raise NotImplementedError + + +class TCallbackTransport(TMessageSenderTransport): + + def __init__(self, func): + TMessageSenderTransport.__init__(self) + self.func = func + + def sendMessage(self, message): + self.func(message) + + +class ThriftClientProtocol(basic.Int32StringReceiver): + + MAX_LENGTH = 2 ** 31 - 1 + + def __init__(self, client_class, iprot_factory, oprot_factory=None): + self._client_class = client_class + self._iprot_factory = iprot_factory + if oprot_factory is None: + self._oprot_factory = iprot_factory + else: + self._oprot_factory = oprot_factory + + self.recv_map = {} + self.started = defer.Deferred() + + def dispatch(self, msg): + self.sendString(msg) + + def connectionMade(self): + tmo = TCallbackTransport(self.dispatch) + self.client = self._client_class(tmo, self._oprot_factory) + self.started.callback(self.client) + + def connectionLost(self, reason=connectionDone): + for k, v in self.client._reqs.iteritems(): + tex = TTransport.TTransportException( + type=TTransport.TTransportException.END_OF_FILE, + message='Connection closed') + v.errback(tex) + + def stringReceived(self, frame): + tr = TTransport.TMemoryBuffer(frame) + iprot = self._iprot_factory.getProtocol(tr) + (fname, mtype, rseqid) = iprot.readMessageBegin() + + try: + method = self.recv_map[fname] + except KeyError: + method = getattr(self.client, 'recv_' + fname) + self.recv_map[fname] = method + + method(iprot, mtype, rseqid) + + +class ThriftServerProtocol(basic.Int32StringReceiver): + + MAX_LENGTH = 2 ** 31 - 1 + + def dispatch(self, msg): + self.sendString(msg) + + def processError(self, error): + self.transport.loseConnection() + + def processOk(self, _, tmo): + msg = tmo.getvalue() + + if len(msg) > 0: + self.dispatch(msg) + + def stringReceived(self, frame): + tmi = TTransport.TMemoryBuffer(frame) + tmo = TTransport.TMemoryBuffer() + + iprot = self.factory.iprot_factory.getProtocol(tmi) + oprot = self.factory.oprot_factory.getProtocol(tmo) + + d = self.factory.processor.process(iprot, oprot) + d.addCallbacks(self.processOk, self.processError, + callbackArgs=(tmo,)) + + +class IThriftServerFactory(Interface): + + processor = Attribute("Thrift processor") + + iprot_factory = Attribute("Input protocol factory") + + oprot_factory = Attribute("Output protocol factory") + + +class IThriftClientFactory(Interface): + + client_class = Attribute("Thrift client class") + + iprot_factory = Attribute("Input protocol factory") + + oprot_factory = Attribute("Output protocol factory") + + +class ThriftServerFactory(ServerFactory): + + implements(IThriftServerFactory) + + protocol = ThriftServerProtocol + + def __init__(self, processor, iprot_factory, oprot_factory=None): + self.processor = processor + self.iprot_factory = iprot_factory + if oprot_factory is None: + self.oprot_factory = iprot_factory + else: + self.oprot_factory = oprot_factory + + +class ThriftClientFactory(ClientFactory): + + implements(IThriftClientFactory) + + protocol = ThriftClientProtocol + + def __init__(self, client_class, iprot_factory, oprot_factory=None): + self.client_class = client_class + self.iprot_factory = iprot_factory + if oprot_factory is None: + self.oprot_factory = iprot_factory + else: + self.oprot_factory = oprot_factory + + def buildProtocol(self, addr): + p = self.protocol(self.client_class, self.iprot_factory, + self.oprot_factory) + p.factory = self + return p + + +class ThriftResource(resource.Resource): + + allowedMethods = ('POST',) + + def __init__(self, processor, inputProtocolFactory, + outputProtocolFactory=None): + resource.Resource.__init__(self) + self.inputProtocolFactory = inputProtocolFactory + if outputProtocolFactory is None: + self.outputProtocolFactory = inputProtocolFactory + else: + self.outputProtocolFactory = outputProtocolFactory + self.processor = processor + + def getChild(self, path, request): + return self + + def _cbProcess(self, _, request, tmo): + msg = tmo.getvalue() + request.setResponseCode(http.OK) + request.setHeader("content-type", "application/x-thrift") + request.write(msg) + request.finish() + + def render_POST(self, request): + request.content.seek(0, 0) + data = request.content.read() + tmi = TTransport.TMemoryBuffer(data) + tmo = TTransport.TMemoryBuffer() + + iprot = self.inputProtocolFactory.getProtocol(tmi) + oprot = self.outputProtocolFactory.getProtocol(tmo) + + d = self.processor.process(iprot, oprot) + d.addCallback(self._cbProcess, request, tmo) + return server.NOT_DONE_YET http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py new file mode 100644 index 0000000..2bdfd14 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/TZlibTransport.py @@ -0,0 +1,249 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""TZlibTransport provides a compressed transport and transport factory +class, using the python standard library zlib module to implement +data compression. +""" + +from __future__ import division +import zlib +from cStringIO import StringIO + +from TTransport import TTransportBase, CReadableTransport + + +class TZlibTransportFactory(object): + """Factory transport that builds zlib compressed transports. + + This factory caches the last single client/transport that it was passed + and returns the same TZlibTransport object that was created. + + This caching means the TServer class will get the _same_ transport + object for both input and output transports from this factory. + (For non-threaded scenarios only, since the cache only holds one object) + + The purpose of this caching is to allocate only one TZlibTransport where + only one is really needed (since it must have separate read/write buffers), + and makes the statistics from getCompSavings() and getCompRatio() + easier to understand. + """ + # class scoped cache of last transport given and zlibtransport returned + _last_trans = None + _last_z = None + + def getTransport(self, trans, compresslevel=9): + """Wrap a transport, trans, with the TZlibTransport + compressed transport class, returning a new + transport to the caller. + + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Defaults to 9. + @type compresslevel: int + + This method returns a TZlibTransport which wraps the + passed C{trans} TTransport derived instance. + """ + if trans == self._last_trans: + return self._last_z + ztrans = TZlibTransport(trans, compresslevel) + self._last_trans = trans + self._last_z = ztrans + return ztrans + + +class TZlibTransport(TTransportBase, CReadableTransport): + """Class that wraps a transport with zlib, compressing writes + and decompresses reads, using the python standard + library zlib module. + """ + # Read buffer size for the python fastbinary C extension, + # the TBinaryProtocolAccelerated class. + DEFAULT_BUFFSIZE = 4096 + + def __init__(self, trans, compresslevel=9): + """Create a new TZlibTransport, wrapping C{trans}, another + TTransport derived object. + + @param trans: A thrift transport object, i.e. a TSocket() object. + @type trans: TTransport + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Default is 9. + @type compresslevel: int + """ + self.__trans = trans + self.compresslevel = compresslevel + self.__rbuf = StringIO() + self.__wbuf = StringIO() + self._init_zlib() + self._init_stats() + + def _reinit_buffers(self): + """Internal method to initialize/reset the internal StringIO objects + for read and write buffers. + """ + self.__rbuf = StringIO() + self.__wbuf = StringIO() + + def _init_stats(self): + """Internal method to reset the internal statistics counters + for compression ratios and bandwidth savings. + """ + self.bytes_in = 0 + self.bytes_out = 0 + self.bytes_in_comp = 0 + self.bytes_out_comp = 0 + + def _init_zlib(self): + """Internal method for setting up the zlib compression and + decompression objects. + """ + self._zcomp_read = zlib.decompressobj() + self._zcomp_write = zlib.compressobj(self.compresslevel) + + def getCompRatio(self): + """Get the current measured compression ratios (in,out) from + this transport. + + Returns a tuple of: + (inbound_compression_ratio, outbound_compression_ratio) + + The compression ratios are computed as: + compressed / uncompressed + + E.g., data that compresses by 10x will have a ratio of: 0.10 + and data that compresses to half of ts original size will + have a ratio of 0.5 + + None is returned if no bytes have yet been processed in + a particular direction. + """ + r_percent, w_percent = (None, None) + if self.bytes_in > 0: + r_percent = self.bytes_in_comp / self.bytes_in + if self.bytes_out > 0: + w_percent = self.bytes_out_comp / self.bytes_out + return (r_percent, w_percent) + + def getCompSavings(self): + """Get the current count of saved bytes due to data + compression. + + Returns a tuple of: + (inbound_saved_bytes, outbound_saved_bytes) + + Note: if compression is actually expanding your + data (only likely with very tiny thrift objects), then + the values returned will be negative. + """ + r_saved = self.bytes_in - self.bytes_in_comp + w_saved = self.bytes_out - self.bytes_out_comp + return (r_saved, w_saved) + + def isOpen(self): + """Return the underlying transport's open status""" + return self.__trans.isOpen() + + def open(self): + """Open the underlying transport""" + self._init_stats() + return self.__trans.open() + + def listen(self): + """Invoke the underlying transport's listen() method""" + self.__trans.listen() + + def accept(self): + """Accept connections on the underlying transport""" + return self.__trans.accept() + + def close(self): + """Close the underlying transport,""" + self._reinit_buffers() + self._init_zlib() + return self.__trans.close() + + def read(self, sz): + """Read up to sz bytes from the decompressed bytes buffer, and + read from the underlying transport if the decompression + buffer is empty. + """ + ret = self.__rbuf.read(sz) + if len(ret) > 0: + return ret + # keep reading from transport until something comes back + while True: + if self.readComp(sz): + break + ret = self.__rbuf.read(sz) + return ret + + def readComp(self, sz): + """Read compressed data from the underlying transport, then + decompress it and append it to the internal StringIO read buffer + """ + zbuf = self.__trans.read(sz) + zbuf = self._zcomp_read.unconsumed_tail + zbuf + buf = self._zcomp_read.decompress(zbuf) + self.bytes_in += len(zbuf) + self.bytes_in_comp += len(buf) + old = self.__rbuf.read() + self.__rbuf = StringIO(old + buf) + if len(old) + len(buf) == 0: + return False + return True + + def write(self, buf): + """Write some bytes, putting them into the internal write + buffer for eventual compression. + """ + self.__wbuf.write(buf) + + def flush(self): + """Flush any queued up data in the write buffer and ensure the + compression buffer is flushed out to the underlying transport + """ + wout = self.__wbuf.getvalue() + if len(wout) > 0: + zbuf = self._zcomp_write.compress(wout) + self.bytes_out += len(wout) + self.bytes_out_comp += len(zbuf) + else: + zbuf = '' + ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) + self.bytes_out_comp += len(ztail) + if (len(zbuf) + len(ztail)) > 0: + self.__wbuf = StringIO() + self.__trans.write(zbuf + ztail) + self.__trans.flush() + + @property + def cstringio_buf(self): + """Implement the CReadableTransport interface""" + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + """Implement the CReadableTransport interface for refill""" + retstring = partialread + if reqlen < self.DEFAULT_BUFFSIZE: + retstring += self.read(self.DEFAULT_BUFFSIZE) + while len(retstring) < reqlen: + retstring += self.read(reqlen - len(retstring)) + self.__rbuf = StringIO(retstring) + return self.__rbuf http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py new file mode 100644 index 0000000..c9596d9 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/thrift/thrift/transport/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +__all__ = ['TTransport', 'TSocket', 'THttpClient', 'TZlibTransport'] http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py new file mode 100644 index 0000000..0de6991 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/__init__.py @@ -0,0 +1 @@ +__author__ = 'chamilad' http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py new file mode 100644 index 0000000..0de6991 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/__init__.py @@ -0,0 +1 @@ +__author__ = 'chamilad' http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py new file mode 100644 index 0000000..59e103d --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py @@ -0,0 +1,13 @@ +class DataPublisherException(Exception): + + def __init__(self, msg): + super(self, msg) + self.message = msg + + def get_message(self): + """ + The message provided when the exception is raised + :return: message + :rtype: str + """ + return self.message http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py new file mode 100644 index 0000000..cf00b0b --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py @@ -0,0 +1,238 @@ +import os +import datetime +import logging +from threading import Thread, current_thread + +from ..databridge.agent import * +from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration +from ..util import cartridgeagentutils +from exception.datapublisherexception import DataPublisherException + + +class LogPublisher(Thread): + + def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, member_id): + Thread.__init__(self) + + logging.basicConfig(level=logging.DEBUG) + self.log = logging.getLogger(__name__) + + self.file_path = file_path + self.thrift_publisher = ThriftPublisher( + DataPublisherConfiguration.get_instance().monitoring_server_ip, + DataPublisherConfiguration.get_instance().monitoring_server_port, + DataPublisherConfiguration.get_instance().admin_username, + DataPublisherConfiguration.get_instance().admin_password, + stream_definition) + self.tenant_id = tenant_id + self.alias = alias + self.datetime = date_time + self.member_id = member_id + + self.terminated = False + + def run(self): + if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK): + self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + current_thread()) + # open file and keep reading for new entries + read_file = open(self.file_path, "r") + read_file.seek(os.stat(self.file_path)[6]) # go to the end of the file + + while not self.terminated: + where = read_file.tell() # where the seeker is in the file + line = read_file.readline() # read the current line + if not line: + # no new line entered + time.sleep(1) + read_file.seek(where) # set seeker + else: + # new line detected, create event object + event = LogEvent() + event.metaData.append(self.member_id) + event.payloadData.append(self.tenant_id) + event.payloadData.append(self.alias) + event.payloadData.append("") + event.payloadData.append(self.datetime) + event.payloadData.append("") + event.payloadData.append(line) + event.payloadData.append("") + event.payloadData.append("") + event.payloadData.append(self.member_id) + event.payloadData.append("") + + self.thrift_publisher.publish(event) + + self.thrift_publisher.disconnect() # dicsonnect the publisher upon being terminated + else: + raise DataPublisherException("Unable to read the file at path %r" % self.file_path) + + def terminate(self): + """ + Allows the LogPublisher thread to be terminated to stop publishing to BAM/CEP. Allow a minimum of 1 second delay + to take effect. + """ + self.terminated = True + + +class LogPublisherManager(Thread): + """ + A log publishing thread management thread which maintains a log publisher for each log file. Also defines a stream + definition and the BAM/CEP server information for a single publishing context. + """ + + def __init__(self, logfile_paths): + Thread.__init__(self) + self.logfile_paths = logfile_paths + self.publishers = {} + self.ports = [] + self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) + self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) + + cartridgeagentutils.wait_until_ports_active(DataPublisherConfiguration.get_instance().monitoring_server_ip, self.ports) + ports_active = cartridgeagentutils.check_ports_active(DataPublisherConfiguration.get_instance().monitoring_server_ip, self.ports) + if not ports_active: + raise DataPublisherException("Monitoring server not active, data publishing is aborted") + + #stream definition + self.stream_definition = StreamDefinition() + valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id) + alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id) + stream_name = "logs." + valid_tenant_id + "." \ + + alias + "." + LogPublisherManager.get_current_date() + + stream_version = "1.0.0" + self.stream_definition.name = stream_name + self.stream_definition.version = stream_version + self.stream_definition.description = "Apache Stratos Instance Log Publisher" + self.stream_definition.add_metadata_attribute("memberId", 'STRING') + + self.stream_definition.add_payloaddata_attribute("tenantID", "STRING") + self.stream_definition.add_payloaddata_attribute("serverName", "STRING") + self.stream_definition.add_payloaddata_attribute("appName", "STRING") + self.stream_definition.add_payloaddata_attribute("logTime", "STRING") + self.stream_definition.add_payloaddata_attribute("priority", "STRING") + self.stream_definition.add_payloaddata_attribute("message", "STRING") + self.stream_definition.add_payloaddata_attribute("logger", "STRING") + self.stream_definition.add_payloaddata_attribute("ip", "STRING") + self.stream_definition.add_payloaddata_attribute("instance", "STRING") + self.stream_definition.add_payloaddata_attribute("stacktrace", "STRING") + + def run(self): + if self.logfile_paths is not None and len(self.logfile_paths): + for log_path in self.logfile_paths: + # thread for each log file + publisher = self.get_publisher(log_path) + publisher.start() + + def get_publisher(self, log_path): + """ + Retrieve the publisher for the specified log file path. Creates a new LogPublisher if one is not available + :return: The LogPublisher object + :rtype : LogPublisher + """ + if log_path not in self.publishers: + self.publishers[log_path] = LogPublisher(log_path, self.stream_definition) + + return self.publishers[log_path] + + def terminate_publisher(self, log_path): + """ + Terminates the LogPublisher thread associated with the specified log file + """ + if log_path in self.publishers: + self.publishers[log_path].terminate() + + def terminate_all_publishers(self): + """ + Terminates all LogPublisher threads + """ + for publisher in self.publishers: + publisher.terminate() + + @staticmethod + def get_valid_tenant_id(tenant_id): + if tenant_id == "-1" or tenant_id == "-1234": + return "0" + + return tenant_id + + @staticmethod + def get_alias(cluster_id): + alias = "" + try: + alias = cluster_id.split("\\.")[0] + except: + alias = cluster_id + + return alias + + @staticmethod + def get_current_date(): + """ + Returns the current date formatted as yyyy-MM-dd + :return: Formatted date string + :rtype : str + """ + return datetime.date.today().strftime("%Y.%m.%d") + + +class DataPublisherConfiguration: + """ + A singleton implementation to access configuration information for data publishing to BAM/CEP + TODO: perfect singleton impl ex: Borg + """ + + __instance = None + logging.basicConfig(level=logging.DEBUG) + log = logging.getLogger(__name__) + + @staticmethod + def get_instance(): + """ + Singleton instance retriever + :return: Instnace + :rtype : DataPublisherConfiguration + """ + if DataPublisherConfiguration.__instance is None: + DataPublisherConfiguration.__instance = DataPublisherConfiguration() + + return DataPublisherConfiguration.__instance + + def __init__(self): + self.enabled = False + self.monitoring_server_ip = None + self.monitoring_server_port = None + self.monitoring_server_secure_port = None + self.admin_username = None + self.admin_password = None + self.read_config() + + def read_config(self): + self.enabled = True if CartridgeAgentConfiguration.read_property("enable.data.publisher", False).strip().lower() == "true" else False + if not self.enabled: + DataPublisherConfiguration.log.info("Data Publisher disabled") + return + + DataPublisherConfiguration.log.info("Data Publisher enabled") + + self.monitoring_server_ip = CartridgeAgentConfiguration.read_property("monitoring.server.ip", False) + if self.monitoring_server_ip.strip() == "": + raise RuntimeError("System property not found: monitoring.server.ip") + + self.monitoring_server_port = CartridgeAgentConfiguration.read_property("monitoring.server.port", False) + if self.monitoring_server_port.strip() == "": + raise RuntimeError("System property not found: monitoring.server.port") + + self.monitoring_server_secure_port = CartridgeAgentConfiguration.read_property("monitoring.server.secure.port", False) + if self.monitoring_server_secure_port.strip() == "": + raise RuntimeError("System property not found: monitoring.server.secure.port") + + self.admin_username = CartridgeAgentConfiguration.read_property("monitoring.server.admin.username", False) + if self.admin_username.strip() == "": + raise RuntimeError("System property not found: monitoring.server.admin.username") + + self.admin_password = CartridgeAgentConfiguration.read_property("monitoring.server.admin.password", False) + if self.admin_password.strip() == "": + raise RuntimeError("System property not found: monitoring.server.admin.password") + + DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/700a6d26/tools/python-cartridge-agent/cartridge-agent/modules/logpublisher/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/logpublisher/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/logpublisher/__init__.py deleted file mode 100644 index 0de6991..0000000 --- a/tools/python-cartridge-agent/cartridge-agent/modules/logpublisher/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'chamilad'
