Repository: ignite
Updated Branches:
  refs/heads/ignite-2333 [created] 1d6f974c0


ignite-2333 : StripedCompositeReadWriteLock.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a15b951
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a15b951
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a15b951

Branch: refs/heads/ignite-2333
Commit: 3a15b9514209d2f334985393e38bcadea733205b
Parents: d844e95
Author: Ilya Lantukh <[email protected]>
Authored: Mon Feb 8 15:52:42 2016 +0300
Committer: Ilya Lantukh <[email protected]>
Committed: Mon Feb 8 15:52:42 2016 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       |  14 ++-
 .../util/StripedCompositeReadWriteLock.java     | 109 +++++++++++++++++++
 2 files changed, 117 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a15b951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 0e579ac..2772276 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -44,6 +45,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -101,7 +103,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
 
     /** Lock. */
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
     /** Partition update counter. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
@@ -1091,7 +1093,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @return Checks if any of the local partitions need to be evicted.
      */
     private boolean checkEvictions(long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
+//        assert lock.isWriteLockedByCurrentThread();
 
         boolean changed = false;
 
@@ -1167,7 +1169,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, 
long updateSeq) {
-        assert lock.isWriteLockedByCurrentThread();
+//        assert lock.isWriteLockedByCurrentThread();
         assert nodeId.equals(cctx.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
@@ -1221,7 +1223,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      */
     private void removeNode(UUID nodeId) {
         assert nodeId != null;
-        assert lock.writeLock().isHeldByCurrentThread();
+//        assert lock.writeLock().isHeldByCurrentThread();
 
         ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), 
topVer);
 
@@ -1286,7 +1288,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void onEvicted(GridDhtLocalPartition part, boolean 
updateSeq) {
-        assert updateSeq || lock.isWriteLockedByCurrentThread();
+//        assert updateSeq || lock.isWriteLockedByCurrentThread();
 
         lock.writeLock().lock();
 
@@ -1420,7 +1422,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      */
     private void consistencyCheck() {
         if (CONSISTENCY_CHECK) {
-            assert lock.writeLock().isHeldByCurrentThread();
+//            assert lock.writeLock().isHeldByCurrentThread();
 
             if (node2part == null)
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a15b951/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
new file mode 100644
index 0000000..db46ae7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java
@@ -0,0 +1,109 @@
+package org.apache.ignite.internal.util;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author Ilya Lantukh
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {
+
+    private final ReadWriteLock[] locks;
+
+    private final CompositeWriteLock compositeWriteLock;
+
+    public StripedCompositeReadWriteLock(int concurrencyLevel) {
+        locks = new ReadWriteLock[concurrencyLevel];
+
+        for (int i = 0; i < concurrencyLevel; i++)
+            locks[i] = new ReentrantReadWriteLock();
+
+        compositeWriteLock = new CompositeWriteLock();
+    }
+
+    @NotNull @Override public Lock readLock() {
+        int idx = (int)Thread.currentThread().getId() % locks.length;
+        return locks[idx].readLock();
+    }
+
+    @NotNull @Override public Lock writeLock() {
+        return compositeWriteLock;
+    }
+
+    private class CompositeWriteLock implements Lock {
+
+        @Override public void lock() {
+            int i = 0;
+            try {
+                for (; i < locks.length; i++)
+                    locks[i].writeLock().lock();
+            }
+            catch (Throwable e) {
+                for (i--; i >= 0; i--)
+                    locks[i].writeLock().unlock();
+
+                throw e;
+            }
+        }
+
+        @Override public void lockInterruptibly() throws InterruptedException {
+            int i = 0;
+            try {
+                for (; i < locks.length; i++)
+                    locks[i].writeLock().lockInterruptibly();
+            }
+            catch (Throwable e) {
+                for (i--; i >= 0; i--)
+                    locks[i].writeLock().unlock();
+
+                throw e;
+            }
+        }
+
+        @Override public boolean tryLock() {
+            int i = 0;
+
+            boolean unlock = false;
+
+            try {
+                for (; i < locks.length; i++)
+                    if (!locks[i].writeLock().tryLock()) {
+                        unlock = true;
+                        break;
+                    }
+            }
+            catch (Throwable e) {
+                for (i--; i >= 0; i--)
+                    locks[i].writeLock().unlock();
+
+                throw e;
+            }
+
+            if (unlock) {
+                for (i--; i >= 0; i--)
+                    locks[i].writeLock().unlock();
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override public boolean tryLock(long time, TimeUnit unit) throws 
InterruptedException {
+            return false;
+        }
+
+        @Override public void unlock() {
+            for (int i = locks.length - 1; i >= 0; i--)
+                locks[i].writeLock().unlock();
+        }
+
+        @NotNull @Override public Condition newCondition() {
+            return null;
+        }
+    }
+}

Reply via email to