Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue 
[created] 0942b2e2e


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0942b2e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0942b2e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0942b2e2

Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 0942b2e2ee714a89a620b4165d8530b285133704
Parents: 16fcca4
Author: max-orlov <[email protected]>
Authored: Tue May 9 17:24:31 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Tue May 9 17:24:31 2017 +0300

----------------------------------------------------------------------
 aria/logger.py                                  | 22 ++++++--------------
 aria/orchestrator/context/common.py             | 10 +++------
 aria/orchestrator/workflows/executor/process.py |  2 ++
 aria/storage/instrumentation.py                 | 21 ++++++++++++++++---
 tests/orchestrator/context/test_operation.py    |  5 +++--
 5 files changed, 32 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0942b2e2/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 97d3878..9214bd9 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -114,14 +114,13 @@ def create_console_log_handler(level=logging.DEBUG, 
formatter=None):
     return console
 
 
-def create_sqla_log_handler(session, engine, log_cls, execution_id, 
level=logging.DEBUG):
+def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG):
 
     # This is needed since the engine and session are entirely new we need to 
reflect the db
     # schema of the logging model into the engine and session.
-    log_cls.__table__.create(bind=engine, checkfirst=True)
+    log_cls.__table__.create(bind=model.log._engine, checkfirst=True)
 
-    return _SQLAlchemyHandler(session=session,
-                              engine=engine,
+    return _SQLAlchemyHandler(model=model,
                               log_cls=log_cls,
                               execution_id=execution_id,
                               level=level)
@@ -168,10 +167,9 @@ def create_file_log_handler(
 
 class _SQLAlchemyHandler(logging.Handler):
 
-    def __init__(self, session, engine, log_cls, execution_id, **kwargs):
+    def __init__(self, model, log_cls, execution_id, **kwargs):
         logging.Handler.__init__(self, **kwargs)
-        self._session = session
-        self._engine = engine
+        self._model = model
         self._cls = log_cls
         self._execution_id = execution_id
 
@@ -188,15 +186,7 @@ class _SQLAlchemyHandler(logging.Handler):
             # Not mandatory.
             traceback=getattr(record, 'traceback', None)
         )
-        self._session.add(log)
-
-        try:
-            self._session.commit()
-        except BaseException:
-            self._session.rollback()
-            raise
-        finally:
-            self._session.close()
+        self._model.log.put(log)
 
 
 _default_file_formatter = logging.Formatter(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0942b2e2/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py 
b/aria/orchestrator/context/common.py
index 64ef9a4..c0047e9 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -79,13 +79,9 @@ class BaseContext(object):
             self.logger.addHandler(self._get_sqla_handler())
 
     def _get_sqla_handler(self):
-        api_kwargs = {}
-        if self._model._initiator:
-            
api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs))
-        api_kwargs.update(**self._model._api_kwargs)
-        return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log,
-                                                   
execution_id=self._execution_id,
-                                                   **api_kwargs)
+        return aria_logger.create_sqla_log_handler(model=self._model,
+                                                   log_cls=modeling.models.Log,
+                                                   
execution_id=self._execution_id)
 
     def __repr__(self):
         return (

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0942b2e2/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index e464f7d..eb1bffe 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -326,6 +326,7 @@ def _patch_session(ctx, messenger, instrument):
         original_refresh(target)
 
     def patched_commit():
+        import pydevd; pydevd.settrace('localhost', suspend=False)
         messenger.apply_tracked_changes(instrument.tracked_changes)
         instrument.clear()
 
@@ -344,6 +345,7 @@ def _patch_session(ctx, messenger, instrument):
 
 
 def _main():
+    import pydevd; pydevd.settrace('localhost', suspend=False)
     arguments_json_path = sys.argv[1]
     with open(arguments_json_path) as f:
         arguments = pickle.loads(f.read())

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0942b2e2/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index cf2a365..14d4423 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -26,7 +26,15 @@ from ..storage.exceptions import StorageError
 _VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
-    _models.Node.runtime_properties: dict
+    _models.Node.runtime_properties: dict,
+
+    # Log related stuff
+    _models.Log.level: str,
+    _models.Log.msg: str,
+    _models.Log.traceback: str,
+    _models.Log.created_at: lambda date: date,
+    _models.Log.execution_fk: int,
+    _models.Log.task_fk: int,
 }
 
 
@@ -178,11 +186,18 @@ def apply_tracked_changes(tracked_changes, model):
                 for attribute_name, value in tracked_attributes.items():
                     if value.initial != value.current:
                         if not instance:
-                            instance = mapi.get(instance_id)
+                            # The object can be entirely new (Log is an 
example of this use case,
+                            # its id is None (or 'null'), thus we need to 
create it from scratch,
+                            # and not just update it.
+                            instance = mapi.model_cls() if 'null' else 
mapi.get(instance_id)
                         setattr(instance, attribute_name, value.current)
                 if instance:
                     _validate_version_id(instance, mapi)
-                    mapi.update(instance)
+                    # This follows the same logic as the same comment 
regarding 'null'
+                    if instance_id == 'null':
+                        mapi.put(instance)
+                    else:
+                        mapi.update(instance)
                     successfully_updated_changes[mapi_name][instance_id] = [
                         v.dict for v in tracked_attributes.values()]
     except BaseException:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0942b2e2/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py 
b/tests/orchestrator/context/test_operation.py
index cdeb5fa..b7c7968 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -263,7 +263,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 
 @pytest.fixture(params=[
-    (thread.ThreadExecutor, {}),
+    # (thread.ThreadExecutor, {}),
     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
@@ -372,11 +372,12 @@ def _assert_loggins(ctx, inputs):
     assert len(op_end_log) == 1
     op_end_log = op_end_log[0]
 
-    assert op_start_log.created_at < op_end_log.created_at
+    # assert op_start_log.created_at < op_end_log.created_at
 
 
 @operation
 def logged_operation(ctx, **_):
+    import pydevd; pydevd.settrace('localhost', suspend=False)
     ctx.logger.info(ctx.task.inputs['op_start'])
     # enables to check the relation between the created_at field properly
     time.sleep(1)

Reply via email to