IGNITE-642: Adds a fairness parameter to the distributed reentrant lock (IgniteLock).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6bc6ce9f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6bc6ce9f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6bc6ce9f Branch: refs/heads/ignite-642 Commit: 6bc6ce9f09eb3b2af408aa1e2ec4a9e2e7fb0b35 Parents: 7a21976 Author: Vladisav Jelisavcic <[email protected]> Authored: Fri Apr 8 09:27:30 2016 +0200 Committer: vladisav <[email protected]> Committed: Fri Apr 8 09:27:30 2016 +0200 ---------------------------------------------------------------------- .../datastructures/IgniteLockExample.java | 6 +- .../src/main/java/org/apache/ignite/Ignite.java | 3 +- .../main/java/org/apache/ignite/IgniteLock.java | 68 +++- .../apache/ignite/internal/IgniteKernal.java | 3 +- .../datastructures/DataStructuresProcessor.java | 8 +- .../datastructures/GridCacheLockImpl.java | 316 +++++++++++++-- .../datastructures/GridCacheLockState.java | 75 +++- .../IgniteClientReconnectAtomicsTest.java | 10 +- ...eAbstractDataStructuresFailoverSelfTest.java | 76 +++- .../IgniteClientDataStructuresAbstractTest.java | 12 +- .../IgniteDataStructureUniqueNameTest.java | 2 +- .../IgniteLockAbstractSelfTest.java | 400 +++++++++++++++++-- .../local/IgniteLocalLockSelfTest.java | 6 +- .../cache/GridCacheDataStructuresLoadTest.java | 4 +- .../ignite/testframework/junits/IgniteMock.java | 1 + .../junits/multijvm/IgniteProcessProxy.java | 2 +- .../org/apache/ignite/IgniteSpringBean.java | 3 +- 17 files changed, 880 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java 
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java index 3ff1f02..e6c19c3 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java @@ -74,7 +74,7 @@ public class IgniteLockExample { final String reentrantLockName = UUID.randomUUID().toString(); // Initialize lock. - IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, true); + IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, false, true); // Init distributed cache. IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME); @@ -152,7 +152,7 @@ public class IgniteLockExample { @Override public void run() { System.out.println("Producer started. "); - IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, true); + IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, false, true); // Condition to wait on when queue is full. IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); @@ -231,7 +231,7 @@ public class IgniteLockExample { Ignite g = Ignition.ignite(); - IgniteLock lock = g.reentrantLock(reentrantLockName, true, true); + IgniteLock lock = g.reentrantLock(reentrantLockName, true, false, true); // Condition to wait on when queue is full. 
IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL); http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index d8b1e1e..b62672e 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -474,11 +474,12 @@ public interface Ignite extends AutoCloseable { * if any node leaves topology, all locks already acquired by that node are silently released * and become available for other nodes to acquire. If flag is {@code false} then * all threads on other nodes waiting to acquire lock are interrupted. + * @param fair If {@code True}, fair lock will be created. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return ReentrantLock for the given name. * @throws IgniteException If reentrant lock could not be fetched or created. 
*/ - public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean create) + public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create) throws IgniteException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/IgniteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java index e5aeb27..d24a45e 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java @@ -29,7 +29,66 @@ import java.util.concurrent.locks.Lock; * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.ReentrantLock}. * <h1 class="header">Creating Distributed ReentrantLock</h1> * Instance of cache reentrant lock can be created by calling the following method: - * {@link Ignite#reentrantLock(String, boolean, boolean)}. + * {@link Ignite#reentrantLock(String, boolean, boolean, boolean)}. + * <h1 class="header">Protection from failover</h1> + * Ignite lock can automatically recover from node failure. + * <ul> + * <li>If failoverSafe flag is set to true upon creation, + * in case a node owning the lock fails, lock will be automatically released and become available for threads on other + * nodes to acquire. No exception will be thrown. + * <li>If failoverSafe flag is set to false upon creation, + * in case a node owning the lock fails, {@code IgniteException} will be thrown on every other node attempting to + * perform any operation on this lock. No automatic recovery will be attempted, + * and lock will be marked as broken (i.e. unusable), which can be checked using the method #isBroken(). + * Broken lock cannot be reused again. 
+ * </ul> + * + * <h1 class="header">Implementation issues</h1> + * Ignite lock comes in two flavours: fair and non-fair. Non-fair lock assumes no ordering should be imposed + * on acquiring threads; in case of contention, threads from all nodes compete for the lock once the lock is released. + * In most cases this is the desired behaviour. However, in some cases, using the non-fair lock can lead to uneven load + * distribution among nodes. + * Fair lock solves this issue by imposing strict FIFO ordering policy at a cost of an additional transaction. + * This ordering does not guarantee fairness of thread scheduling (similar to {@code java.util.concurrent.ReentrantLock}). + * Thus, one of many threads on any node using a fair lock may obtain it multiple times in succession while other + * active threads are not progressing and not currently holding the lock. Also note that the untimed tryLock method + * does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting. + * + * </p> + * As a rule of thumb, whenever there is a reasonable time window between successive calls to release and acquire + * the lock, non-fair lock should be preferred: + * + * <pre> {@code + * while(someCondition){ + * // do anything + * lock.lock(); + * try{ + * // ... + * } + * finally { + * lock.unlock(); + * } + * } + * }</pre> + * + * If successive calls to release/acquire are following immediately, + * e.g. + * + * <pre> {@code + * while(someCondition){ + * lock.lock(); + * try { + * // do something + * } + * finally { + * lock.unlock(); + * } + * } + * }</pre> + * + * using the fair lock is reasonable in order to allow even distribution of load among nodes + * (although overall throughput may be lower due to increased overhead). 
+ * */ public interface IgniteLock extends Lock, Closeable { @@ -377,6 +436,13 @@ public interface IgniteLock extends Lock, Closeable { public boolean isFailoverSafe(); /** + * Returns {@code true} if this lock is fair. Fairness flag can only be set on lock creation. + * + * @return {@code true} if this reentrant lock has fairness flag set true. + */ + public boolean isFair(); + + /** * Returns true if any node that owned the locked failed before releasing the lock. * * @return true if any node failed while owning the lock. http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c267a4a..eb92f9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2997,12 +2997,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @Nullable @Override public IgniteLock reentrantLock( String name, boolean failoverSafe, + boolean fair, boolean create ) { guard(); try { - return ctx.dataStructures().reentrantLock(name, failoverSafe, create); + return ctx.dataStructures().reentrantLock(name, failoverSafe, fair, create); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 71a0f1d..773dfe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -1347,15 +1347,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * Gets or creates reentrant lock. If reentrant lock is not found in cache, - * it is created using provided name and count parameter. + * it is created using provided name, failover mode, and fairness mode parameters. * * @param name Name of the reentrant lock. + * @param failoverSafe Flag indicating behaviour in case of failure. + * @param fair Flag indicating fairness policy of this lock. * @param create If {@code true} reentrant lock will be created in case it is not in cache. * @return ReentrantLock for the given name or {@code null} if it is not found and * {@code create} is false. * @throws IgniteCheckedException If operation failed. 
*/ - public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean create) + public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); @@ -1387,7 +1389,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return null; if (val == null) { - val = new GridCacheLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe); + val = new GridCacheLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe, fair); dsView.put(key, val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index 6c80ca1..9aa610b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -23,11 +23,14 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,10 +47,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -143,6 +148,12 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable /** FailoverSafe flag. */ private final boolean failoverSafe; + /** Fairness flag. */ + private final boolean fair; + + /** Threads that are waiting on this lock. */ + private Set<Long> waitingThreads; + protected Sync(GridCacheLockState state) { setState(state.get()); @@ -157,6 +168,10 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable outgoingSignals = new HashMap<>(); failoverSafe = state.isFailoverSafe(); + + fair = state.isFair(); + + waitingThreads = new ConcurrentSkipListSet<Long>(); } /** */ @@ -221,28 +236,40 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable this.currentOwnerThreadId = tempThreadID; } + // Interrupt any future call to acquire/release on this sync object. + interruptAll = true; + // Interrupt any ongoing transactions. for(Thread t: getQueuedThreads()){ t.interrupt(); } - - // Interrupt any future call to acquire/release on this sync object. - interruptAll = true; } /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode), * if not throw {@linkplain IgniteInterruptedException} */ - private void validate(final boolean checkInterrupt){ - // Interrupted flag is not always cleared - // (e.g. 
lock() doesn't throw exception and doesn't clear interrupted) + private void validate(final boolean throwInterrupt){ + // Interrupted flag shouldn't be always cleared + // (e.g. lock() method doesn't throw exception and doesn't clear interrupted) // but should be cleared if this method is called after lock breakage or node stop. // If interruptAll is set, exception is thrown anyway. - boolean clearInterrupt = checkInterrupt || interruptAll; + boolean interrupted = Thread.currentThread().isInterrupted(); + + // Clear interrupt flag. + if(throwInterrupt || interruptAll){ + Thread.interrupted(); + } - if((clearInterrupt && Thread.interrupted()) || interruptAll){ + if(interruptAll){ throw new IgniteInterruptedException("Lock broken (possible reason: node stopped" + " or node owning lock failed while in non-failoversafe mode)."); } + + // Global queue should be synchronized only if exception should be thrown. + if(fair && (throwInterrupt && interrupted) && !interruptAll){ + this.synchronizeQueue(true, Thread.currentThread()); + + throw new IgniteInterruptedException("Lock is interrupted."); + } } /** @@ -316,10 +343,37 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable this.isBroken = isBroken; } + /** */ + protected synchronized boolean hasPredecessor(LinkedList<UUID> nodes){ + if(!fair) + return false; + + for (Iterator<UUID> it = nodes.iterator(); it.hasNext(); ) { + UUID node = it.next(); + + if (ctx.discovery().node(node) == null) { + it.remove(); + + continue; + } + + if (node.equals(thisNode)) { + return false; + } + + return true; + } + + return false; + } + /** - * Performs non-fair tryLock. + * Performs tryLock. + * @param acquires Number of permits to acquire. + * @param fair Fairness parameter. + * @result true if succeeded, false otherwise. */ - final boolean nonfairTryAcquire(int acquires) { + final boolean tryAcquire(final int acquires, final boolean fair) { // If broken in non-failoversafe mode, exit immediately. 
if(interruptAll){ return true; @@ -327,18 +381,31 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable final Thread current = Thread.currentThread(); - final UUID currentOwner = this.currentOwnerNode; + boolean failed = false; int c = getState(); + // Wait for lock to reach stable state. + while(c != 0){ + UUID currentOwner = this.currentOwnerNode; + + if(currentOwner != null) { + failed = ctx.discovery().node(currentOwner) == null; + + break; + } + + c = getState(); + } + // Check if lock is released or current owner failed. - if(c == 0 || ctx.discovery().node(currentOwner) == null){ - if (compareAndSetGlobalState(0, acquires, current.getId())){ + if(c == 0 || failed){ + if (compareAndSetGlobalState(0, acquires, current, fair)){ // Not used for synchronization (we use ThreadID), but updated anyway. setExclusiveOwnerThread(current); - while(getState() != acquires) + while(!isHeldExclusively() && !interruptAll) Thread.yield(); return true; @@ -355,6 +422,10 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return true; } + if(fair && !isQueued(current)){ + synchronizeQueue(false, current); + } + return false; } @@ -362,12 +433,12 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable * Performs lock. */ final void lock() { - acquire(1); + acquire(1); } /** {@inheritDoc} */ protected final boolean tryAcquire(int acquires) { - return nonfairTryAcquire(acquires); + return tryAcquire(acquires, fair); } /** {@inheritDoc} */ @@ -379,12 +450,16 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable } // If broken in non-failoversafe mode, exit immediately. 
- if(interruptAll) + if(interruptAll) { return true; + } int c = getState() - releases; if (!isHeldExclusively()) { + log.error("Lock.unlock() is called in illegal state [callerNodeId=" + thisNode + ", ownerNodeId=" + + currentOwnerNode + ", callerThreadId=" + Thread.currentThread().getId() + ", ownerThreadId=" + + currentOwnerThreadId + ", lockState=" + getState() + "]"); throw new IllegalMonitorStateException(); } @@ -395,7 +470,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable setGlobalState(0, processAwait(), processSignal()); - while(getState() != 0) + while(isHeldExclusively() && !interruptAll) Thread.yield(); } else @@ -438,7 +513,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable /** * This method is used for synchronizing the reentrant lock state across all nodes. */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final long newThreadID) { + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, + final Thread newThread, final boolean bargingProhibited) { try { return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @@ -450,26 +526,155 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable if (val == null) throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); - if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) { - val.set(newVal); + final long newThreadID = newThread.getId(); + + LinkedList<UUID> nodes = val.getNodes(); + + // Barging is prohibited in fair mode unless tryLock() is called. 
+ if(!(bargingProhibited && hasPredecessor(nodes))){ + if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) { + val.set(newVal); + + val.setId(thisNode); + + val.setThreadId(newThreadID); + + val.setSignals(null); + + // This node is already in queue, except in cases where this is the only node + // or this is a call to tryLock(), in which case barging is ok. + // Queue is only updated if this is fair lock. + if(val.isFair() && (nodes.isEmpty() || !bargingProhibited)){ + nodes.addFirst(thisNode); + } + + val.setNodes(nodes); + + val.setChanged(true); + + lockView.put(key, val); + + tx.commit(); - val.setId(thisNode); + return true; + } + } - val.setThreadId(newThreadID); + return false; + } + catch (Error | Exception e) { + if(interruptAll){ + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); - val.setSignals(null); + // Return immediately, exception will be thrown later. + return true; + } + else{ + if(Thread.currentThread().isInterrupted()){ + log.info("Thread is interrupted while attempting to acquire lock."); + + // Delegate the decision to throw InterruptedException to the AQS. + sync.release(0); + + return false; + } + + U.error(log, "Failed to compare and set: " + this, e); + } + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * This method is used for synchronizing the number of acquire attempts on this lock across all nodes. + * + * @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered. 
+ */ + protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) { + final AtomicBoolean interrupted = new AtomicBoolean(false); + + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + + GridCacheLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + LinkedList<UUID> nodes = val.getNodes(); + + if(!cancelled){ + nodes.add(thisNode); + + val.setChanged(false); lockView.put(key, val); tx.commit(); + // Keep track of all threads that are queued in global queue. + // We deliberately don't use #sync.isQueued(), because AQS + // cancel threads immediately after throwing interrupted exception. + sync.waitingThreads.add(thread.getId()); + return true; } + else { + if (sync.waitingThreads.contains(thread.getId())) { + // Update other nodes if this is the first node in queue. + val.setChanged(nodes.lastIndexOf(thisNode) == 0); + + nodes.removeLastOccurrence(thisNode); + + lockView.put(key, val); + + tx.commit(); + + sync.waitingThreads.remove(thread.getId()); + + return true; + } + } return false; } catch (Error | Exception e) { - U.error(log, "Failed to compare and set: " + this, e); + if(interruptAll){ + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); + + // Abort this attempt to synchronize queue and start another one, + // that will return immediately. + sync.release(0); + + return false; + } + else{ + // If thread got interrupted, abort this attempt to synchronize queue, + // clear interrupt flag and try again, and let the AQS decide + // whether to throw an exception or ignore it. 
+ if(Thread.interrupted() || X.hasCause(e, InterruptedException.class)){ + interrupted.set(true); + + throw new TransactionRollbackException("Thread got interrupted " + + "while synchronizing the global queue, retrying. "); + } + + U.error(log, "Failed to synchronize global lock queue: " + this, e); + } throw e; } @@ -481,6 +686,12 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable catch (IgniteCheckedException e) { throw U.convertException(e); } + finally { + // Restore interrupt flag and let AQS decide what to do with it. + if(interrupted.get()) { + Thread.currentThread().interrupt(); + } + } } /** @@ -503,9 +714,20 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable val.set(newVal); - val.setId(null); + if(newVal == 0) { + val.setId(null); - val.setThreadId(LOCK_FREE); + val.setThreadId(LOCK_FREE); + } + + val.setChanged(true); + + // If this lock is fair, remove this node from queue. + if(val.isFair() && newVal == 0) { + UUID removedNode = val.getNodes().removeFirst(); + + assert(thisNode.equals(removedNode)); + } // Get global condition queue. Map<String, LinkedList<UUID>> condMap = val.getConditionMap(); @@ -585,7 +807,15 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return true; } catch (Error | Exception e) { - U.error(log, "Failed to compare and set: " + this, e); + if(interruptAll){ + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); + + return true; + } + else{ + U.error(log, "Failed to release: " + this, e); + } throw e; } @@ -891,21 +1121,25 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable try { updateLock.lock(); + // If this update is a result of unsuccessful acquire in fair mode, no local update should be done. + if(!val.isChanged()) + return; + // Check if update came from this node. 
boolean local = sync.isLockedLocally(val.getId()); // Process any incoming signals. boolean incomingSignals = sync.checkIncomingSignals(val); + // Update permission count. + sync.setPermits(val.get()); + // Update owner's node id. sync.setCurrentOwnerNode(val.getId()); // Update owner's thread id. sync.setCurrentOwnerThread(val.getThreadId()); - // Update permission count. - sync.setPermits(val.get()); - // Check if any threads waiting on this node need to be notified. if ((incomingSignals || sync.getPermits() == 0) && !local) { // Try to notify any waiting threads. @@ -928,10 +1162,10 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable if(!sync.failoverSafe){ sync.interruptAll(); } - - // Try to notify any waiting threads. - sync.release(0); } + + // Try to notify any waiting threads. + sync.release(0); } finally { updateLock.unlock(); @@ -993,6 +1227,9 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable throw U.convertException(e); } catch (InterruptedException e) { + if(sync.fair) + sync.synchronizeQueue(true, Thread.currentThread()); + throw new IgniteInterruptedException(e); } finally { @@ -1007,7 +1244,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable try{ initializeReentrantLock(); - boolean result = sync.nonfairTryAcquire(1); + boolean result = sync.tryAcquire(1, false); sync.validate(false); @@ -1038,6 +1275,9 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable throw U.convertException(e); } catch (InterruptedException e){ + if(sync.fair) + sync.synchronizeQueue(true, Thread.currentThread()); + throw new IgniteInterruptedException(e); } finally { @@ -1092,7 +1332,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable /** {@inheritDoc} */ @Override public int getHoldCount() { - try{ + try{ initializeReentrantLock(); return sync.getHoldCount(); @@ -1188,6 +1428,10 @@ public final class GridCacheLockImpl 
implements GridCacheLockEx, Externalizable return sync.failoverSafe; } + @Override public boolean isFair() { + return sync.fair; + } + /** {@inheritDoc} */ @Override public boolean isBroken() { try{ http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java index f5b2d06..dae7e9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java @@ -22,10 +22,8 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -59,13 +57,27 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab @GridToStringInclude private Map<UUID, LinkedList<String>> signals; + /** Flag indicating lock fairness. */ + private boolean fair; + + /** Queue containing nodes that are waiting to acquire this lock, used to ensure fairness. */ + @GridToStringInclude + private LinkedList<UUID> nodes; + + /** Flag indicating that global state changed. Used in fair mode to ensure that only successful acquires + * and releases trigger update. */ + private boolean changed; + /** * Constructor. * * @param cnt Initial count. * @param id UUID of owning node. + * @param threadID ID of the current thread. 
+ * @param failoverSafe true if created in failoverSafe mode. + * @param fair true if created in fair mode. */ - public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe) { + public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe, boolean fair) { assert cnt >= 0; this.id = id; @@ -76,6 +88,10 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab signals = null; + nodes = new LinkedList<UUID>(); + + this.fair = fair; + this.failoverSafe = failoverSafe; } @@ -152,6 +168,26 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab this.signals = signals; } + public LinkedList<UUID> getNodes() { + return nodes; + } + + public void setNodes(LinkedList<UUID> nodes) { + this.nodes = nodes; + } + + public boolean isFair() { + return fair; + } + + public boolean isChanged() { + return changed; + } + + public void setChanged(boolean changed) { + this.changed = changed; + } + /** {@inheritDoc} */ @Override public Object clone() throws CloneNotSupportedException { return super.clone(); @@ -165,6 +201,10 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab out.writeBoolean(failoverSafe); + out.writeBoolean(fair); + + out.writeBoolean(changed); + out.writeBoolean(conditionMap != null); if (conditionMap != null) { @@ -176,7 +216,7 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab out.writeInt(e.getValue().size()); for(UUID uuid:e.getValue()){ - U.writeUuid(out,uuid); + U.writeUuid(out, uuid); } } } @@ -196,6 +236,16 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab } } } + + out.writeBoolean(nodes != null); + + if (nodes != null) { + out.writeInt(nodes.size()); + + for (UUID uuid: nodes) { + U.writeUuid(out, uuid); + } + } } /** {@inheritDoc} */ @@ -206,6 +256,10 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab failoverSafe = in.readBoolean(); + 
fair = in.readBoolean(); + + changed = in.readBoolean(); + if (in.readBoolean()) { int size = in.readInt(); @@ -250,6 +304,19 @@ public final class GridCacheLockState implements GridCacheInternal, Externalizab else{ signals = null; } + + if (in.readBoolean()) { + int size = in.readInt(); + + nodes = new LinkedList(); + + for (int i = 0; i < size; i++) { + nodes.add(U.readUuid(in)); + } + } + else{ + nodes = null; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index c3ee082..f56eb2b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -780,17 +780,23 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr * @throws Exception If failed. 
*/ public void testReentrantLockReconnect() throws Exception { + testReentrantLockReconnect(false); + + testReentrantLockReconnect(true); + } + + private void testReentrantLockReconnect(final boolean fair) throws Exception { Ignite client = grid(serverCount()); assertTrue(client.cluster().localNode().isClient()); Ignite srv = clientRouter(client); - IgniteLock clientLock = client.reentrantLock("lock1", true, true); + IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true); assertEquals(false, clientLock.isLocked()); - final IgniteLock srvLock = srv.reentrantLock("lock1", true, true); + final IgniteLock srvLock = srv.reentrantLock("lock1", true, fair, true); assertEquals(false, srvLock.isLocked()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index c12b865..6034a48 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -589,6 +589,20 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testReentrantLockFailsWhenServersLeft() throws Exception { + testReentrantLockFailsWhenServersLeft(false); + } + + /** + * @throws Exception If failed. 
+ */ + public void testFairReentrantLockFailsWhenServersLeft() throws Exception { + testReentrantLockFailsWhenServersLeft(true); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockFailsWhenServersLeft(final boolean fair) throws Exception { client = true; Ignite client = startGrid(gridCount()); @@ -596,7 +610,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig Ignite server = grid(0); // Initialize lock. - IgniteLock srvLock = server.reentrantLock("lock", true, true); + IgniteLock srvLock = server.reentrantLock("lock", true, fair, true); IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true); @@ -604,7 +618,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig compute.apply(new IgniteClosure<Ignite, Object>() { @Override public Object apply(Ignite ignite) { - final IgniteLock l = ignite.reentrantLock("lock", true, true); + final IgniteLock l = ignite.reentrantLock("lock", true, fair, true); l.lock(); @@ -657,47 +671,87 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { - doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true); + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false); } /** * @throws Exception If failed. */ public void testReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception { - doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true); + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false); } /** * @throws Exception If failed. 
*/ public void testReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception { - doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false); + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false); } /** * @throws Exception If failed. */ public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception { - doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false); + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true); } /** * @throws Exception If failed. 
*/ - private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe) throws Exception { - try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, true)) { + private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe, final boolean fair) throws Exception { + try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, fair, true)) { IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { @Override public Object apply(Ignite ignite) { - final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); + final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false); + + final AtomicBoolean done = new AtomicBoolean(false); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - l.lock(); + try{ + l.lock(); + } + finally { + done.set(true); + } return null; } }); + // Wait until l.lock() has been called. + while(!l.hasQueuedThreads() && !done.get()){ + // No-op. 
+ } + return null; } }); @@ -729,7 +783,7 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig fut.get(); for (Ignite g : G.allGrids()) - assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, false).isHeldByCurrentThread()); + assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false).isHeldByCurrentThread()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 2b23389..5929d42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -343,10 +343,10 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA * @throws Exception If failed. 
*/ private void testReentrantLock(Ignite creator, final Ignite other) throws Exception { - assertNull(creator.reentrantLock("lock1", true, false)); - assertNull(other.reentrantLock("lock1", true, false)); + assertNull(creator.reentrantLock("lock1", true, false, false)); + assertNull(other.reentrantLock("lock1", true, false, false)); - try (IgniteLock lock = creator.reentrantLock("lock1", true, true)) { + try (IgniteLock lock = creator.reentrantLock("lock1", true, false, true)) { assertNotNull(lock); assertFalse(lock.isLocked()); @@ -355,7 +355,7 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - IgniteLock lock0 = other.reentrantLock("lock1", true, false); + IgniteLock lock0 = other.reentrantLock("lock1", true, false, false); lock0.lock(); @@ -390,8 +390,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA assertFalse(lock.isLocked()); } - assertNull(creator.reentrantLock("lock1", true, false)); - assertNull(other.reentrantLock("lock1", true, false)); + assertNull(creator.reentrantLock("lock1", true, false, false)); + assertNull(other.reentrantLock("lock1", true, false, false)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index e45e252..b3013ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -326,7 +326,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT case 8: log.info("Create atomic reentrant lock, grid: " + ignite.name()); - res = ignite.reentrantLock(name, true, true); + res = ignite.reentrantLock(name, true, true, true); break; default: http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java index a745b85..bca99ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java @@ -23,13 +23,19 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteLogger; @@ -40,6 +46,7 @@ import 
org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; @@ -80,7 +87,9 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testReentrantLock() throws Exception { - checkReentrantLock(); + checkReentrantLock(false); + + checkReentrantLock(true); } /** @@ -90,23 +99,28 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe if (atomicsCacheMode() == LOCAL) return; - checkFailover(true); - checkFailover(false); + checkFailover(true, false); + + checkFailover(false, false); + + checkFailover(true, true); + + checkFailover(false, true); } /** * @param failoverSafe Failover safe flag. * @throws Exception */ - private void checkFailover(final boolean failoverSafe) throws Exception { + private void checkFailover(final boolean failoverSafe, final boolean fair) throws Exception { IgniteEx g = startGrid(NODES_CNT + 1); // For vars locality. { // Ensure not exists. 
- assert g.reentrantLock("lock", failoverSafe, false) == null; + assert g.reentrantLock("lock", failoverSafe, fair, false) == null; - IgniteLock lock = g.reentrantLock("lock", failoverSafe, true); + IgniteLock lock = g.reentrantLock("lock", failoverSafe, fair, true); lock.lock(); @@ -117,7 +131,7 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe Ignite g0 = grid(0); - final IgniteLock lock0 = g0.reentrantLock("lock", false, false); + final IgniteLock lock0 = g0.reentrantLock("lock", false, fair, false); assert !lock0.tryLock(); @@ -161,14 +175,14 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe /** * @throws Exception If failed. */ - private void checkReentrantLock() throws Exception { + private void checkReentrantLock(final boolean fair) throws Exception { // Test API. - checkLock(); + checkLock(fair); - checkFailoverSafe(); + checkFailoverSafe(fair); // Test main functionality. - IgniteLock lock1 = grid(0).reentrantLock("lock", true, true); + IgniteLock lock1 = grid(0).reentrantLock("lock", true, fair, true); assertFalse(lock1.isLocked()); @@ -188,7 +202,7 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( new Callable<Object>() { @Nullable @Override public Object call() throws Exception { - IgniteLock lock = ignite.reentrantLock("lock", true, true); + IgniteLock lock = ignite.reentrantLock("lock", true, fair, true); assert lock != null; @@ -260,27 +274,27 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * * @throws Exception Exception. 
*/ - private void checkFailoverSafe() throws Exception { + private void checkFailoverSafe(final boolean fair) throws Exception { // Checks only if reentrant lock is initialized properly - IgniteLock lock = createReentrantLock("rmv", true); + IgniteLock lock = createReentrantLock("rmv", true, fair); assert lock.isFailoverSafe(); - removeReentrantLock("rmv"); + removeReentrantLock("rmv", fair); - IgniteLock lock1 = createReentrantLock("rmv1", false); + IgniteLock lock1 = createReentrantLock("rmv1", false, fair); assert !lock1.isFailoverSafe(); - removeReentrantLock("rmv1"); + removeReentrantLock("rmv1", fair); } /** * @throws Exception Exception. */ - private void checkLock() throws Exception { + private void checkLock(final boolean fair) throws Exception { // Check only 'false' cases here. Successful lock is tested over the grid. - final IgniteLock lock = createReentrantLock("acquire", false); + final IgniteLock lock = createReentrantLock("acquire", false, fair); lock.lock(); @@ -300,23 +314,25 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe lock.unlock(); - removeReentrantLock("acquire"); + removeReentrantLock("acquire", fair); } /** * @param lockName Reentrant lock name. - * @param failoverSafe Fairness flag. + * @param failoverSafe FailoverSafe flag. + * @param fair Fairness flag. * @return New distributed reentrant lock. * @throws Exception If failed. */ - private IgniteLock createReentrantLock(String lockName, boolean failoverSafe) + private IgniteLock createReentrantLock(String lockName, boolean failoverSafe, boolean fair) throws Exception { - IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, true); + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, fair, true); // Test initialization. 
assert lockName.equals(lock.name()); assert lock.isLocked() == false; assert lock.isFailoverSafe() == failoverSafe; + assert lock.isFair() == fair; return lock; } @@ -325,14 +341,14 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @param lockName Reentrant lock name. * @throws Exception If failed. */ - private void removeReentrantLock(String lockName) + private void removeReentrantLock(String lockName, final boolean fair) throws Exception { - IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true); assert lock != null; // Remove lock on random node. - IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true); assertNotNull(lock0); @@ -340,7 +356,7 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe // Ensure reentrant lock is removed on all nodes. for (Ignite g : G.allGrids()) - assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, false)); + assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, fair, false)); checkRemovedReentrantLock(lock); } @@ -349,10 +365,19 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testReentrantLockMultinode1() throws Exception { + testReentrantLockMultinode1(false); + + testReentrantLockMultinode1(true); + } + + /** + * @throws Exception If failed. 
+ */ + private void testReentrantLockMultinode1(final boolean fair) throws Exception { if (gridCount() == 1) return; - IgniteLock lock = grid(0).reentrantLock("s1", true, true); + IgniteLock lock = grid(0).reentrantLock("s1", true, fair, true); List<IgniteInternalFuture<?>> futs = new ArrayList<>(); @@ -361,7 +386,7 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe futs.add(GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - IgniteLock lock = ignite.reentrantLock("s1", true, false); + IgniteLock lock = ignite.reentrantLock("s1", true, fair, false); assertNotNull(lock); @@ -417,7 +442,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testLockInterruptibly() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testLockInterruptibly(false); + + testLockInterruptibly(true); + } + + /** + * @throws Exception If failed. + */ + private void testLockInterruptibly(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -447,8 +481,6 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe isInterrupted = true; } finally { - System.out.println(Thread.currentThread()); - // Assert that thread was interrupted. assertTrue(isInterrupted); @@ -486,8 +518,131 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe /** * @throws Exception If failed. */ + public void testLockInterruptiblyMultinode() throws Exception { + testLockInterruptiblyMultinode(false); + + testLockInterruptiblyMultinode(true); + } + + /** + * @throws Exception If failed. + */ + private void testLockInterruptiblyMultinode(final boolean fair) throws Exception { + if (gridCount() == 1) + return; + + // Initialize reentrant lock. 
+ final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + lock0.lock(); + + // Number of threads, one per node. + final int threadCount = gridCount(); + + final AtomicLong threadCounter = new AtomicLong(0); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + final int localNodeId = (int)threadCounter.getAndIncrement(); + + final Ignite grid = grid(localNodeId); + + IgniteClosure<Ignite, Void> closure = new IgniteClosure<Ignite, Void>() { + @Override public Void apply(Ignite ignite) { + final IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + final AtomicReference<Thread> thread = new AtomicReference(); + + final AtomicBoolean done = new AtomicBoolean(false); + + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + + final IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try{ + thread.set(Thread.currentThread()); + + l.lockInterruptibly(); + } + catch(IgniteInterruptedException e){ + exceptionThrown.set(true); + } + finally { + done.set(true); + } + + return null; + } + }); + + // Wait until l.lock() has been called. + while(!l.hasQueuedThreads()){ + // No-op. + } + + latch.countDown(); + + latch.await(); + + thread.get().interrupt(); + + while(!done.get()){ + // No-op. + } + + assertTrue(exceptionThrown.get()); + + return null; + } + }; + + closure.apply(grid); + + return null; + } + }, threadCount); + + fut.get(); + + lock0.unlock(); + + info("Checking if interrupted threads are removed from global waiting queue..."); + + // Check if interrupted threads are removed from global waiting queue. 
+ boolean locked = lock0.tryLock(1000, MILLISECONDS); + + info("Interrupted threads successfully removed from global waiting queue. "); + + assertTrue(locked); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ public void testLock() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testLock(false); + + testLock(true); + } + + /** + * @throws Exception If failed. + */ + private void testLock(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -561,7 +716,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testTryLock() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testTryLock(false); + + testTryLock(true); + } + + /** + * @throws Exception If failed. + */ + private void testTryLock(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -635,7 +799,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testTryLockTimed() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testTryLockTimed(false); + + testTryLockTimed(true); + } + + /** + * @throws Exception If failed. + */ + private void testTryLockTimed(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -702,7 +875,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. 
*/ public void testConditionAwaitUninterruptibly() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testConditionAwaitUninterruptibly(false); + + testConditionAwaitUninterruptibly(true); + } + + /** + * @throws Exception If failed. + */ + private void testConditionAwaitUninterruptibly(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -782,7 +964,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testConditionInterruptAwait() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testConditionInterruptAwait(false); + + testConditionInterruptAwait(true); + } + + /** + * @throws Exception If failed. + */ + private void testConditionInterruptAwait(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -852,7 +1043,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. */ public void testHasQueuedThreads() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testHasQueuedThreads(false); + + testHasQueuedThreads(true); + } + + /** + * @throws Exception If failed. + */ + private void testHasQueuedThreads(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -920,7 +1120,16 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe * @throws Exception If failed. 
*/ public void testHasConditionQueuedThreads() throws Exception { - final IgniteLock lock0 = grid(0).reentrantLock("lock", true, true); + testHasConditionQueuedThreads(false); + + testHasConditionQueuedThreads(true); + } + + /** + * @throws Exception If failed. + */ + private void testHasConditionQueuedThreads(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); assertEquals(0, lock0.getHoldCount()); @@ -1015,6 +1224,119 @@ public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTe lock0.close(); } + /** + * Tests if lock is evenly acquired among nodes when fair flag is set on. + * Since exact ordering of lock acquisitions cannot be guaranteed because it also depends + * on the OS thread scheduling, certain deviation from uniform distribution is tolerated. + * @throws Exception If failed. + */ + public void testFairness() throws Exception { + if (gridCount() == 1) + return; + + // Total number of ops. + final long opsCount = 10000; + + // Allowed deviation from uniform distribution. + final double tolerance = 0.05; + + // Shared counter. + final String OPS_COUNTER = "ops_counter"; + + // Number of threads, one per node. + final int threadCount = gridCount(); + + final AtomicLong threadCounter = new AtomicLong(0); + + Ignite ignite = startGrid(gridCount()); + + // Initialize reentrant lock. + IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + // Initialize OPS_COUNTER. 
+ ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, (long)0); + + final Map<Integer, Long> counts = new ConcurrentHashMap<Integer, Long>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + final int localNodeId = (int)threadCounter.getAndIncrement(); + + final Ignite grid = grid(localNodeId); + + IgniteClosure<Ignite, Long> closure = new IgniteClosure<Ignite, Long>() { + @Override public Long apply(Ignite ignite) { + IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + long localCount = 0; + + IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true); + + latch.countDown(); + + latch.await(); + + while(true){ + l.lock(); + + try { + long opsCounter = (long) ignite.getOrCreateCache(OPS_COUNTER).get(OPS_COUNTER); + + if(opsCounter == opsCount) + break; + + ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, ++opsCounter); + + localCount++; + + if(localCount > 1000){ + assertTrue(localCount < (1 + tolerance) * opsCounter / threadCount); + + assertTrue(localCount > (1 - tolerance) * opsCounter / threadCount); + } + + if(localCount % 100 == 0) { + info("Node [id=" +ignite.cluster().localNode().id() + "] acquired " + + localCount + " times. " + "Total ops count: " + + opsCounter + "/" + opsCount +"]"); + } + } + finally { + l.unlock(); + } + } + + return localCount; + } + }; + + long localCount = closure.apply(grid); + + counts.put(localNodeId, localCount); + + return null; + } + }, threadCount); + + fut.get(); + + long totalSum = 0; + + for(int i=0; i<gridCount(); i++){ + + totalSum += counts.get(i); + + info("Node " + grid(i).localNode().id() + " acquired the lock " + counts.get(i) + " times. "); + } + + assertEquals(totalSum, opsCount); + + l.close(); + + ignite.close(); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { // No-op. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java index 543655c..7e1a11c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java @@ -45,7 +45,7 @@ public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest { /** {@inheritDoc} */ @Override public void testReentrantLock() throws Exception { // Test main functionality. - IgniteLock lock = grid(0).reentrantLock("lock", true, true); + IgniteLock lock = grid(0).reentrantLock("lock", true, false, true); assertNotNull(lock); @@ -56,7 +56,7 @@ public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest { IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( new Callable<Object>() { @Nullable @Override public Object call() throws Exception { - IgniteLock lock = grid(0).reentrantLock("lock", true, true); + IgniteLock lock = grid(0).reentrantLock("lock", true, false, true); assert lock != null; @@ -99,7 +99,7 @@ public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest { fut.get(); // Test operations on removed lock. 
- IgniteLock lock0 = grid(0).reentrantLock("lock", true, false); + IgniteLock lock0 = grid(0).reentrantLock("lock", true, false, false); assertNotNull(lock0); http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java index ee37aec..18537c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java @@ -359,7 +359,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad private final CIX1<Ignite> reentrantLockReadClos = new CIX1<Ignite>() { @Override public void applyx(Ignite ignite) { - IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, true); + IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true); for (int i = 0; i < operationsPerTx; i++) { r.isLocked(); @@ -376,7 +376,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad private final CIX1<Ignite> reentrantLockWriteClos = new CIX1<Ignite>() { @Override public void applyx(Ignite ignite) { - IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, true); + IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true); for (int i = 0; i < operationsPerTx; i++) { if ((i % 2) == 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index 53f8b2e..c9859fc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -370,6 +370,7 @@ public class IgniteMock implements Ignite { /** {@inheritDoc} */ @Nullable @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, + boolean fair, boolean create) { return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index c2ffd3f..2598bc5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -547,7 +547,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, - boolean create) throws IgniteException { + boolean fair, boolean create) throws IgniteException { throw new UnsupportedOperationException("Operation isn't supported yet."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6bc6ce9f/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 
b346d86..03c7b0e 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -427,11 +427,12 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea /** {@inheritDoc} */ @Nullable @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, + boolean fair, boolean create) { assert g != null; - return g.reentrantLock(name, failoverSafe, create); + return g.reentrantLock(name, failoverSafe, fair, create); } /** {@inheritDoc} */
