http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java new file mode 100644 index 0000000..3ab7289 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -0,0 +1,1538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.ObjectStreamException; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.transactions.TransactionRollbackException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Cache reentrant lock implementation based on AbstractQueuedSynchronizer. + */ +public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<String> stash = new ThreadLocal<>(); + + /** Logger. */ + private IgniteLogger log; + + /** Reentrant lock name. */ + private String name; + + /** Removed flag. */ + private volatile boolean rmvd; + + /** Reentrant lock key. */ + private GridCacheInternalKey key; + + /** Reentrant lock projection. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView; + + /** Cache context. */ + private final GridCacheContext ctx; + + /** Initialization guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Initialization latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Lock that provides non-overlapping processing of updates. */ + private Lock updateLock = new ReentrantLock(); + + /** Internal synchronization object. */ + private Sync sync; + + /** Flag indicating that every operation on this lock should be interrupted. */ + private volatile boolean interruptAll; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheLockImpl() { + // This instance should never be used directly. + ctx = null; + } + + /** + * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer. + */ + @SuppressWarnings({"CallToThreadYield", "CallToSignalInsteadOfSignalAll"}) + private class Sync extends AbstractQueuedSynchronizer { + /** */ + private static final long serialVersionUID = 1192457210091910933L; + + /** */ + private static final long LOCK_FREE = 0; + + /** Map containing condition objects. */ + private Map<String, ConditionObject> conditionMap; + + /** List of condition signal calls on this node. */ + private Map<String, Integer> outgoingSignals; + + /** Last condition waited on. */ + @Nullable + private volatile String lastCondition; + + /** True if any node owning the lock had failed. */ + private volatile boolean isBroken; + + /** UUID of the node that currently owns the lock. */ + private volatile UUID currentOwnerNode; + + /** ID of the thread that currently owns the lock. */ + private volatile long currentOwnerThreadId; + + /** UUID of this node. */ + private final UUID thisNode; + + /** FailoverSafe flag. */ + private final boolean failoverSafe; + + /** Fairness flag. */ + private final boolean fair; + + /** Threads that are waiting on this lock. */ + private Set<Long> waitingThreads; + + /** + * @param state State. + */ + protected Sync(GridCacheLockState state) { + setState(state.get()); + + thisNode = ctx.localNodeId(); + + currentOwnerNode = state.getId(); + + currentOwnerThreadId = state.getThreadId(); + + conditionMap = new HashMap<>(); + + outgoingSignals = new HashMap<>(); + + failoverSafe = state.isFailoverSafe(); + + fair = state.isFair(); + + waitingThreads = new ConcurrentSkipListSet<>(); + } + + /** + * + */ + protected void addOutgoingSignal(String condition) { + int cnt = 0; + + if (outgoingSignals.containsKey(condition)) { + cnt = outgoingSignals.get(condition); + + // SignalAll has already been called. + if (cnt == 0) + return; + } + + outgoingSignals.put(condition, cnt + 1); + } + + protected void addOutgoingSignalAll(String condition) { + outgoingSignals.put(condition, 0); + } + + /** + * Process any condition await calls on this node. + */ + private String processAwait() { + if (lastCondition == null) + return null; + + String ret = lastCondition; + + lastCondition = null; + + return ret; + } + + /** */ + private Map<String, Integer> processSignal() { + Map<String,Integer> ret = new HashMap<>(outgoingSignals); + + outgoingSignals.clear(); + + return ret; + } + + /** Interrupt every thread on this node waiting on this lock. */ + private synchronized void interruptAll() { + // First release all threads waiting on associated condition queues. + if (!conditionMap.isEmpty()) { + // Temporarily obtain ownership of the lock, + // in order to signal all conditions. + UUID tempUUID = getOwnerNode(); + + long tempThreadID = currentOwnerThreadId; + + setCurrentOwnerNode(thisNode); + + currentOwnerThreadId = Thread.currentThread().getId(); + + for (Condition c : conditionMap.values()) + c.signalAll(); + + // Restore owner node and owner thread. + setCurrentOwnerNode(tempUUID); + + currentOwnerThreadId = tempThreadID; + } + + // Interrupt any future call to acquire/release on this sync object. + interruptAll = true; + + // Interrupt any ongoing transactions. + for (Thread t: getQueuedThreads()) + t.interrupt(); + } + + /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode), + * if not throw {@linkplain IgniteInterruptedException} */ + private void validate(final boolean throwInterrupt) { + // Interrupted flag shouldn't be always cleared + // (e.g. lock() method doesn't throw exception and doesn't clear interrupted) + // but should be cleared if this method is called after lock breakage or node stop. + // If interruptAll is set, exception is thrown anyway. + boolean interrupted = Thread.currentThread().isInterrupted(); + + // Clear interrupt flag. + if (throwInterrupt || interruptAll) + Thread.interrupted(); + + if (interruptAll) + throw new IgniteException("Lock broken (possible reason: node stopped" + + " or node owning lock failed while in non-failoversafe mode)."); + + // Global queue should be synchronized only if interrupted exception should be thrown. + if (fair && (throwInterrupt && interrupted) && !interruptAll) { + synchronizeQueue(true, Thread.currentThread()); + + throw new IgniteInterruptedException("Lock is interrupted."); + } + } + + /** + * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain + * GridCacheLockImpl#onUpdate(GridCacheLockState)}. + * + * @param permits Number of permits acquired at this reentrant lock. + */ + final synchronized void setPermits(int permits) { + setState(permits); + } + + /** + * Gets the number of permissions currently acquired at this lock. + * + * @return Number of permits acquired at this reentrant lock. + */ + final int getPermits() { + return getState(); + } + + /** + * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain + * GridCacheLockImpl#onUpdate(GridCacheLockState)}. + * + * @param ownerNode UUID of the node owning this lock. + */ + final synchronized void setCurrentOwnerNode(UUID ownerNode) { + currentOwnerNode = ownerNode; + } + + /** + * Gets the UUID of the node that currently owns the lock. + * + * @return UUID of the node that currently owns the lock. + */ + final UUID getOwnerNode() { + return currentOwnerNode; + } + + /** + * Checks if latest call to acquire/release was called on this node. + * Should only be called from update method. + * + * @param newOwnerID ID of the node that is about to acquire this lock (or null). + * @return true if acquire/release that triggered last update came from this node. + */ + protected boolean isLockedLocally(UUID newOwnerID) { + return thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID); + } + + protected void setCurrentOwnerThread(long newOwnerThreadId) { + currentOwnerThreadId = newOwnerThreadId; + } + + /** + * Returns true if node that owned the locked failed before call to unlock. + * + * @return true if any node failed while owning the lock. + */ + protected boolean isBroken() { + return isBroken; + } + + /** */ + protected void setBroken(boolean isBroken) { + this.isBroken = isBroken; + } + + /** */ + protected synchronized boolean hasPredecessor(LinkedList<UUID> nodes) { + if (!fair) + return false; + + for (Iterator<UUID> it = nodes.iterator(); it.hasNext(); ) { + UUID node = it.next(); + + if (ctx.discovery().node(node) == null) { + it.remove(); + + continue; + } + + return !node.equals(thisNode); + } + + return false; + } + + /** + * Performs tryLock. + * @param acquires Number of permits to acquire. + * @param fair Fairness parameter. + * @return {@code True} if succeeded, false otherwise. + */ + final boolean tryAcquire(final int acquires, final boolean fair) { + // If broken in non-failoversafe mode, exit immediately. + if (interruptAll) + return true; + + final Thread current = Thread.currentThread(); + + boolean failed = false; + + int c = getState(); + + // Wait for lock to reach stable state. + while (c != 0) { + UUID currentOwner = currentOwnerNode; + + if (currentOwner != null) { + failed = ctx.discovery().node(currentOwner) == null; + + break; + } + + c = getState(); + } + + // Check if lock is released or current owner failed. + if (c == 0 || failed) { + if (compareAndSetGlobalState(0, acquires, current, fair)) { + + // Not used for synchronization (we use ThreadID), but updated anyway. + setExclusiveOwnerThread(current); + + while (!isHeldExclusively() && !interruptAll) + Thread.yield(); + + return true; + } + } + else if (isHeldExclusively()) { + int nextc = c + acquires; + + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded."); + + setState(nextc); + + return true; + } + + if (fair && !isQueued(current)) + synchronizeQueue(false, current); + + return false; + } + + /** + * Performs lock. + */ + final void lock() { + acquire(1); + } + + /** {@inheritDoc} */ + @Override protected final boolean tryAcquire(int acquires) { + return tryAcquire(acquires, fair); + } + + /** {@inheritDoc} */ + @Override protected final boolean tryRelease(int releases) { + // This method is called with release==0 only when trying to wake through update, + // to check if some other node released the lock. + if (releases == 0) + return true; + + // If broken in non-failoversafe mode, exit immediately. + if (interruptAll) + return true; + + int c = getState() - releases; + + if (!isHeldExclusively()) { + log.error("Lock.unlock() is called in illegal state [callerNodeId=" + thisNode + ", ownerNodeId=" + + currentOwnerNode + ", callerThreadId=" + Thread.currentThread().getId() + ", ownerThreadId=" + + currentOwnerThreadId + ", lockState=" + getState() + "]"); + + throw new IllegalMonitorStateException(); + } + + boolean free = false; + + if (c == 0) { + free = true; + + setGlobalState(0, processAwait(), processSignal()); + + while (isHeldExclusively() && !interruptAll) + Thread.yield(); + } + else + setState(c); + + return free; + } + + + /** {@inheritDoc} */ + @Override protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + + return currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode); + } + + /** + * @param name Condition name. + * @return Condition object. + */ + final synchronized IgniteCondition newCondition(String name) { + if (conditionMap.containsKey(name)) + return new IgniteConditionObject(name, conditionMap.get(name)); + + ConditionObject cond = new ConditionObject(); + + conditionMap.put(name, cond); + + return new IgniteConditionObject(name, cond); + } + + // Methods relayed from outer class + + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + final boolean isLocked() throws IgniteCheckedException { + return getState() != 0 || lockView.get(key).get() != 0; + } + + /** + * This method is used for synchronizing the reentrant lock state across all nodes. + */ + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, + final Thread newThread, final boolean bargingProhibited) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + + GridCacheLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + final long newThreadID = newThread.getId(); + + LinkedList<UUID> nodes = val.getNodes(); + + // Barging is prohibited in fair mode unless tryLock() is called. + if (!(bargingProhibited && hasPredecessor(nodes))) { + if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) { + val.set(newVal); + + val.setId(thisNode); + + val.setThreadId(newThreadID); + + val.setSignals(null); + + // This node is already in queue, except in cases where this is the only node + // or this is a call to tryLock(), in which case barging is ok. + // Queue is only updated if this is fair lock. + if (val.isFair() && (nodes.isEmpty() || !bargingProhibited)) + nodes.addFirst(thisNode); + + val.setNodes(nodes); + + val.setChanged(true); + + lockView.put(key, val); + + tx.commit(); + + return true; + } + } + + return false; + } + catch (Exception e) { + if (interruptAll) { + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); + + // Return immediately, exception will be thrown later. + return true; + } + else { + if (Thread.currentThread().isInterrupted()) { + log.info("Thread is interrupted while attempting to acquire lock."); + + // Delegate the decision to throw InterruptedException to the AQS. + sync.release(0); + + return false; + } + + U.error(log, "Failed to compare and set: " + this, e); + } + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * This method is used for synchronizing the number of acquire attempts on this lock across all nodes. + * + * @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered. + */ + protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) { + final AtomicBoolean interrupted = new AtomicBoolean(false); + + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + LinkedList<UUID> nodes = val.getNodes(); + + if (!cancelled) { + nodes.add(thisNode); + + val.setChanged(false); + + lockView.put(key, val); + + tx.commit(); + + // Keep track of all threads that are queued in global queue. + // We deliberately don't use #sync.isQueued(), because AQS + // cancel threads immediately after throwing interrupted exception. + sync.waitingThreads.add(thread.getId()); + + return true; + } + else { + if (sync.waitingThreads.contains(thread.getId())) { + // Update other nodes if this is the first node in queue. + val.setChanged(nodes.lastIndexOf(thisNode) == 0); + + nodes.removeLastOccurrence(thisNode); + + lockView.put(key, val); + + tx.commit(); + + sync.waitingThreads.remove(thread.getId()); + + return true; + } + } + + return false; + } + catch (Exception e) { + if (interruptAll) { + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); + + // Abort this attempt to synchronize queue and start another one, + // that will return immediately. + sync.release(0); + + return false; + } + else { + // If thread got interrupted, abort this attempt to synchronize queue, + // clear interrupt flag and try again, and let the AQS decide + // whether to throw an exception or ignore it. + if (Thread.interrupted() || X.hasCause(e, InterruptedException.class)) { + interrupted.set(true); + + throw new TransactionRollbackException("Thread got interrupted " + + "while synchronizing the global queue, retrying. "); + } + + U.error(log, "Failed to synchronize global lock queue: " + this, e); + } + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + // Restore interrupt flag and let AQS decide what to do with it. + if (interrupted.get()) + Thread.currentThread().interrupt(); + } + } + + /** + * Sets the global state across all nodes after releasing the reentrant lock. + * + * @param newVal New state. + * @param lastCondition Id of the condition await is called. + * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock. + */ + protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + val.set(newVal); + + if (newVal == 0) { + val.setId(null); + + val.setThreadId(LOCK_FREE); + } + + val.setChanged(true); + + // If this lock is fair, remove this node from queue. + if (val.isFair() && newVal == 0) { + UUID removedNode = val.getNodes().removeFirst(); + + assert(thisNode.equals(removedNode)); + } + + // Get global condition queue. + Map<String, LinkedList<UUID>> condMap = val.getConditionMap(); + + // Create map containing signals from this node. + Map<UUID, LinkedList<String>> signalMap = new HashMap<>(); + + // Put any signal calls on this node to global state. + if (!outgoingSignals.isEmpty()) { + for (String condition : outgoingSignals.keySet()) { + int cnt = outgoingSignals.get(condition); + + // Get queue for this condition. + List<UUID> list = condMap.get(condition); + + if (list != null && !list.isEmpty()) { + // Check if signalAll was called. + if (cnt == 0) { + cnt = list.size(); + } + + // Remove from global condition queue. + for (int i = 0; i < cnt; i++) { + if (list.isEmpty()) + break; + + UUID uuid = list.remove(0); + + // Skip if node to be released is not alive anymore. + if (ctx.discovery().node(uuid) == null) { + cnt++; + + continue; + } + + LinkedList<String> queue = signalMap.get(uuid); + + if (queue == null) { + queue = new LinkedList<>(); + + signalMap.put(uuid, queue); + } + + queue.add(condition); + } + } + } + } + + val.setSignals(signalMap); + + // Check if this release is called after condition.await() call; + // If true, add this node to the global waiting queue. + if (lastCondition != null) { + LinkedList<UUID> queue; + + //noinspection IfMayBeConditional + if (!condMap.containsKey(lastCondition)) + // New condition object. + queue = new LinkedList<>(); + else + // Existing condition object. + queue = condMap.get(lastCondition); + + queue.add(thisNode); + + condMap.put(lastCondition, queue); + } + + val.setConditionMap(condMap); + + lockView.put(key, val); + + tx.commit(); + + return true; + } + catch (Exception e) { + if (interruptAll) { + log.info("Node is stopped (or lock is broken in non-failover safe mode)," + + " aborting transaction."); + + return true; + } + else + U.error(log, "Failed to release: " + this, e); + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + protected synchronized boolean checkIncomingSignals(GridCacheLockState state) { + if (state.getSignals() == null) + return false; + + LinkedList<String> signals = state.getSignals().get(thisNode); + + if (signals == null || signals.isEmpty()) + return false; + + UUID tempUUID = getOwnerNode(); + + Thread tempThread = getExclusiveOwnerThread(); + + long tempThreadID = currentOwnerThreadId; + + // Temporarily allow current thread to signal condition object. + // This is safe to do because: + // 1. if release was called on this node, + // it was called from currently active thread; + // 2. if release came from a thread on any other node, + // all threads on this node are already blocked. + setCurrentOwnerNode(thisNode); + + setExclusiveOwnerThread(Thread.currentThread()); + + currentOwnerThreadId = Thread.currentThread().getId(); + + for (String signal: signals) + conditionMap.get(signal).signal(); + + // Restore owner node and owner thread. + setCurrentOwnerNode(tempUUID); + + setExclusiveOwnerThread(tempThread); + + currentOwnerThreadId = tempThreadID; + + return true; + } + + /** + * Condition implementation for {@linkplain IgniteLock}. + * + **/ + private class IgniteConditionObject implements IgniteCondition { + /** */ + private final String name; + + /** */ + private final AbstractQueuedSynchronizer.ConditionObject object; + + /** + * @param name Condition name. + * @param object Condition object. + */ + protected IgniteConditionObject(String name, ConditionObject object) { + this.name = name; + + this.object = object; + } + + /** + * Name of this condition. + * + * @return name Name of this condition object. + */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public void await() throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = name; + + object.await(); + + sync.validate(true); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void awaitUninterruptibly() { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = name; + + object.awaitUninterruptibly(); + + sync.validate(false); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = name; + + long result = object.awaitNanos(nanosTimeout); + + sync.validate(true); + + return result; + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = name; + + boolean result = object.await(time, unit); + + sync.validate(true); + + return result; + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = name; + + boolean result = object.awaitUntil(deadline); + + sync.validate(true); + + return result; + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void signal() { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + validate(false); + + addOutgoingSignal(name); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void signalAll() { + ctx.kernalContext().gateway().readLock(); + + try { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + sync.validate(false); + + addOutgoingSignalAll(name); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + } + } + + /** + * Constructor. + * + * @param name Reentrant lock name. + * @param key Reentrant lock key. + * @param lockView Reentrant lock projection. + * @param ctx Cache context. + */ + @SuppressWarnings("unchecked") + public GridCacheLockImpl(String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView, + GridCacheContext ctx) { + assert name != null; + assert key != null; + assert ctx != null; + assert lockView != null; + + this.name = name; + this.key = key; + this.lockView = lockView; + this.ctx = ctx; + + log = ctx.logger(getClass()); + } + + /** + * @throws IgniteCheckedException If operation failed. + */ + private void initializeReentrantLock() throws IgniteCheckedException { + if (initGuard.compareAndSet(false, true)) { + try { + sync = CU.outTx( + retryTopologySafe(new Callable<Sync>() { + @Override public Sync call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheLockState val = lockView.get(key); + + if (val == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find reentrant lock with given name: " + name); + + return null; + } + + tx.rollback(); + + return new Sync(val); + } + } + }), + ctx + ); + + if (log.isDebugEnabled()) + log.debug("Initialized internal sync structure: " + sync); + } + finally { + initLatch.countDown(); + } + } + else { + U.await(initLatch); + + if (sync == null) + throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized."); + } + } + + /** {@inheritDoc} */ + @Override public void onUpdate(GridCacheLockState val) { + // Called only on initialization, so it's safe to ignore update. + if (sync == null) + return; + + updateLock.lock(); + + try { + // If this update is a result of unsuccessful acquire in fair mode, no local update should be done. + if (!val.isChanged()) + return; + + // Check if update came from this node. + boolean local = sync.isLockedLocally(val.getId()); + + // Process any incoming signals. + boolean incomingSignals = sync.checkIncomingSignals(val); + + // Update permission count. + sync.setPermits(val.get()); + + // Update owner's node id. + sync.setCurrentOwnerNode(val.getId()); + + // Update owner's thread id. + sync.setCurrentOwnerThread(val.getThreadId()); + + // Check if any threads waiting on this node need to be notified. + if ((incomingSignals || sync.getPermits() == 0) && !local) { + // Try to notify any waiting threads. + sync.release(0); + } + + } finally{ + updateLock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onNodeRemoved(UUID nodeId) { + updateLock.lock(); + + try { + if (nodeId.equals(sync.getOwnerNode())) { + sync.setBroken(true); + + if (!sync.failoverSafe) { + sync.interruptAll(); + } + } + + // Try to notify any waiting threads. + sync.release(0); + } + finally { + updateLock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onStop() { + if (sync == null) { + interruptAll = true; + + return; + } + + sync.setBroken(true); + + sync.interruptAll(); + + // Try to notify any waiting threads. + sync.release(0); + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public void lock() { + ctx.kernalContext().gateway().readLock(); + + try{ + initializeReentrantLock(); + + sync.lock(); + + sync.validate(false); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void lockInterruptibly() throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try { + initializeReentrantLock(); + + sync.acquireInterruptibly(1); + + sync.validate(true); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + if (sync.fair) + sync.synchronizeQueue(true, Thread.currentThread()); + + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + ctx.kernalContext().gateway().readLock(); + + try{ + initializeReentrantLock(); + + boolean result = sync.tryAcquire(1, false); + + sync.validate(false); + + return result; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException { + ctx.kernalContext().gateway().readLock(); + + try{ + initializeReentrantLock(); + + boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout)); + + sync.validate(true); + + return result; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + if (sync.fair) + sync.synchronizeQueue(true, Thread.currentThread()); + + throw new IgniteInterruptedException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void unlock() { + ctx.kernalContext().gateway().readLock(); + + try{ + initializeReentrantLock(); + + // Validate before release. + sync.validate(false); + + sync.release(1); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + @NotNull @Override public Condition newCondition() { + throw new UnsupportedOperationException("IgniteLock does not allow creation of nameless conditions. "); + } + + /** {@inheritDoc} */ + @Override public IgniteCondition getOrCreateCondition(String name) { + ctx.kernalContext().gateway().readLock(); + + try{ + initializeReentrantLock(); + + IgniteCondition result = sync.newCondition(name); + + sync.validate(false); + + return result; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public int getHoldCount() { + try{ + initializeReentrantLock(); + + return sync.getHoldCount(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isHeldByCurrentThread() { + try{ + initializeReentrantLock(); + + return sync.isHeldExclusively(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isLocked() { + try{ + initializeReentrantLock(); + + return sync.isLocked(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasQueuedThreads() { + try{ + initializeReentrantLock(); + + return sync.hasQueuedThreads(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasQueuedThread(Thread thread) { + try{ + initializeReentrantLock(); + + return sync.isQueued(thread); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasWaiters(IgniteCondition condition) { + try{ + initializeReentrantLock(); + + AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); + + if (c == null) + throw new IllegalArgumentException(); + + return sync.hasWaiters(c); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public int getWaitQueueLength(IgniteCondition condition) { + try{ + initializeReentrantLock(); + + AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); + + if (c == null) + throw new IllegalArgumentException(); + + return sync.getWaitQueueLength(c); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + @Override public boolean isFailoverSafe() { + try{ + initializeReentrantLock(); + + return sync.failoverSafe; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + @Override public boolean isFair() { + try{ + initializeReentrantLock(); + + return sync.fair; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isBroken() { + try{ + initializeReentrantLock(); + + return sync.isBroken(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void needCheckNotRemoved() { + + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + stash.set(in.readUTF()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + private Object readResolve() throws ObjectStreamException { + String name = stash.get(); + + assert name != null; + + try { + IgniteLock lock = IgnitionEx.localIgnite().context().dataStructures().reentrantLock( + name, + false, + false, + false); + + if (lock == null) + throw new IllegalStateException("Lock was not found on deserialization: " + name); + + return lock; + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + + /** {@inheritDoc} */ + @Override public void close() { + if (!rmvd) { + try { + boolean force = sync != null && (sync.isBroken() && !sync.failoverSafe); + + ctx.kernalContext().dataStructures().removeReentrantLock(name, force); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheLockImpl.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java new file mode 100644 index 0000000..3feb9bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheInternal; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Grid cache reentrant lock state. + */ +public final class GridCacheLockState implements GridCacheInternal, Externalizable, Cloneable { + /** */ + private static final long serialVersionUID = 0L; + + /** Count. */ + private int cnt; + + /** Owner thread local ID. */ + private long threadId; + + /** Owner node ID. */ + private UUID id; + + /** FailoverSafe flag. */ + private boolean failoverSafe; + + /** Map containing state for each condition object associated with this lock. */ + @GridToStringInclude + private Map<String, LinkedList<UUID>> conditionMap; + + /** Map containing unprocessed signals for condition objects that are associated with this lock. */ + @GridToStringInclude + private Map<UUID, LinkedList<String>> signals; + + /** Flag indicating lock fairness. */ + private boolean fair; + + /** Queue containing nodes that are waiting to acquire this lock, used to ensure fairness. */ + @GridToStringInclude + private LinkedList<UUID> nodes; + + /** + * Flag indicating that global state changed. + * Used in fair mode to ensure that only successful acquires and releases trigger update. + */ + private boolean changed; + + /** + * Constructor. + * + * @param cnt Initial count. + * @param id UUID of owning node. + * @param threadID ID of the current thread. + * @param failoverSafe true if created in failoverSafe mode. + * @param fair true if created in fair mode. + */ + public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe, boolean fair) { + assert cnt >= 0; + + this.id = id; + + this.threadId = threadID; + + conditionMap = new HashMap(); + + signals = null; + + nodes = new LinkedList<UUID>(); + + this.fair = fair; + + this.failoverSafe = failoverSafe; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridCacheLockState() { + // No-op. + } + + /** + * @param cnt New count. + */ + public void set(int cnt) { + this.cnt = cnt; + } + + /** + * @return Current count. + */ + public int get() { + return cnt; + } + + /** + * @return Current owner thread ID. + */ + public long getThreadId() { + return threadId; + } + + /** + * @param threadId New thread owner ID. + */ + public void setThreadId(long threadId) { + this.threadId = threadId; + } + + /** + * @return Current owner node ID. + */ + public UUID getId() { + return id; + } + + /** + * @return New owner node ID. + */ + public void setId(UUID id) { + this.id = id; + } + + /** + * @return Failover safe flag. + */ + public boolean isFailoverSafe() { + return failoverSafe; + } + + /** + * @return Condition count. + */ + public int condtionCount(){ + return conditionMap.size(); + } + + /** + * @return Condition map. + */ + public Map<String, LinkedList<UUID>> getConditionMap() { + return conditionMap; + } + + /** + * @param conditionMap Condition map. + */ + public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) { + this.conditionMap = conditionMap; + } + + /** + * @return Signals. + */ + public Map<UUID, LinkedList<String>> getSignals() { + return signals; + } + + /** + * @param signals Signals. + */ + public void setSignals(Map<UUID, LinkedList<String>> signals) { + this.signals = signals; + } + + /** + * @return Nodes. + */ + public LinkedList<UUID> getNodes() { + return nodes; + } + + /** + * @param nodes Nodes. + */ + public void setNodes(LinkedList<UUID> nodes) { + this.nodes = nodes; + } + + /** + * @return Fair flag. + */ + public boolean isFair() { + return fair; + } + + /** + * @return Changed flag. + */ + public boolean isChanged() { + return changed; + } + + /** + * @param changed Changed flag. + */ + public void setChanged(boolean changed) { + this.changed = changed; + } + + /** {@inheritDoc} */ + @Override public Object clone() throws CloneNotSupportedException { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(cnt); + out.writeLong(threadId); + U.writeUuid(out, id); + + out.writeBoolean(failoverSafe); + + out.writeBoolean(fair); + + out.writeBoolean(changed); + + out.writeBoolean(conditionMap != null); + + if (conditionMap != null) { + out.writeInt(conditionMap.size()); + + for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) { + U.writeString(out, e.getKey()); + + out.writeInt(e.getValue().size()); + + for (UUID uuid:e.getValue()) + U.writeUuid(out, uuid); + } + } + + out.writeBoolean(signals != null); + + if (signals != null) { + out.writeInt(signals.size()); + + for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) { + U.writeUuid(out, e.getKey()); + + out.writeInt(e.getValue().size()); + + for (String condition:e.getValue()) + U.writeString(out, condition); + } + } + + out.writeBoolean(nodes != null); + + if (nodes != null) { + out.writeInt(nodes.size()); + + for (UUID uuid: nodes) + U.writeUuid(out, uuid); + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + cnt = in.readInt(); + threadId = in.readLong(); + id = U.readUuid(in); + + failoverSafe = in.readBoolean(); + + fair = in.readBoolean(); + + changed = in.readBoolean(); + + if (in.readBoolean()) { + int size = in.readInt(); + + conditionMap = U.newLinkedHashMap(size); + + for (int i = 0; i < size; i++) { + String key = U.readString(in); + + int size1 = in.readInt(); + + LinkedList<UUID> list = new LinkedList(); + + for (int j = 0; j < size1; j++) + list.add(U.readUuid(in)); + + conditionMap.put(key, list); + } + } + + if (in.readBoolean()) { + assert (conditionMap != null); + + int size = in.readInt(); + + signals = U.newLinkedHashMap(size); + + for (int i = 0; i < size; i++) { + UUID node = U.readUuid(in); + + int size1 = in.readInt(); + + LinkedList<String> list = new LinkedList(); + + for (int j = 0; j < size1; j++) + list.add(U.readString(in)); + + signals.put(node, list); + } + } + else + signals = null; + + if (in.readBoolean()) { + int size = in.readInt(); + + nodes = new LinkedList(); + + for (int i = 0; i < size; i++) + nodes.add(U.readUuid(in)); + } + else + nodes = null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheLockState.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index c365f9d..8f196be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.datastructures; import java.io.Externalizable; import java.io.IOException; +import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.ObjectStreamException; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -32,7 +34,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -885,6 +889,35 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter t.set2(in.readUTF()); } + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + IgniteSemaphore sem = IgnitionEx.localIgnite().context().dataStructures().semaphore( + t.get2(), + 0, + false, + false); + + if (sem == null) + throw new IllegalStateException("Semaphore was not found on deserialization: " + t.get2()); + + return sem; + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } + /** {@inheritDoc} */ @Override public void close() { if (!rmvd) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 034c314..9ae74a7 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1037,6 +1037,8 @@ org.apache.ignite.internal.processors.datastructures.GridCacheQueueItemKey org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreImpl org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreState +org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl +org.apache.ignite.internal.processors.datastructures.GridCacheLockState org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$CollocatedItemKey http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 4653ce9..4622fff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -774,4 +775,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertFalse(srvSemaphore.tryAcquire()); assertFalse(srvSemaphore.tryAcquire()); } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockReconnect() throws Exception { + testReentrantLockReconnect(false); + + testReentrantLockReconnect(true); + } + + private void testReentrantLockReconnect(final boolean fair) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true); + + assertEquals(false, clientLock.isLocked()); + + final IgniteLock srvLock = srv.reentrantLock("lock1", true, fair, true); + + assertEquals(false, srvLock.isLocked()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvLock.lock(); + } + }); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(1, srvLock.getHoldCount()); + + srvLock.lock(); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(2, srvLock.getHoldCount()); + + srvLock.unlock(); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(1, srvLock.getHoldCount()); + + srvLock.unlock(); + + assertFalse(srvLock.isLocked()); + assertFalse(clientLock.isLocked()); + + assertEquals(0, srvLock.getHoldCount()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 74023e9..6ba65ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -33,9 +33,11 @@ import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.cache.CacheMode; @@ -75,10 +77,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig private static final String TRANSACTIONAL_CACHE_NAME = "tx_cache"; /** */ - private static final int TOP_CHANGE_CNT = 5; + private static final int TOP_CHANGE_CNT = 2; /** */ - private static final int TOP_CHANGE_THREAD_CNT = 3; + private static final int TOP_CHANGE_THREAD_CNT = 2; /** */ private boolean client; @@ -586,6 +588,208 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ + public void testReentrantLockFailsWhenServersLeft() throws Exception { + testReentrantLockFailsWhenServersLeft(false); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockFailsWhenServersLeft() throws Exception { + testReentrantLockFailsWhenServersLeft(true); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockFailsWhenServersLeft(final boolean fair) throws Exception { + client = true; + + Ignite client = startGrid(gridCount()); + + Ignite server = grid(0); + + // Initialize lock. + IgniteLock srvLock = server.reentrantLock("lock", true, fair, true); + + IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true); + + IgniteCompute compute = client.compute().withAsync(); + + compute.apply(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + final IgniteLock l = ignite.reentrantLock("lock", true, fair, true); + + l.lock(); + + assertTrue(l.isHeldByCurrentThread()); + + l.unlock(); + + assertFalse(l.isHeldByCurrentThread()); + + // Signal the server to go down. + ignite.semaphore("sync", 0, true, true).release(); + + boolean isExceptionThrown = false; + + try { + // Wait for the server to go down. + Thread.sleep(1000); + + l.lock(); + + fail("Exception must be thrown."); + } + catch (InterruptedException e) { + fail("Interrupted exception not expected here."); + } + catch (IgniteException e) { + isExceptionThrown = true; + } + finally { + assertTrue(isExceptionThrown); + + assertFalse(l.isHeldByCurrentThread()); + } + return null; + } + }, client); + + // Wait for the lock on client to be acquired then released. + semaphore.acquire(); + + for (int i = 0; i < gridCount(); i++) + stopGrid(i); + + compute.future().get(); + + client.close(); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true); + } + + /** + * @throws Exception If failed. + */ + public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true); + } + + /** + * @throws Exception If failed. + */ + private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe, final boolean fair) throws Exception { + try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, fair, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false); + + final AtomicBoolean done = new AtomicBoolean(false); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try{ + l.lock(); + } + finally { + done.set(true); + } + + return null; + } + }); + + // Wait until l.lock() has been called. + while(!l.hasQueuedThreads() && !done.get()){ + // No-op. + } + + return null; + } + }); + + while (!fut.isDone()) { + while (true) { + try { + lock.lock(); + } + catch (IgniteException e) { + // Exception may happen in non-failoversafe mode. + if (failoverSafe) + throw e; + } + finally { + // Broken lock cannot be used in non-failoversafe mode. + if(!lock.isBroken() || failoverSafe) { + assertTrue(lock.isHeldByCurrentThread()); + + lock.unlock(); + + assertFalse(lock.isHeldByCurrentThread()); + } + break; + } + } + } + + fut.get(); + + for (Ignite g : G.allGrids()) + assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false).isHeldByCurrentThread()); + } + } + + /** + * @throws Exception If failed. + */ public void testCountDownLatchConstantTopologyChange() throws Exception { doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 34e7080..5929d42 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; @@ -327,6 +329,74 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA /** * @throws Exception If failed. */ + public void testReentrantLock() throws Exception { + Ignite clientNode = clientIgnite(); + Ignite srvNode = serverNode(); + + testReentrantLock(clientNode, srvNode); + testReentrantLock(srvNode, clientNode); + } + + /** + * @param creator Creator node. + * @param other Other node. + * @throws Exception If failed. + */ + private void testReentrantLock(Ignite creator, final Ignite other) throws Exception { + assertNull(creator.reentrantLock("lock1", true, false, false)); + assertNull(other.reentrantLock("lock1", true, false, false)); + + try (IgniteLock lock = creator.reentrantLock("lock1", true, false, true)) { + assertNotNull(lock); + + assertFalse(lock.isLocked()); + + final Semaphore semaphore = new Semaphore(0); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteLock lock0 = other.reentrantLock("lock1", true, false, false); + + lock0.lock(); + + assertTrue(lock0.isLocked()); + + semaphore.release(); + + U.sleep(1000); + + log.info("Release reentrant lock."); + + lock0.unlock(); + + return null; + } + }); + + semaphore.acquire(); + + log.info("Try acquire lock."); + + assertTrue(lock.tryLock(5000, TimeUnit.MILLISECONDS)); + + log.info("Finished wait."); + + fut.get(); + + assertTrue(lock.isLocked()); + + lock.unlock(); + + assertFalse(lock.isLocked()); + } + + assertNull(creator.reentrantLock("lock1", true, false, false)); + assertNull(other.reentrantLock("lock1", true, false, false)); + } + + /** + * @throws Exception If failed. + */ public void testQueue() throws Exception { Ignite clientNode = clientIgnite(); Ignite srvNode = serverNode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index e88c97b..a8e0095 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheAtomicityMode; @@ -241,7 +242,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT private void testUniqueName(final boolean singleGrid) throws Exception { final String name = IgniteUuid.randomUuid().toString(); - final int DS_TYPES = 8; + final int DS_TYPES = 9; final int THREADS = DS_TYPES * 3; @@ -322,6 +323,13 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res = ignite.semaphore(name, 0, false, true); break; + + case 8: + log.info("Create atomic reentrant lock, grid: " + ignite.name()); + + res = ignite.reentrantLock(name, true, true, true); + + break; default: fail(); @@ -361,7 +369,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res instanceof IgniteCountDownLatch || res instanceof IgniteQueue || res instanceof IgniteSet || - res instanceof IgniteSemaphore); + res instanceof IgniteSemaphore || + res instanceof IgniteLock); log.info("Data structure created: " + dataStructure);
