Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-79-concurrent-storage-modifications 1b03bf211 -> b77070899


tests fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b7707089
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b7707089
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b7707089

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: b77070899e9f2113973d9a65fdc9a95b35d996fb
Parents: 1b03bf2
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Feb 16 13:06:35 2017 +0200
Committer: mxmrlv <mxm...@gmail.com>
Committed: Thu Feb 16 13:06:35 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  5 ++
 ...process_executor_concurrent_modifications.py | 74 ++++++++------------
 2 files changed, 34 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b7707089/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 319982e..2c563a8 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -366,6 +366,7 @@ def _main():
     storage_type.remove_mutable_association_listener()
 
     with instrumentation.track_changes() as instrument:
+        # import pydevd; pydevd.settrace('localhost')
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
             _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
@@ -376,6 +377,10 @@ def _main():
             task_func(ctx=ctx, **operation_inputs)
             messenger.succeeded(tracked_changes=instrument.tracked_changes)
         except BaseException as e:
+            # import traceback
+            # with open('/home/maxim/Desktop/tmp_log', 'wr+') as f:
+            #     traceback.print_exc(file=f)
+
             messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b7707089/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 7c54bc5..e46921e 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import time
 import json
 
@@ -33,34 +32,34 @@ from tests import mock
 from tests import storage
 
 
-def test_concurrent_modification_on_task_succeeded(context, executor, shared_file):
-    _test(context, executor, shared_file, _test_task_succeeded, expected_failure=True)
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
 
 
 @operation
-def _test_task_succeeded(ctx, shared_file, key, first_value, second_value):
-    _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value)
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+    _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
 
 
-def test_concurrent_modification_on_task_failed(context, executor, shared_file):
-    _test(context, executor, shared_file, _test_task_failed, expected_failure=True)
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
 
 
 @operation
-def _test_task_failed(ctx, shared_file, key, first_value, second_value):
-    first = _concurrent_update(shared_file, ctx.node_instance, key, first_value, second_value)
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+    first = _concurrent_update(lock_files, ctx.node_instance, key, first_value, second_value)
     if not first:
         raise RuntimeError('MESSAGE')
 
 
-def test_concurrent_modification_on_update_and_refresh(context, executor, shared_file):
-    _test(context, executor, shared_file, _test_update_and_refresh, expected_failure=False)
+def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
 
 
 @operation
-def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value):
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
     node_instance = ctx.node_instance
-    first = _concurrent_update(shared_file, node_instance, key, first_value, second_value)
+    first = _concurrent_update(lock_files, node_instance, key, first_value, second_value)
     if not first:
         try:
             ctx.model.node_instance.update(node_instance)
@@ -71,16 +70,15 @@ def _test_update_and_refresh(ctx, shared_file, key, first_value, second_value):
             raise RuntimeError('Unexpected')
 
 
-def _test(context, executor, shared_file, func, expected_failure):
+def _test(context, executor, lock_files, func, expected_failure):
     def _node_instance(ctx):
         return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
 
-    shared_file.write(json.dumps({}))
     key = 'key'
     first_value = 'value1'
     second_value = 'value2'
     inputs = {
-        'shared_file': str(shared_file),
+        'lock_files': lock_files,
         'key': key,
         'first_value': first_value,
         'second_value': second_value
@@ -130,45 +128,31 @@ def context(tmpdir):
 
 
 @pytest.fixture
-def shared_file(tmpdir):
-    return tmpdir.join('shared_file')
-
+def lock_files(tmpdir):
+    return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
 
-def _concurrent_update(shared_file, node_instance, key, first_value, second_value):
-    def lock():
-        return fasteners.InterProcessLock(shared_file)
 
-    def get(key):
-        with open(shared_file) as f:
-            return json.load(f).get(key)
+def _concurrent_update(lock_files, node_instance, key, first_value, second_value):
 
-    def set(key):
-        with open(shared_file) as f:
-            content = json.load(f)
-        content[key] = True
-        with open(shared_file, 'wb') as f:
-            json.dump(content, f)
+    locker1 = fasteners.InterProcessLock(lock_files[0])
+    locker2 = fasteners.InterProcessLock(lock_files[1])
 
-    def wait_for(key):
-        while True:
-            time.sleep(0.01)
-            with lock():
-                if get(key):
-                    break
-
-    with lock():
-        first = not get('first_in')
-        set('first_in' if first else 'second_in')
+    first = locker1.acquire(blocking=False)
 
     if first:
-        wait_for('second_in')
+        # Give chance for both processes to acquire locks
+        while locker2.acquire(blocking=False):
+            locker2.release()
+            time.sleep(0.1)
+    else:
+        locker2.acquire()
 
     node_instance.runtime_properties[key] = first_value if first else second_value
 
     if first:
-        with lock():
-            set('first_out')
+        locker1.release()
     else:
-        wait_for('first_out')
+        with locker1:
+            locker2.release()
 
     return first

Reply via email to