ignite-2333 : Fixed review remarks.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13b3f4b9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13b3f4b9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13b3f4b9 Branch: refs/heads/ignite-2333 Commit: 13b3f4b961c2822cdfc70e1259d0f238d688ab57 Parents: f84d023 Author: Ilya Lantukh <[email protected]> Authored: Wed Feb 24 11:42:15 2016 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Wed Feb 24 11:42:15 2016 +0300 ---------------------------------------------------------------------- .../util/StripedCompositeReadWriteLock.java | 149 ++++++++++++++++--- 1 file changed, 130 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/13b3f4b9/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java index ae95f6c..1a8ce2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java @@ -1,54 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.util; import org.jetbrains.annotations.NotNull; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * @author Ilya Lantukh + * ReadWriteLock with striping mechanics. + * Compared to {@link ReentrantReadWriteLock} it has slightly improved performance of {@link ReadWriteLock#readLock()} + * operations at the cost of {@link ReadWriteLock#writeLock()} operations and memory consumption. + * It also supports reentrancy semantics like {@link ReentrantReadWriteLock}. */ public class StripedCompositeReadWriteLock implements ReadWriteLock { - private final PaddedReentrantReadWriteLock[] locks; + /** + * Locks. + */ + private final ReentrantReadWriteLock[] locks; + /** + * Composite write lock. + */ private final CompositeWriteLock compositeWriteLock; - public StripedCompositeReadWriteLock(int concurrencyLevel) { - locks = new PaddedReentrantReadWriteLock[concurrencyLevel]; - - for (int i = 0; i < concurrencyLevel; i++) + /** + * Atomic field updated for {@link #counter}. + */ + private static final AtomicIntegerFieldUpdater<StripedCompositeReadWriteLock> COUNTER_UPD = + AtomicIntegerFieldUpdater.newUpdater(StripedCompositeReadWriteLock.class, "counter"); + + /** + * Counter of {@link #readLock()} calls. + */ + @SuppressWarnings("UnusedDeclaration") + private volatile int counter = 0; + + /** + * Creates a new instance with given concurrency level. + * + * @param concurrencyLvl Number of internal read locks. + */ + public StripedCompositeReadWriteLock(int concurrencyLvl) { + locks = new PaddedReentrantReadWriteLock[concurrencyLvl]; + + for (int i = 0; i < concurrencyLvl; i++) locks[i] = new PaddedReentrantReadWriteLock(); compositeWriteLock = new CompositeWriteLock(); } + /** {@inheritDoc} */ @NotNull @Override public Lock readLock() { - int idx = (int)Thread.currentThread().getId() % locks.length; + int idx = Math.abs(COUNTER_UPD.getAndIncrement(this) % locks.length); + return locks[idx].readLock(); } + /** {@inheritDoc} */ @NotNull @Override public Lock writeLock() { return compositeWriteLock; } + /** + * {@inheritDoc} + * + * Compared to {@link ReentrantReadWriteLock}, this class contains padding to ensure that different instances will + * always be located in different CPU cache lines. + */ private static class PaddedReentrantReadWriteLock extends ReentrantReadWriteLock { + /** + * + */ private static final long serialVersionUID = 0L; - long p0, p1, p2, p3, p4, p5, p6, p7; + /** + * Padding. + */ + private long p0, p1, p2, p3, p4, p5, p6, p7; } + /** + * {@inheritDoc} + * + * Methods of this class will lock all {@link #locks}. + */ private class CompositeWriteLock implements Lock { + /** {@inheritDoc} */ @Override public void lock() { + try { + lock(false); + } catch (InterruptedException e) { + // This should never happen. + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override public void lockInterruptibly() throws InterruptedException { + lock(true); + } + + /** + * @param interruptibly true if {@link Thread#interrupt()} should be considered. + * @throws InterruptedException + */ + private void lock(boolean interruptibly) throws InterruptedException { int i = 0; try { for (; i < locks.length; i++) - locks[i].writeLock().lock(); + if (interruptibly) + locks[i].writeLock().lockInterruptibly(); + else + locks[i].writeLock().lock(); } catch (Throwable e) { for (i--; i >= 0; i--) @@ -58,11 +144,18 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock { } } - @Override public void lockInterruptibly() throws InterruptedException { + /** {@inheritDoc} */ + @Override public boolean tryLock() { int i = 0; + + boolean unlock = false; + try { for (; i < locks.length; i++) - locks[i].writeLock().lockInterruptibly(); + if (!locks[i].writeLock().tryLock()) { + unlock = true; + break; + } } catch (Throwable e) { for (i--; i >= 0; i--) @@ -70,19 +163,38 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock { throw e; } + + if (unlock) { + for (i--; i >= 0; i--) + locks[i].writeLock().unlock(); + + return false; + } + + return true; } - @Override public boolean tryLock() { + /** {@inheritDoc} */ + @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + long timeLeft = unit.convert(time, TimeUnit.NANOSECONDS); + + long prevTime = System.nanoTime(); + int i = 0; boolean unlock = false; try { - for (; i < locks.length; i++) - if (!locks[i].writeLock().tryLock()) { + for (; i < locks.length; i++) { + if (timeLeft < 0 || !locks[i].writeLock().tryLock(timeLeft, TimeUnit.NANOSECONDS)) { unlock = true; break; } + + long currentTime = System.nanoTime(); + timeLeft -= (currentTime - prevTime); + prevTime = currentTime; + } } catch (Throwable e) { for (i--; i >= 0; i--) @@ -94,23 +206,22 @@ public class StripedCompositeReadWriteLock implements ReadWriteLock { if (unlock) { for (i--; i >= 0; i--) locks[i].writeLock().unlock(); + return false; } return true; } - @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not supported"); - } - + /** {@inheritDoc} */ @Override public void unlock() { for (int i = locks.length - 1; i >= 0; i--) locks[i].writeLock().unlock(); } + /** {@inheritDoc} */ @NotNull @Override public Condition newCondition() { - throw new RuntimeException("Not supported"); + throw new UnsupportedOperationException(); } } }
