Repository: ignite Updated Branches: refs/heads/ignite-2333 [created] 1d6f974c0
ignite-2333 : StripedCompositeReadWriteLock. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a15b951 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a15b951 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a15b951 Branch: refs/heads/ignite-2333 Commit: 3a15b9514209d2f334985393e38bcadea733205b Parents: d844e95 Author: Ilya Lantukh <[email protected]> Authored: Mon Feb 8 15:52:42 2016 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Mon Feb 8 15:52:42 2016 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 14 ++- .../util/StripedCompositeReadWriteLock.java | 109 +++++++++++++++++++ 2 files changed, 117 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a15b951/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 0e579ac..2772276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -101,7 +103,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final GridAtomicLong updateSeq = new GridAtomicLong(1); /** Lock. */ - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock lock = new StripedCompositeReadWriteLock(16); /** Partition update counter. */ private Map<Integer, Long> cntrMap = new HashMap<>(); @@ -1091,7 +1093,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Checks if any of the local partitions need to be evicted. */ private boolean checkEvictions(long updateSeq) { - assert lock.isWriteLockedByCurrentThread(); +// assert lock.isWriteLockedByCurrentThread(); boolean changed = false; @@ -1167,7 +1169,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { - assert lock.isWriteLockedByCurrentThread(); +// assert lock.isWriteLockedByCurrentThread(); assert nodeId.equals(cctx.nodeId()); // In case if node joins, get topology at the time of joining node. @@ -1221,7 +1223,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ private void removeNode(UUID nodeId) { assert nodeId != null; - assert lock.writeLock().isHeldByCurrentThread(); +// assert lock.writeLock().isHeldByCurrentThread(); ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); @@ -1286,7 +1288,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { - assert updateSeq || lock.isWriteLockedByCurrentThread(); +// assert updateSeq || lock.isWriteLockedByCurrentThread(); lock.writeLock().lock(); @@ -1420,7 +1422,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { */ private void consistencyCheck() { if (CONSISTENCY_CHECK) { - assert lock.writeLock().isHeldByCurrentThread(); +// assert lock.writeLock().isHeldByCurrentThread(); if (node2part == null) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a15b951/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java new file mode 100644 index 0000000..db46ae7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java @@ -0,0 +1,109 @@ +package org.apache.ignite.internal.util; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author Ilya Lantukh + */ +public class StripedCompositeReadWriteLock implements ReadWriteLock { + + private final ReadWriteLock[] locks; + + private final CompositeWriteLock compositeWriteLock; + + public StripedCompositeReadWriteLock(int concurrencyLevel) { + locks = new ReadWriteLock[concurrencyLevel]; + + for (int i = 0; i < concurrencyLevel; i++) + locks[i] = new ReentrantReadWriteLock(); + + compositeWriteLock = new CompositeWriteLock(); + } + + @NotNull @Override public Lock readLock() { + int idx = (int)Thread.currentThread().getId() % locks.length; + return locks[idx].readLock(); + } + + @NotNull @Override public Lock writeLock() { + return compositeWriteLock; + } + + private class CompositeWriteLock implements Lock { + + @Override public void lock() { + int i = 0; + try { + for (; i < locks.length; i++) + locks[i].writeLock().lock(); + } + catch (Throwable e) { + for (i--; i >= 0; i--) + locks[i].writeLock().unlock(); + + throw e; + } + } + + @Override public void lockInterruptibly() throws InterruptedException { + int i = 0; + try { + for (; i < locks.length; i++) + locks[i].writeLock().lockInterruptibly(); + } + catch (Throwable e) { + for (i--; i >= 0; i--) + locks[i].writeLock().unlock(); + + throw e; + } + } + + @Override public boolean tryLock() { + int i = 0; + + boolean unlock = false; + + try { + for (; i < locks.length; i++) + if (!locks[i].writeLock().tryLock()) { + unlock = true; + break; + } + } + catch (Throwable e) { + for (i--; i >= 0; i--) + locks[i].writeLock().unlock(); + + throw e; + } + + if (unlock) { + for (i--; i >= 0; i--) + locks[i].writeLock().unlock(); + return false; + } + + return true; + } + + @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override public void unlock() { + for (int i = locks.length - 1; i >= 0; i--) + locks[i].writeLock().unlock(); + } + + @NotNull @Override public Condition newCondition() { + return null; + } + } +}
