http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java new file mode 100644 index 0000000..40df2bd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java @@ -0,0 +1,1150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Cache reentrant lock implementation based on AbstractQueuedSynchronizer. + */ +public final class GridCacheReentrantLockImpl implements GridCacheReentrantLockEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Deserialization stash. */ + private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = + new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { + @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { + return F.t2(); + } + }; + + /** Logger. */ + private IgniteLogger log; + + /** Reentrant lock name. */ + private String name; + + /** Removed flag. */ + private volatile boolean rmvd; + + /** Reentrant lock key. */ + private GridCacheInternalKey key; + + /** Reentrant lock projection. */ + private IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> lockView; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Initialization guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Initialization latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Lock that provides non-overlapping processing of updates. */ + private ReentrantLock updateLock = new ReentrantLock(); + + /** Internal synchronization object. */ + private Sync sync; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridCacheReentrantLockImpl() { + // No-op. + } + + /** + * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer. + */ + class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1192457210091910933L; + + private static final long LOCK_FREE = 0; + + /** Map containing condition objects. */ + private Map<String, ConditionObject> conditionMap; + + /** List of condition signal calls on this node. */ + private Map<String, Integer> outgoingSignals; + + /** Last condition waited on. */ + @Nullable + private volatile String lastCondition; + + /** Failed nodes. */ + private Set<UUID> failedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + /** True if any node owning the lock had failed. */ + private volatile boolean isBroken = false; + + /** UUID of the node that currently owns the lock. */ + private volatile UUID currentOwnerNode; + + /** ID of the thread that currently owns the lock. */ + private volatile long currentOwnerThreadId; + + /** UUID of this node. */ + private final UUID thisNode; + + /** FailoverSafe flag. */ + private final boolean failoverSafe; + + /** Flag indicating that every operation on this lock should be interrupted. */ + private boolean interruptAll = false; + + protected Sync(GridCacheReentrantLockState state) { + setState(state.get()); + + thisNode = ctx.localNodeId(); + + currentOwnerNode = state.getId(); + + currentOwnerThreadId = state.getThreadId(); + + conditionMap = new HashMap<>(); + + outgoingSignals = new HashMap<>(); + + failoverSafe = state.isFailoverSafe(); + } + + /** */ + protected void addOutgoingSignal(String condition) { + int cnt = 0; + if (outgoingSignals.containsKey(condition)) { + cnt = outgoingSignals.get(condition); + + // SignalAll has already been called. + if (cnt == 0) + return; + } + + outgoingSignals.put(condition, cnt + 1); + } + + protected void addOutgoingSignalAll(String condition) { + outgoingSignals.put(condition, 0); + } + + /** Process any condition await calls on this node. */ + private String processAwait() { + if(lastCondition == null) + return null; + + String ret = lastCondition; + + lastCondition = null; + + return ret; + } + + /** */ + private Map<String, Integer> processSignal(){ + Map<String,Integer> ret = new HashMap<>(outgoingSignals); + + outgoingSignals.clear(); + + return ret; + } + + /** */ + private void registerFailedNode(UUID failedNode){ + failedNodes.add(failedNode); + } + + /** */ + private void clearFailedNodes(Set<UUID> nodes){ + failedNodes.retainAll(nodes); + } + + /** Interrupt every thread on this node waiting on this lock. */ + private void interruptAll(){ + interruptAll = true; + } + + /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode), + * if not throw {@linkplain IgniteInterruptedException} */ + private void validate(){ + if(interruptAll){ + throw new IgniteInterruptedException("Lock broken in non-failoversafe mode."); + } + } + + /** + * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain + * GridCacheReentrantLockImpl#onUpdate(GridCacheReentrantLockState)}. + * + * @param permits Number of permits acquired at this reentrant lock. + */ + final synchronized void setPermits(int permits) { + setState(permits); + } + + /** + * Gets the number of permissions currently acquired at this lock. + * + * @return Number of permits acquired at this reentrant lock. + */ + final int getPermits() { + return getState(); + } + + /** + * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain + * GridCacheReentrantLockImpl#onUpdate(GridCacheReentrantLockState)}. + * + * @param ownerNode UUID of the node owning this lock. + */ + final synchronized void setCurrentOwnerNode(UUID ownerNode) { + currentOwnerNode = ownerNode; + } + + /** + * Gets the UUID of the node that currently owns the lock. + * + * @return UUID of the node that currently owns the lock. + */ + final UUID getOwnerNode() { + return currentOwnerNode; + } + + /** + * Checks if latest call to acquire/release was called on this node. + * Should only be called from update method. + * + * @param newOwnerID ID of the node that is about to acquire this lock (or null). + * @return true if acquire/release that triggered last update came from this node. + */ + protected boolean isLockedLocally(UUID newOwnerID){ + if(thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID)){ + return true; + } + + return false; + } + + protected void setCurrentOwnerThread(long newOwnerThreadId){ + this.currentOwnerThreadId = newOwnerThreadId; + } + + /** + * Returns true if node that owned the locked failed before call to unlock. + * + * @return true if any node failed while owning the lock. + */ + protected boolean isBroken() { + return isBroken; + } + + /** */ + protected void setBroken(boolean isBroken) { + this.isBroken = isBroken; + } + + /** + * Performs non-fair tryLock. + */ + final boolean nonfairTryAcquire(int acquires) { + // If broken in non-failoversafe mode, exit immediately. + if(interruptAll){ + return true; + } + + final Thread current = Thread.currentThread(); + + final UUID currentOwner = this.currentOwnerNode; + + int c = getState(); + + if(c == 0 || failedNodes.contains(currentOwner)){ + if (compareAndSetGlobalState(0, acquires, current.getId())){ + + // Not used for synchronization (we use ThreadID), but updated anyway. + setExclusiveOwnerThread(current); + + while(getState() != acquires) + Thread.yield(); + + return true; + } + } + else if (isHeldExclusively()) { + int nextc = c + acquires; + + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + + setState(nextc); + + return true; + } + + return false; + } + + /** + * Performs lock. + */ + final void lock() { + acquire(1); + } + + /** {@inheritDoc} */ + protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); + } + + /** {@inheritDoc} */ + protected final boolean tryRelease(int releases) { + // This method is called with release==0 only when trying to wake through update, + // to check if some other node released the lock. + if(releases == 0) { + return true; + } + + // If broken in non-failoversafe mode, exit immediately. + if(interruptAll) + return true; + + int c = getState() - releases; + + if (!isHeldExclusively()) { + throw new IllegalMonitorStateException(); + } + + boolean free = false; + + if (c == 0){ + free = true; + + setGlobalState(0, processAwait(), processSignal()); + + while(getState() != 0) + Thread.yield(); + } + else + setState(c); + + return free; + } + + + /** {@inheritDoc} */ + protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + + return this.currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode); + } + + /** {@inheritDoc} */ + final synchronized IgniteCondition newCondition(String name) { + if(conditionMap.containsKey(name)) + return new IgniteConditionObject(name, conditionMap.get(name)); + + ConditionObject cond = new ConditionObject(); + + conditionMap.put(name, cond); + + return new IgniteConditionObject(name, cond); + } + + // Methods relayed from outer class + + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + final boolean isLocked() throws IgniteCheckedException { + return getState() != 0 || lockView.get(key).get() != 0; + } + + /** + * This method is used for synchronizing the reentrant lock state across all nodes. + */ + protected boolean compareAndSetGlobalState(final int expVal, final int newVal, long newThreadID) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + + GridCacheReentrantLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + if (val.get() == expVal || sync.failedNodes.contains(val.getId())) { + val.set(newVal); + + val.setId(thisNode); + + val.setThreadId(newThreadID); + + val.getNodes().add(thisNode); + + lockView.put(key, val); + + tx.commit(); + + return true; + } + + return false; + } + catch (Error | Exception e) { + U.error(log, "Failed to compare and set: " + this, e); + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Sets the global state across all nodes after releasing the reentrant lock. + * + * @param newVal New state. + * @param lastCondition Id of the condition await is called. + * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock. + */ + protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) { + try { + return CU.outTx( + retryTopologySafe(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheReentrantLockState val = lockView.get(key); + + if (val == null) + throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); + + val.set(newVal); + + val.setId(null); + + val.setThreadId(LOCK_FREE); + + // Get global condition queue. + Map<String, LinkedList<UUID>> condMap = val.getConditionMap(); + + // Check if any nodes in the global waiting queue have failed and remove them. + if(!sync.failedNodes.isEmpty()){ + val.getNodes().removeAll(sync.failedNodes); + } + + // Create map containing signals from this node. + Map<UUID, LinkedList<String>> signalMap = new HashMap<UUID, LinkedList<String>>(); + + // Put any signal calls on this node to global state. + if (!outgoingSignals.isEmpty()) { + for (String condition : outgoingSignals.keySet()) { + int cnt = outgoingSignals.get(condition); + + // Get queue for this condition. + List<UUID> list = condMap.get(condition); + + if (list != null && !list.isEmpty()) { + // Check if signalAll was called. + if (cnt == 0) { + cnt = list.size(); + } + + // Remove from global condition queue. + for (int i = 0; i < cnt; i++) { + if(list.isEmpty()) + break; + + UUID uuid = list.remove(0); + + // Skip if node to be released is not alive anymore. + if(!val.getNodes().contains(uuid)){ + cnt++; + + continue; + } + + LinkedList<String> queue = signalMap.get(uuid); + + if (queue == null) { + queue = new LinkedList<String>(); + + signalMap.put(uuid, queue); + } + + queue.add(condition); + } + } + } + } + + val.setSignals(signalMap); + + // Check if this release is called after condition.await() call; + // If true, add this node to the global waiting queue. + if (lastCondition != null) { + LinkedList<UUID> queue; + + if (!condMap.containsKey(lastCondition)) { + // New condition object. + queue = new LinkedList(); + } + else { + // Existing condition object. + queue = condMap.get(lastCondition); + } + + queue.add(thisNode); + + condMap.put(lastCondition, queue); + } + + val.setConditionMap(condMap); + + lockView.put(key, val); + + tx.commit(); + + return true; + } + catch (Error | Exception e) { + U.error(log, "Failed to compare and set: " + this, e); + + throw e; + } + } + }), + ctx + ); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + protected synchronized boolean checkIncomingSignals(GridCacheReentrantLockState state){ + if(state.getSignals() == null) + return false; + + LinkedList<String> signals = state.getSignals().get(thisNode); + + if(signals == null || signals.isEmpty()) + return false; + + UUID tempUUID = getOwnerNode(); + + Thread tempThread = getExclusiveOwnerThread(); + + long tempThreadID = currentOwnerThreadId; + + // Temporarily allow current thread to signal condition object. + // This is safe to do because: + // 1. if release was called on this node, + // it was called from currently active thread; + // 2. if release came from a thread on any other node, + // all threads on this node are already blocked. + this.setCurrentOwnerNode(thisNode); + + this.setExclusiveOwnerThread(Thread.currentThread()); + + this.currentOwnerThreadId = Thread.currentThread().getId(); + + for(String signal: signals){ + conditionMap.get(signal).signal(); + } + + // Restore owner node and owner thread. + this.setCurrentOwnerNode(tempUUID); + + this.setExclusiveOwnerThread(tempThread); + + this.currentOwnerThreadId = tempThreadID; + + return true; + } + + /** + * + * + * */ + public class IgniteConditionObject implements IgniteCondition { + + private final String name; + + private final AbstractQueuedSynchronizer.ConditionObject object; + + protected IgniteConditionObject(String name, ConditionObject object){ + this.name = name; + + this.object = object; + } + + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public void await() throws IgniteInterruptedException { + if(!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = this.name; + + try { + object.await(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public void awaitUninterruptibly() { + if(!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = this.name; + + object.awaitUninterruptibly(); + + } + + /** {@inheritDoc} */ + @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException { + if(!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = this.name; + + try { + return object.awaitNanos(nanosTimeout); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException { + if(!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = this.name; + + try { + return object.await(time, unit); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException { + if(!isHeldExclusively()) + throw new IllegalMonitorStateException(); + + lastCondition = this.name; + + try { + return object.awaitUntil(deadline); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public void signal() { + if(!isHeldExclusively() || interruptAll) + throw new IllegalMonitorStateException(); + + addOutgoingSignal(name); + } + + /** {@inheritDoc} */ + @Override public void signalAll() { + if(!isHeldExclusively() || interruptAll) + throw new IllegalMonitorStateException(); + + addOutgoingSignalAll(name); + } + } + } + + /** + * Constructor. + * + * @param name Reentrant lock name. + * @param key Reentrant lock key. + * @param lockView Reentrant lock projection. + * @param ctx Cache context. + */ + public GridCacheReentrantLockImpl(String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> lockView, + GridCacheContext ctx) { + assert name != null; + assert key != null; + assert ctx != null; + assert lockView != null; + + this.name = name; + this.key = key; + this.lockView = lockView; + this.ctx = ctx; + + log = ctx.logger(getClass()); + } + + /** + * @throws IgniteCheckedException If operation failed. + */ + private void initializeReentrantLock() throws IgniteCheckedException { + if (initGuard.compareAndSet(false, true)) { + try { + sync = CU.outTx( + retryTopologySafe(new Callable<Sync>() { + @Override public Sync call() throws Exception { + try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheReentrantLockState val = lockView.get(key); + + if (val == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find reentrant lock with given name: " + name); + + return null; + } + + tx.rollback(); + + return new Sync(val); + } + } + }), + ctx + ); + + if (log.isDebugEnabled()) + log.debug("Initialized internal sync structure: " + sync); + } + finally { + initLatch.countDown(); + } + } + else { + U.await(initLatch); + + if (sync == null) + throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized."); + } + } + + /** {@inheritDoc} */ + @Override public void onUpdate(GridCacheReentrantLockState val) { + // Called only on initialization, so it's safe to ignore update. + if (sync == null) + return; + + try { + updateLock.lock(); + + // Check if update came from this node. + boolean local = sync.isLockedLocally(val.getId()); + + // Process any incoming signals. + boolean incomingSignals = sync.checkIncomingSignals(val); + + // Update owner's node id. + sync.setCurrentOwnerNode(val.getId()); + + // Update owner's thread id. + sync.setCurrentOwnerThread(val.getThreadId()); + + // Update permission count. + sync.setPermits(val.get()); + + // Remove all processed failed nodes. + sync.clearFailedNodes(val.getNodes()); + + // Check if any threads waiting on this node need to be notified. + if ((incomingSignals || sync.getPermits() == 0) && !local) { + // Try to notify any waiting threads. + sync.release(0); + } + + } finally{ + updateLock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onNodeRemoved(UUID nodeId) { + try { + updateLock.lock(); + + sync.registerFailedNode(nodeId); + + if (nodeId.equals(sync.getOwnerNode())){ + sync.setBroken(true); + + if(!sync.failoverSafe){ + sync.interruptAll(); + } + + // Try to notify any waiting threads. + sync.release(0); + } + } finally{ + updateLock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public void lock() { + try{ + initializeReentrantLock(); + + sync.lock(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public void lockInterruptibly() throws IgniteInterruptedException { + try { + initializeReentrantLock(); + + sync.acquireInterruptibly(1); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryLock() { + try{ + initializeReentrantLock(); + + return sync.nonfairTryAcquire(1); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException { + try{ + initializeReentrantLock(); + + return sync.tryAcquireNanos(1, unit.toNanos(timeout)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + catch (InterruptedException e){ + throw new IgniteInterruptedException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public void unlock() { + try{ + initializeReentrantLock(); + + sync.release(1); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteCondition newCondition(String name) { + try{ + initializeReentrantLock(); + + return sync.newCondition(name); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public int getHoldCount() { + try{ + initializeReentrantLock(); + + return sync.getHoldCount(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public boolean isHeldByCurrentThread() { + if(sync == null) + return false; + + return sync.isHeldExclusively(); + } + + /** {@inheritDoc} */ + @Override public boolean isLocked() { + try{ + if(sync != null && !sync.failoverSafe && sync.isBroken()) + return false; + + initializeReentrantLock(); + + return sync.isLocked(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasQueuedThreads() { + if(sync != null ) + return false; + + return sync.hasQueuedThreads(); + } + + /** {@inheritDoc} */ + @Override public boolean hasQueuedThread(Thread thread) { + try{ + initializeReentrantLock(); + + return sync.isQueued(thread); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public int getQueueLength() { + try{ + initializeReentrantLock(); + + return sync.getQueueLength(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasWaiters(IgniteCondition condition) { + try{ + initializeReentrantLock(); + + AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); + + if(c == null) + throw new IllegalArgumentException(); + + return sync.hasWaiters(c); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + /** {@inheritDoc} */ + @Override public int getWaitQueueLength(IgniteCondition condition) { + try{ + initializeReentrantLock(); + + AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); + + if(c == null) + throw new IllegalArgumentException(); + + return sync.getWaitQueueLength(c); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + sync.validate(); + } + } + + @Override public boolean isFailoverSafe() { + return sync.failoverSafe; + } + + /** {@inheritDoc} */ + @Override public boolean isBroken() { + if(sync == null) + return false; + + return sync.isBroken(); + } + + /** {@inheritDoc} */ + @Override public GridCacheInternalKey key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean removed() { + return rmvd; + } + + /** {@inheritDoc} */ + @Override public boolean onRemoved() { + return rmvd = true; + } + + /** {@inheritDoc} */ + @Override public void needCheckNotRemoved() { + + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + + /** {@inheritDoc} */ + @Override public void close() { + if (!rmvd) { + try { + boolean force = sync != null ? sync.isBroken() && !sync.failoverSafe : false; + + ctx.kernalContext().dataStructures().removeReentrantLock(name, force); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheReentrantLockImpl.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java new file mode 100644 index 0000000..e9af65f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheInternal; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Grid cache reentrant lock state. + */ +public final class GridCacheReentrantLockState implements GridCacheInternal, Externalizable, Cloneable { + /** */ + private static final long serialVersionUID = 0L; + + /** Count. */ + private int cnt; + + /** Owner thread local id. */ + private long threadId; + + /** Owner node ID. */ + private UUID id; + + /** FailoverSafe flag. */ + private boolean failoverSafe; + + /** Map containing state for each condition object associated with this lock. */ + @GridToStringInclude + private Map<String, LinkedList<UUID>> conditionMap; + + /** Map containing unprocessed signals for condition objects that are associated with this lock. */ + @GridToStringInclude + private Map<UUID, LinkedList<String>> signals; + + /** Set containing nodes that are using this lock. */ + @GridToStringInclude + private Set<UUID> nodes; + + /** + * Constructor. + * + * @param cnt Initial count. + * @param id UUID of owning node. + */ + public GridCacheReentrantLockState(int cnt, UUID id, long threadID, boolean failoverSafe) { + assert cnt >= 0; + + this.id = id; + + this.threadId = threadID; + + conditionMap = new HashMap(); + + signals = null; + + nodes = new LinkedHashSet<>(); + + nodes.add(id); + + this.failoverSafe = failoverSafe; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridCacheReentrantLockState() { + // No-op. + } + + /** + * @param cnt New count. + */ + public void set(int cnt) { + this.cnt = cnt; + } + + /** + * @return Current count. + */ + public int get() { + return cnt; + } + + /** + * @return Current owner thread id. + */ + public long getThreadId() { + return threadId; + } + + /** + * @param threadId New thread owner id. + */ + public void setThreadId(long threadId) { + this.threadId = threadId; + } + + /** + * @return Current owner node id. + */ + public UUID getId() { + return id; + } + + /** + * @return New owner node id. + */ + public void setId(UUID id) { + this.id = id; + } + + public boolean isFailoverSafe() { + return failoverSafe; + } + + public int condtionCount(){ + return conditionMap.size(); + } + + public Map<String, LinkedList<UUID>> getConditionMap() { + return conditionMap; + } + + public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) { + this.conditionMap = conditionMap; + } + + public Map<UUID, LinkedList<String>> getSignals() { + return signals; + } + + public void setSignals(Map<UUID, LinkedList<String>> signals) { + this.signals = signals; + } + + public Set<UUID> getNodes() { + return nodes; + } + + public void setNodes(Set<UUID> nodes) { + this.nodes = nodes; + } + + /** {@inheritDoc} */ + @Override public Object clone() throws CloneNotSupportedException { + return super.clone(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(cnt); + out.writeLong(threadId); + U.writeUuid(out, id); + + out.writeBoolean(failoverSafe); + + out.writeBoolean(conditionMap != null); + + if (conditionMap != null) { + out.writeInt(conditionMap.size()); + + for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) { + U.writeString(out, e.getKey()); + + out.writeInt(e.getValue().size()); + + for(UUID uuid:e.getValue()){ + U.writeUuid(out,uuid); + } + } + } + + out.writeBoolean(signals != null); + + if (signals != null) { + out.writeInt(signals.size()); + + for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) { + U.writeUuid(out, e.getKey()); + + out.writeInt(e.getValue().size()); + + for(String condition:e.getValue()){ + U.writeString(out, condition); + } + } + } + + out.writeBoolean(nodes != null); + + if (nodes != null) { + out.writeInt(nodes.size()); + + for (UUID uuid: nodes) { + U.writeUuid(out, uuid); + } + } + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + cnt = in.readInt(); + threadId = in.readLong(); + id = U.readUuid(in); + + failoverSafe = in.readBoolean(); + + if (in.readBoolean()) { + int size = in.readInt(); + + conditionMap = U.newLinkedHashMap(size); + + for (int i = 0; i < size; i++) { + String key = U.readString(in); + + int size1 = in.readInt(); + + LinkedList<UUID> list = new LinkedList(); + + for (int j = 0; j < size1; j++) { + list.add(U.readUuid(in)); + } + + conditionMap.put(key, list); + } + } + + if(in.readBoolean()) { + assert (conditionMap != null); + + int size = in.readInt(); + + signals = U.newLinkedHashMap(size); + + for (int i = 0; i < size; i++) { + UUID node = U.readUuid(in); + + int size1 = in.readInt(); + + LinkedList<String> list = new LinkedList(); + + for (int j = 0; j < size1; j++) { + list.add(U.readString(in)); + } + + signals.put(node, list); + } + } + else{ + signals = null; + } + + if (in.readBoolean()) { + int size = in.readInt(); + + nodes = U.newLinkedHashSet(size); + + for (int i = 0; i < size; i++) { + nodes.add(U.readUuid(in)); + } + } + else{ + nodes = null; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheReentrantLockState.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 8c3ad88..1d9d445 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1009,6 +1009,8 @@ org.apache.ignite.internal.processors.datastructures.GridCacheQueueItemKey org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreImpl org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreState +org.apache.ignite.internal.processors.datastructures.GridCacheReentrantLockImpl +org.apache.ignite.internal.processors.datastructures.GridCacheReentrantLockState org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$CollocatedItemKey http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 13cac81..af10c2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.util.Collection; import java.util.Collections; import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; @@ -28,8 +29,10 @@ import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; /** @@ -774,4 +777,55 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertFalse(srvSemaphore.tryAcquire()); assertFalse(srvSemaphore.tryAcquire()); } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + IgniteReentrantLock clientLock = client.reentrantLock("lock1", true, true); + + assertEquals(false, clientLock.isLocked()); + + final IgniteReentrantLock srvLock = srv.reentrantLock("lock1", true, true); + + assertEquals(false, srvLock.isLocked()); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srvLock.lock(); + } + }); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(1, srvLock.getHoldCount()); + + srvLock.lock(); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(2, srvLock.getHoldCount()); + + srvLock.unlock(); + + assertTrue(srvLock.isLocked()); + assertTrue(clientLock.isLocked()); + + assertEquals(1, srvLock.getHoldCount()); + + srvLock.unlock(); + + assertFalse(srvLock.isLocked()); + assertFalse(clientLock.isLocked()); + + assertEquals(0, srvLock.getHoldCount()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index e85468e..df0a146 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.AtomicConfiguration; @@ -75,10 +76,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig private static final String TRANSACTIONAL_CACHE_NAME = "tx_cache"; /** */ - private static final int TOP_CHANGE_CNT = 5; + private static final int TOP_CHANGE_CNT = 2; /** */ - private static final int TOP_CHANGE_THREAD_CNT = 3; + private static final int TOP_CHANGE_THREAD_CNT = 2; /** */ private boolean client; @@ -542,6 +543,86 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ + public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception { + doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false); + } + + /** + * @throws Exception If failed. + */ + private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, boolean failoverSafe) throws Exception { + try (IgniteReentrantLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, true)) { + IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { + @Override public Object apply(Ignite ignite) { + IgniteReentrantLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + l.lock(); + + return null; + } + }); + + return null; + } + }); + + while (!fut.isDone()) { + while (true) { + try { + lock.lock(); + } + catch (IgniteInterruptedException e) { + // Exception may happen in non-failoversafe mode. + if (failoverSafe) + throw e; + } + finally { + // Broken lock cannot be used in non-failoversafe mode. + if(!lock.isBroken() || failoverSafe) { + assertTrue(lock.isHeldByCurrentThread()); + + lock.unlock(); + + assertFalse(lock.isHeldByCurrentThread()); + } + break; + } + } + } + + fut.get(); + + for (Ignite g : G.allGrids()) + assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, false).isHeldByCurrentThread()); + } + } + + /** + * @throws Exception If failed. + */ public void testCountDownLatchConstantTopologyChange() throws Exception { doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 34e7080..62b66b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.cache.datastructures; import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.CollectionConfiguration; @@ -327,6 +329,74 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA /** * @throws Exception If failed. */ + public void testReentrantLock() throws Exception { + Ignite clientNode = clientIgnite(); + Ignite srvNode = serverNode(); + + testReentrantLock(clientNode, srvNode); + testReentrantLock(srvNode, clientNode); + } + + /** + * @param creator Creator node. + * @param other Other node. + * @throws Exception If failed. + */ + private void testReentrantLock(Ignite creator, final Ignite other) throws Exception { + assertNull(creator.reentrantLock("lock1", true, false)); + assertNull(other.reentrantLock("lock1", true, false)); + + try (IgniteReentrantLock lock = creator.reentrantLock("lock1", true, true)) { + assertNotNull(lock); + + assertFalse(lock.isLocked()); + + Semaphore semaphore = new Semaphore(0); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteReentrantLock lock0 = other.reentrantLock("lock1", true, false); + + lock0.lock(); + + assertTrue(lock0.isLocked()); + + semaphore.release(); + + U.sleep(1000); + + log.info("Release reentrant lock."); + + lock0.unlock(); + + return null; + } + }); + + semaphore.acquire(); + + log.info("Try acquire lock."); + + assertTrue(lock.tryLock(5000, TimeUnit.MILLISECONDS)); + + log.info("Finished wait."); + + fut.get(); + + assertTrue(lock.isLocked()); + + lock.unlock(); + + assertFalse(lock.isLocked()); + } + + assertNull(creator.reentrantLock("lock1", true, false)); + assertNull(other.reentrantLock("lock1", true, false)); + } + + /** + * @throws Exception If failed. + */ public void testQueue() throws Exception { Ignite clientNode = clientIgnite(); Ignite srvNode = serverNode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index 4a21765..b31ca8b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheAtomicityMode; @@ -240,7 +241,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT private void testUniqueName(final boolean singleGrid) throws Exception { final String name = IgniteUuid.randomUuid().toString(); - final int DS_TYPES = 8; + final int DS_TYPES = 9; final int THREADS = DS_TYPES * 3; @@ -321,6 +322,13 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res = ignite.semaphore(name, 0, false, true); break; + + case 8: + log.info("Create atomic reentrant lock, grid: " + ignite.name()); + + res = ignite.reentrantLock(name, true, true); + + break; default: fail(); @@ -360,7 +368,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res instanceof IgniteCountDownLatch || res instanceof IgniteQueue || res instanceof IgniteSet || - res instanceof IgniteSemaphore); + res instanceof IgniteSemaphore || + res instanceof IgniteReentrantLock); log.info("Data structure created: " + dataStructure); http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java new file mode 100644 index 0000000..6a441bb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * Cache reentrant lock self test. + */ +public abstract class IgniteReentrantLockAbstractSelfTest extends IgniteAtomicsAbstractTest + implements Externalizable { + /** */ + private static final int NODES_CNT = 4; + + /** */ + protected static final int THREADS_CNT = 5; + + /** */ + private static final Random RND = new Random(); + + /** */ + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODES_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLock() throws Exception { + checkReentrantLock(); + } + + /** + * @throws Exception If failed. + */ + public void testFailover() throws Exception { + if (atomicsCacheMode() == LOCAL) + return; + + checkFailover(true); + checkFailover(false); + } + + /** + * @param failoverSafe Failover safe flag. + * @throws Exception + */ + private void checkFailover(boolean failoverSafe) throws Exception { + IgniteEx g = startGrid(NODES_CNT + 1); + + // For vars locality. + { + // Ensure not exists. + assert g.semaphore("sem", 2, failoverSafe, false) == null; + + IgniteSemaphore sem = g.semaphore( + "sem", + 2, + failoverSafe, + true); + + sem.acquire(2); + + assert !sem.tryAcquire(); + assertEquals( + 0, + sem.availablePermits()); + } + + Ignite g0 = grid(0); + + final IgniteSemaphore sem0 = g0.semaphore( + "sem", + -10, + false, + false); + + assert !sem0.tryAcquire(); + assertEquals(0, sem0.availablePermits()); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + sem0.acquire(); + + info("Acquired in separate thread."); + + return null; + } + }, + 1); + + Thread.sleep(100); + + g.close(); + + try { + fut.get(500); + } + catch (IgniteCheckedException e) { + if (!failoverSafe && e.hasCause(InterruptedException.class)) + info("Ignored expected exception: " + e); + else + throw e; + } + + sem0.close(); + } + + /** + * @throws Exception If failed. + */ + private void checkReentrantLock() throws Exception { + // Test API. + checkLock(); + + checkFailoverSafe(); + + // Test main functionality. + IgniteReentrantLock lock1 = grid(0).reentrantLock("lock", true, true); + + assertFalse(lock1.isLocked()); + + lock1.lock(); + + IgniteCompute comp = grid(0).compute().withAsync(); + + comp.call(new IgniteCallable<Object>() { + @IgniteInstanceResource + private Ignite ignite; + + @LoggerResource + private IgniteLogger log; + + @Nullable @Override public Object call() throws Exception { + // Test reentrant lock in multiple threads on each node. + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteReentrantLock lock = ignite.reentrantLock("lock", true, true); + + assert lock != null; + + log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + log.info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + 5, + "test-thread" + ); + + fut.get(); + + return null; + } + }); + + IgniteFuture<Object> fut = comp.future(); + + Thread.sleep(3000); + + assert lock1.isHeldByCurrentThread() == true; + + assert lock1.getHoldCount() == 1; + + lock1.lock(); + + assert lock1.isHeldByCurrentThread() == true; + + assert lock1.getHoldCount() == 2; + + lock1.unlock(); + + lock1.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed semaphore. + lock1.close(); + + checkRemovedReentrantLock(lock1); + } + + /** + * @param lock IgniteReentrantLock. + * @throws Exception If failed. + */ + protected void checkRemovedReentrantLock(final IgniteReentrantLock lock) throws Exception { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lock.removed(); + } + }, 5000); + + assert lock.removed(); + } + + /** + * This method only checks if parameter of new reentrant lock is initialized properly. + * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. + * + * @throws Exception Exception. + */ + private void checkFailoverSafe() throws Exception { + /* + // Checks only if semaphore is initialized properly + IgniteReentrantLock lock = createReentrantLock("rmv", true); + + assert lock.isFailoverSafe(); + + removeReentrantLock("rmv"); + + IgniteReentrantLock lock1 = createReentrantLock("rmv1", false); + + assert !lock1.isFailoverSafe(); + + removeReentrantLock("rmv1"); + */ + } + + /** + * @throws Exception Exception. + */ + private void checkLock() throws Exception { + // Check only 'false' cases here. Successful lock is tested over the grid. + final IgniteReentrantLock lock = createReentrantLock("acquire", false); + + lock.lock(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + assertNotNull(lock); + + assert !lock.tryLock(); + + assert !lock.tryLock(10, MICROSECONDS); + + return null; + } + }); + + fut.get(); + + lock.unlock(); + + removeReentrantLock("acquire"); + } + + /** + * @param lockName Reentrant lock name. + * @param failoverSafe Fairness flag. + * @return New distributed reentrant lock. + * @throws Exception If failed. + */ + private IgniteReentrantLock createReentrantLock(String lockName, boolean failoverSafe) + throws Exception { + IgniteReentrantLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, true); + + // Test initialization. + assert lockName.equals(lock.name()); + assert lock.isLocked() == false; + assert lock.getQueueLength() == 0; + assert lock.isFailoverSafe() == failoverSafe; + + return lock; + } + + /** + * @param lockName Reentrant lock name. + * @throws Exception If failed. + */ + private void removeReentrantLock(String lockName) + throws Exception { + IgniteReentrantLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + + assert lock != null; + + // Remove lock on random node. + IgniteReentrantLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + + assertNotNull(lock0); + + lock0.close(); + + // Ensure reentrant lock is removed on all nodes. + for (Ignite g : G.allGrids()) + assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, false)); + + checkRemovedReentrantLock(lock); + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockMultinode1() throws Exception { + if (gridCount() == 1) + return; + + IgniteReentrantLock lock = grid(0).reentrantLock("s1", true, true); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteReentrantLock lock = ignite.reentrantLock("s1", true, false); + + assertNotNull(lock); + + IgniteCondition cond1 = lock.newCondition("c1"); + + IgniteCondition cond2 = lock.newCondition("c2"); + + try { + boolean wait = lock.tryLock(30_000, MILLISECONDS); + + assertTrue(wait); + + cond2.signal(); + + cond1.await(); + } + finally { + lock.unlock(); + } + + return null; + } + })); + } + + boolean done = false; + + while(!done) { + done = true; + + for (IgniteInternalFuture<?> fut : futs){ + if(!fut.isDone()) + done = false; + } + + try{ + lock.lock(); + + lock.newCondition("c1").signal(); + + lock.newCondition("c2").await(10,MILLISECONDS); + } + finally { + lock.unlock(); + } + } + + for (IgniteInternalFuture<?> fut : futs) + fut.get(30_000); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalReentrantLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalReentrantLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalReentrantLockSelfTest.java new file mode 100644 index 0000000..215121e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalReentrantLockSelfTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.local; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteReentrantLockAbstractSelfTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * + */ +public class IgniteLocalReentrantLockSelfTest extends IgniteReentrantLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void testReentrantLock() throws Exception { + // Test main functionality. + IgniteReentrantLock lock = grid(0).reentrantLock("lock", true, true); + + assertNotNull(lock); + + assertEquals(0, lock.getHoldCount()); + + lock.lock(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteReentrantLock lock = grid(0).reentrantLock("lock", true, true); + + assert lock != null; + + info("Thread is going to wait on lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + THREADS_CNT, + "test-thread" + ); + + Thread.sleep(3000); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.lock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 2; + + lock.unlock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed lock. + IgniteReentrantLock lock0 = grid(0).reentrantLock("lock", true, false); + + assertNotNull(lock0); + + lock0.close(); + + checkRemovedReentrantLock(lock0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedReentrantLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedReentrantLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedReentrantLockSelfTest.java new file mode 100644 index 0000000..4b8ad68 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedReentrantLockSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteReentrantLockAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgnitePartitionedReentrantLockSelfTest extends IgniteReentrantLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedReentrantLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedReentrantLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedReentrantLockSelfTest.java new file mode 100644 index 0000000..697c4d6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedReentrantLockSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.replicated; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteReentrantLockAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class IgniteReplicatedReentrantLockSelfTest extends IgniteReentrantLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java index d4ca9a5..715a44c 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; @@ -64,6 +65,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad /** Semaphore name. */ private static final String TEST_SEMAPHORE_NAME = "test-semaphore"; + /** Reentrant lock name. */ + private static final String TEST_REENTRANT_LOCK_NAME = "test-reentrant-lock"; + /** */ private static final CollectionConfiguration colCfg = new CollectionConfiguration(); @@ -98,6 +102,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad private static final boolean SEMAPHORE = true; /** */ + private static final boolean REENTRANTLOCK = true; + + /** */ private GridCacheDataStructuresLoadTest() { // No-op } @@ -347,6 +354,44 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad } }; + + /** Reentrant lock read closure. */ + private final CIX1<Ignite> reentrantLockReadClos = + new CIX1<Ignite>() { + @Override public void applyx(Ignite ignite) { + IgniteReentrantLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, true); + + for (int i = 0; i < operationsPerTx; i++) { + r.isLocked(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Reentrant lock write closure. */ + private final CIX1<Ignite> reentrantLockWriteClos = + new CIX1<Ignite>() { + @Override public void applyx(Ignite ignite) { + IgniteReentrantLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, true); + + for (int i = 0; i < operationsPerTx; i++) { + if ((i % 2) == 0) + r.lock(); + else + r.unlock(); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + /** * @param args Arguments. * @throws IgniteCheckedException In case of error. @@ -417,6 +462,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos); } + + System.gc(); + + if (REENTRANTLOCK) { + info("Testing reentrant lock..."); + + test.loadTestIgnite(test.reentrantLockWriteClos, test.reentrantLockReadClos); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b4e49bde/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index c49c730..0ab3d45 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; @@ -367,6 +368,14 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ + @Nullable @Override public IgniteReentrantLock reentrantLock(String name, + boolean failoverSafe, + boolean create) + { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg)
