Fixed wrongly formatted event fired to CEP after 60 minutes. Renamed test folder to tests for consistency.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/75cb9942 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/75cb9942 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/75cb9942 Branch: refs/heads/master Commit: 75cb99421a2efc5067140c626893ca6ecf804a31 Parents: c76076e Author: Chamila de Alwis <[email protected]> Authored: Mon Oct 27 17:55:37 2014 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Mon Oct 27 17:55:37 2014 +0530 ---------------------------------------------------------------------- .../cartridgeagent/agent.py | 6 + .../cartridgeagent/modules/databridge/agent.py | 32 +++-- tools/python_cartridgeagent/test/__init__.py | 16 --- tools/python_cartridgeagent/test/asynctest.txt | 1 - tools/python_cartridgeagent/test/test_util.py | 133 ------------------- tools/python_cartridgeagent/tests/__init__.py | 16 +++ tools/python_cartridgeagent/tests/asynctest.txt | 1 + tools/python_cartridgeagent/tests/test_util.py | 133 +++++++++++++++++++ 8 files changed, 178 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/cartridgeagent/agent.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/agent.py b/tools/python_cartridgeagent/cartridgeagent/agent.py index 9f1a972..4336990 100644 --- a/tools/python_cartridgeagent/cartridgeagent/agent.py +++ b/tools/python_cartridgeagent/cartridgeagent/agent.py @@ -17,6 +17,7 @@ # under the License. 
import threading +import sys from modules.exception.parameternotfoundexception import ParameterNotFoundException from modules.subscriber import eventsubscriber @@ -327,7 +328,12 @@ class CartridgeAgent(threading.Thread): self.log.exception("Error processing tenant unSubscribed event") +def uncaught_exception_mg(exctype, value, tb): + log = LogFactory().get_log(__name__) + log.exception("UNCAUGHT EXCEPTION:", value) + def main(): + sys.excepthook = uncaught_exception_mg cartridge_agent = CartridgeAgent() log = LogFactory().get_log(__name__) http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py index 5d341dd..96762f2 100644 --- a/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py +++ b/tools/python_cartridgeagent/cartridgeagent/modules/databridge/agent.py @@ -156,25 +156,37 @@ class ThriftPublisher: :param ThriftEvent event: The log event to be published :return: void """ - event_bundler = EventBundle() - event_bundler.addStringAttribute(self.stream_id) - event_bundler.addLongAttribute(time.time() * 1000) - ThriftPublisher.assign_attributes(event.metaData, event_bundler) - ThriftPublisher.assign_attributes(event.correlationData, event_bundler) - ThriftPublisher.assign_attributes(event.payloadData, event_bundler) + + event_bundle = self.create_event_bundle(event) try: - self.__publisher.publish(event_bundler) + self.__publisher.publish(event_bundle) except ThriftSessionExpiredException as ex: self.log.debug("ThriftSession expired. Reconnecting") self.__publisher.connect(self.username, self.password) - self.__publisher.defineStream(str(self.stream_definition)) - self.stream_id = self.__publisher.streamId self.log.debug("connected! 
stream ID: %r" % self.stream_id) - self.__publisher.publish(event_bundler) + + self.publish(event) self.log.debug("Published event to thrift stream [%r]" % self.stream_id) + def create_event_bundle(self, event): + """ + Creates an EventBundle object to be published to the Thrift stream + + :param ThriftEvent event: + :return: EventBundle event bundle object + """ + + event_bundle = EventBundle() + event_bundle.addStringAttribute(self.stream_id) + event_bundle.addLongAttribute(time.time() * 1000) + ThriftPublisher.assign_attributes(event.metaData, event_bundle) + ThriftPublisher.assign_attributes(event.correlationData, event_bundle) + ThriftPublisher.assign_attributes(event.payloadData, event_bundle) + + return event_bundle + def disconnect(self): """ Disconnect the thrift publisher http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/test/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/test/__init__.py b/tools/python_cartridgeagent/test/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/tools/python_cartridgeagent/test/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/test/asynctest.txt ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/test/asynctest.txt b/tools/python_cartridgeagent/test/asynctest.txt deleted file mode 100644 index b676e7d..0000000 --- a/tools/python_cartridgeagent/test/asynctest.txt +++ /dev/null @@ -1 +0,0 @@ -1414239655582.5959 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/test/test_util.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/test/test_util.py b/tools/python_cartridgeagent/test/test_util.py deleted file mode 100644 index 63c0cc7..0000000 --- a/tools/python_cartridgeagent/test/test_util.py +++ /dev/null @@ -1,133 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from ..cartridgeagent.modules.util.asyncscheduledtask import * -from ..cartridgeagent.modules.util import cartridgeagentutils -import time -import socket -from threading import Thread - -ASYNC_WRITE_FILE = "asynctest.txt" - - -def test_async_task(): - with open(ASYNC_WRITE_FILE, "r") as f: - init_context = f.read() - - test_task = TestTask() - astask = ScheduledExecutor(1, test_task) - start_time = time.time() * 1000 - astask.start() - contents_changed = False - timeout = 10 #seconds - - # wait till file content is written - while not contents_changed and (time.time() * 1000 - start_time) < (10 * 1000): - time.sleep(2) - with open(ASYNC_WRITE_FILE, "r") as f: - now_content = f.read() - - if init_context != now_content: - contents_changed = True - - astask.terminate() - f = open(ASYNC_WRITE_FILE, "r") - end_time = float(f.read()) - assert (end_time - start_time) >= 1 * 1000, "Task was executed before specified delay" - - -class TestTask(AbstractAsyncScheduledTask): - - def execute_task(self): - with open(ASYNC_WRITE_FILE, "w") as f: - f.seek(0) - f.truncate() - f.write("%1.4f" % (time.time()*1000)) - - -def test_decrypt_password_success(): - # def mockgetlog(path): - # return mocklog - # - # monkeypatch.delattr("LogFactory().get_log") - # TODO: enable logging in cartridgeagentutils - - plain_password = "plaintext" - secret_key = "tvnw63ufg9gh5111" - encrypted_password= "jP1lZ5xMlpLzu8MbY2Porg==" - - decrypted_password = cartridgeagentutils.decrypt_password(encrypted_password, secret_key) - #print decrypted_password - - assert decrypted_password == plain_password, "Password decryption failed" - - -def test_decrypt_password_failure(): - plain_password = "plaintext" - secret_key = "notsecretkeyhere" - encrypted_password= "jP1lZ5xMlpLzu8MbY2Porg==" - assert cartridgeagentutils.decrypt_password(encrypted_password, secret_key) != plain_password, "Password decrypted for wrong key" - - -def test_create_dir_normal(): - assert True - - -def test_create_dir_system_path(): - 
assert True - - -def test_create_dir_existing_dir(): - assert True - - -def test_wait_for_ports_activity_normal(): - portnumber = 12345 - listener = PortListener(portnumber) - listener.start() - - assert cartridgeagentutils.check_ports_active(socket.gethostbyname(socket.gethostname()), [str(portnumber)]) - - -class PortListener(Thread): - - def __init__(self, portnumber): - Thread.__init__(self) - self.portnumber = portnumber - self.terminated = False - - def run(self): - s = socket.socket() - host = socket.gethostname() - - s.bind((host, self.portnumber)) - s.listen(5) - - #while not self.terminated: - c, addr = s.accept() # Establish connection with client. - #print 'Got connection from', addr - c.send('Thank you for connecting') - c.close() - - s.close() - - def terminate(self): - self.terminated = True - - -def test_wait_for_ports_activity_non_existent(): - assert cartridgeagentutils.check_ports_active(socket.gethostbyname(socket.gethostname()), [str(34565)]) == False http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/tests/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/tests/__init__.py b/tools/python_cartridgeagent/tests/__init__.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tools/python_cartridgeagent/tests/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/tests/asynctest.txt ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/tests/asynctest.txt b/tools/python_cartridgeagent/tests/asynctest.txt new file mode 100644 index 0000000..b676e7d --- /dev/null +++ b/tools/python_cartridgeagent/tests/asynctest.txt @@ -0,0 +1 @@ +1414239655582.5959 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/75cb9942/tools/python_cartridgeagent/tests/test_util.py ---------------------------------------------------------------------- diff --git a/tools/python_cartridgeagent/tests/test_util.py b/tools/python_cartridgeagent/tests/test_util.py new file mode 100644 index 0000000..63c0cc7 --- /dev/null +++ b/tools/python_cartridgeagent/tests/test_util.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from ..cartridgeagent.modules.util.asyncscheduledtask import * +from ..cartridgeagent.modules.util import cartridgeagentutils +import time +import socket +from threading import Thread + +ASYNC_WRITE_FILE = "asynctest.txt" + + +def test_async_task(): + with open(ASYNC_WRITE_FILE, "r") as f: + init_context = f.read() + + test_task = TestTask() + astask = ScheduledExecutor(1, test_task) + start_time = time.time() * 1000 + astask.start() + contents_changed = False + timeout = 10 #seconds + + # wait till file content is written + while not contents_changed and (time.time() * 1000 - start_time) < (10 * 1000): + time.sleep(2) + with open(ASYNC_WRITE_FILE, "r") as f: + now_content = f.read() + + if init_context != now_content: + contents_changed = True + + astask.terminate() + f = open(ASYNC_WRITE_FILE, "r") + end_time = float(f.read()) + assert (end_time - start_time) >= 1 * 1000, "Task was executed before specified delay" + + +class TestTask(AbstractAsyncScheduledTask): + + def execute_task(self): + with open(ASYNC_WRITE_FILE, "w") as f: + f.seek(0) + f.truncate() + f.write("%1.4f" % (time.time()*1000)) + + +def test_decrypt_password_success(): + # def mockgetlog(path): + # return mocklog + # + # monkeypatch.delattr("LogFactory().get_log") + # TODO: enable logging in cartridgeagentutils + + plain_password = "plaintext" + secret_key = "tvnw63ufg9gh5111" + encrypted_password= "jP1lZ5xMlpLzu8MbY2Porg==" + + decrypted_password = cartridgeagentutils.decrypt_password(encrypted_password, secret_key) + #print decrypted_password + + assert decrypted_password == plain_password, "Password decryption failed" + + +def test_decrypt_password_failure(): + plain_password = "plaintext" + secret_key = "notsecretkeyhere" + encrypted_password= "jP1lZ5xMlpLzu8MbY2Porg==" + assert cartridgeagentutils.decrypt_password(encrypted_password, secret_key) != plain_password, "Password decrypted 
for wrong key" + + +def test_create_dir_normal(): + assert True + + +def test_create_dir_system_path(): + assert True + + +def test_create_dir_existing_dir(): + assert True + + +def test_wait_for_ports_activity_normal(): + portnumber = 12345 + listener = PortListener(portnumber) + listener.start() + + assert cartridgeagentutils.check_ports_active(socket.gethostbyname(socket.gethostname()), [str(portnumber)]) + + +class PortListener(Thread): + + def __init__(self, portnumber): + Thread.__init__(self) + self.portnumber = portnumber + self.terminated = False + + def run(self): + s = socket.socket() + host = socket.gethostname() + + s.bind((host, self.portnumber)) + s.listen(5) + + #while not self.terminated: + c, addr = s.accept() # Establish connection with client. + #print 'Got connection from', addr + c.send('Thank you for connecting') + c.close() + + s.close() + + def terminate(self): + self.terminated = True + + +def test_wait_for_ports_activity_non_existent(): + assert cartridgeagentutils.check_ports_active(socket.gethostbyname(socket.gethostname()), [str(34565)]) == False
