http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java deleted file mode 100644 index 40df2bd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockImpl.java +++ /dev/null @@ -1,1150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.datastructures; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCondition; -import org.apache.ignite.IgniteInterruptedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - -/** - * Cache reentrant lock implementation based on AbstractQueuedSynchronizer. 
- */ -public final class GridCacheReentrantLockImpl implements GridCacheReentrantLockEx, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Deserialization stash. */ - private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash = - new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() { - @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() { - return F.t2(); - } - }; - - /** Logger. */ - private IgniteLogger log; - - /** Reentrant lock name. */ - private String name; - - /** Removed flag. */ - private volatile boolean rmvd; - - /** Reentrant lock key. */ - private GridCacheInternalKey key; - - /** Reentrant lock projection. */ - private IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> lockView; - - /** Cache context. */ - private GridCacheContext ctx; - - /** Initialization guard. */ - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Initialization latch. */ - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** Lock that provides non-overlapping processing of updates. */ - private ReentrantLock updateLock = new ReentrantLock(); - - /** Internal synchronization object. */ - private Sync sync; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridCacheReentrantLockImpl() { - // No-op. - } - - /** - * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer. - */ - class Sync extends AbstractQueuedSynchronizer { - private static final long serialVersionUID = 1192457210091910933L; - - private static final long LOCK_FREE = 0; - - /** Map containing condition objects. */ - private Map<String, ConditionObject> conditionMap; - - /** List of condition signal calls on this node. */ - private Map<String, Integer> outgoingSignals; - - /** Last condition waited on. */ - @Nullable - private volatile String lastCondition; - - /** Failed nodes. 
*/ - private Set<UUID> failedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); - - /** True if any node owning the lock had failed. */ - private volatile boolean isBroken = false; - - /** UUID of the node that currently owns the lock. */ - private volatile UUID currentOwnerNode; - - /** ID of the thread that currently owns the lock. */ - private volatile long currentOwnerThreadId; - - /** UUID of this node. */ - private final UUID thisNode; - - /** FailoverSafe flag. */ - private final boolean failoverSafe; - - /** Flag indicating that every operation on this lock should be interrupted. */ - private boolean interruptAll = false; - - protected Sync(GridCacheReentrantLockState state) { - setState(state.get()); - - thisNode = ctx.localNodeId(); - - currentOwnerNode = state.getId(); - - currentOwnerThreadId = state.getThreadId(); - - conditionMap = new HashMap<>(); - - outgoingSignals = new HashMap<>(); - - failoverSafe = state.isFailoverSafe(); - } - - /** */ - protected void addOutgoingSignal(String condition) { - int cnt = 0; - if (outgoingSignals.containsKey(condition)) { - cnt = outgoingSignals.get(condition); - - // SignalAll has already been called. - if (cnt == 0) - return; - } - - outgoingSignals.put(condition, cnt + 1); - } - - protected void addOutgoingSignalAll(String condition) { - outgoingSignals.put(condition, 0); - } - - /** Process any condition await calls on this node. 
*/ - private String processAwait() { - if(lastCondition == null) - return null; - - String ret = lastCondition; - - lastCondition = null; - - return ret; - } - - /** */ - private Map<String, Integer> processSignal(){ - Map<String,Integer> ret = new HashMap<>(outgoingSignals); - - outgoingSignals.clear(); - - return ret; - } - - /** */ - private void registerFailedNode(UUID failedNode){ - failedNodes.add(failedNode); - } - - /** */ - private void clearFailedNodes(Set<UUID> nodes){ - failedNodes.retainAll(nodes); - } - - /** Interrupt every thread on this node waiting on this lock. */ - private void interruptAll(){ - interruptAll = true; - } - - /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode), - * if not throw {@linkplain IgniteInterruptedException} */ - private void validate(){ - if(interruptAll){ - throw new IgniteInterruptedException("Lock broken in non-failoversafe mode."); - } - } - - /** - * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain - * GridCacheReentrantLockImpl#onUpdate(GridCacheReentrantLockState)}. - * - * @param permits Number of permits acquired at this reentrant lock. - */ - final synchronized void setPermits(int permits) { - setState(permits); - } - - /** - * Gets the number of permissions currently acquired at this lock. - * - * @return Number of permits acquired at this reentrant lock. - */ - final int getPermits() { - return getState(); - } - - /** - * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain - * GridCacheReentrantLockImpl#onUpdate(GridCacheReentrantLockState)}. - * - * @param ownerNode UUID of the node owning this lock. - */ - final synchronized void setCurrentOwnerNode(UUID ownerNode) { - currentOwnerNode = ownerNode; - } - - /** - * Gets the UUID of the node that currently owns the lock. - * - * @return UUID of the node that currently owns the lock. 
- */ - final UUID getOwnerNode() { - return currentOwnerNode; - } - - /** - * Checks if latest call to acquire/release was called on this node. - * Should only be called from update method. - * - * @param newOwnerID ID of the node that is about to acquire this lock (or null). - * @return true if acquire/release that triggered last update came from this node. - */ - protected boolean isLockedLocally(UUID newOwnerID){ - if(thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID)){ - return true; - } - - return false; - } - - protected void setCurrentOwnerThread(long newOwnerThreadId){ - this.currentOwnerThreadId = newOwnerThreadId; - } - - /** - * Returns true if node that owned the locked failed before call to unlock. - * - * @return true if any node failed while owning the lock. - */ - protected boolean isBroken() { - return isBroken; - } - - /** */ - protected void setBroken(boolean isBroken) { - this.isBroken = isBroken; - } - - /** - * Performs non-fair tryLock. - */ - final boolean nonfairTryAcquire(int acquires) { - // If broken in non-failoversafe mode, exit immediately. - if(interruptAll){ - return true; - } - - final Thread current = Thread.currentThread(); - - final UUID currentOwner = this.currentOwnerNode; - - int c = getState(); - - if(c == 0 || failedNodes.contains(currentOwner)){ - if (compareAndSetGlobalState(0, acquires, current.getId())){ - - // Not used for synchronization (we use ThreadID), but updated anyway. - setExclusiveOwnerThread(current); - - while(getState() != acquires) - Thread.yield(); - - return true; - } - } - else if (isHeldExclusively()) { - int nextc = c + acquires; - - if (nextc < 0) // overflow - throw new Error("Maximum lock count exceeded"); - - setState(nextc); - - return true; - } - - return false; - } - - /** - * Performs lock. 
- */ - final void lock() { - acquire(1); - } - - /** {@inheritDoc} */ - protected final boolean tryAcquire(int acquires) { - return nonfairTryAcquire(acquires); - } - - /** {@inheritDoc} */ - protected final boolean tryRelease(int releases) { - // This method is called with release==0 only when trying to wake through update, - // to check if some other node released the lock. - if(releases == 0) { - return true; - } - - // If broken in non-failoversafe mode, exit immediately. - if(interruptAll) - return true; - - int c = getState() - releases; - - if (!isHeldExclusively()) { - throw new IllegalMonitorStateException(); - } - - boolean free = false; - - if (c == 0){ - free = true; - - setGlobalState(0, processAwait(), processSignal()); - - while(getState() != 0) - Thread.yield(); - } - else - setState(c); - - return free; - } - - - /** {@inheritDoc} */ - protected final boolean isHeldExclusively() { - // While we must in general read state before owner, - // we don't need to do so to check if current thread is owner - - return this.currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode); - } - - /** {@inheritDoc} */ - final synchronized IgniteCondition newCondition(String name) { - if(conditionMap.containsKey(name)) - return new IgniteConditionObject(name, conditionMap.get(name)); - - ConditionObject cond = new ConditionObject(); - - conditionMap.put(name, cond); - - return new IgniteConditionObject(name, cond); - } - - // Methods relayed from outer class - - final int getHoldCount() { - return isHeldExclusively() ? getState() : 0; - } - - final boolean isLocked() throws IgniteCheckedException { - return getState() != 0 || lockView.get(key).get() != 0; - } - - /** - * This method is used for synchronizing the reentrant lock state across all nodes. 
- */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, long newThreadID) { - try { - return CU.outTx( - retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - - GridCacheReentrantLockState val = lockView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); - - if (val.get() == expVal || sync.failedNodes.contains(val.getId())) { - val.set(newVal); - - val.setId(thisNode); - - val.setThreadId(newThreadID); - - val.getNodes().add(thisNode); - - lockView.put(key, val); - - tx.commit(); - - return true; - } - - return false; - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and set: " + this, e); - - throw e; - } - } - }), - ctx - ); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * Sets the global state across all nodes after releasing the reentrant lock. - * - * @param newVal New state. - * @param lastCondition Id of the condition await is called. - * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock. - */ - protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) { - try { - return CU.outTx( - retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheReentrantLockState val = lockView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name); - - val.set(newVal); - - val.setId(null); - - val.setThreadId(LOCK_FREE); - - // Get global condition queue. 
- Map<String, LinkedList<UUID>> condMap = val.getConditionMap(); - - // Check if any nodes in the global waiting queue have failed and remove them. - if(!sync.failedNodes.isEmpty()){ - val.getNodes().removeAll(sync.failedNodes); - } - - // Create map containing signals from this node. - Map<UUID, LinkedList<String>> signalMap = new HashMap<UUID, LinkedList<String>>(); - - // Put any signal calls on this node to global state. - if (!outgoingSignals.isEmpty()) { - for (String condition : outgoingSignals.keySet()) { - int cnt = outgoingSignals.get(condition); - - // Get queue for this condition. - List<UUID> list = condMap.get(condition); - - if (list != null && !list.isEmpty()) { - // Check if signalAll was called. - if (cnt == 0) { - cnt = list.size(); - } - - // Remove from global condition queue. - for (int i = 0; i < cnt; i++) { - if(list.isEmpty()) - break; - - UUID uuid = list.remove(0); - - // Skip if node to be released is not alive anymore. - if(!val.getNodes().contains(uuid)){ - cnt++; - - continue; - } - - LinkedList<String> queue = signalMap.get(uuid); - - if (queue == null) { - queue = new LinkedList<String>(); - - signalMap.put(uuid, queue); - } - - queue.add(condition); - } - } - } - } - - val.setSignals(signalMap); - - // Check if this release is called after condition.await() call; - // If true, add this node to the global waiting queue. - if (lastCondition != null) { - LinkedList<UUID> queue; - - if (!condMap.containsKey(lastCondition)) { - // New condition object. - queue = new LinkedList(); - } - else { - // Existing condition object. 
- queue = condMap.get(lastCondition); - } - - queue.add(thisNode); - - condMap.put(lastCondition, queue); - } - - val.setConditionMap(condMap); - - lockView.put(key, val); - - tx.commit(); - - return true; - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and set: " + this, e); - - throw e; - } - } - }), - ctx - ); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - protected synchronized boolean checkIncomingSignals(GridCacheReentrantLockState state){ - if(state.getSignals() == null) - return false; - - LinkedList<String> signals = state.getSignals().get(thisNode); - - if(signals == null || signals.isEmpty()) - return false; - - UUID tempUUID = getOwnerNode(); - - Thread tempThread = getExclusiveOwnerThread(); - - long tempThreadID = currentOwnerThreadId; - - // Temporarily allow current thread to signal condition object. - // This is safe to do because: - // 1. if release was called on this node, - // it was called from currently active thread; - // 2. if release came from a thread on any other node, - // all threads on this node are already blocked. - this.setCurrentOwnerNode(thisNode); - - this.setExclusiveOwnerThread(Thread.currentThread()); - - this.currentOwnerThreadId = Thread.currentThread().getId(); - - for(String signal: signals){ - conditionMap.get(signal).signal(); - } - - // Restore owner node and owner thread. 
- this.setCurrentOwnerNode(tempUUID); - - this.setExclusiveOwnerThread(tempThread); - - this.currentOwnerThreadId = tempThreadID; - - return true; - } - - /** - * - * - * */ - public class IgniteConditionObject implements IgniteCondition { - - private final String name; - - private final AbstractQueuedSynchronizer.ConditionObject object; - - protected IgniteConditionObject(String name, ConditionObject object){ - this.name = name; - - this.object = object; - } - - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public void await() throws IgniteInterruptedException { - if(!isHeldExclusively()) - throw new IllegalMonitorStateException(); - - lastCondition = this.name; - - try { - object.await(); - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public void awaitUninterruptibly() { - if(!isHeldExclusively()) - throw new IllegalMonitorStateException(); - - lastCondition = this.name; - - object.awaitUninterruptibly(); - - } - - /** {@inheritDoc} */ - @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException { - if(!isHeldExclusively()) - throw new IllegalMonitorStateException(); - - lastCondition = this.name; - - try { - return object.awaitNanos(nanosTimeout); - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException { - if(!isHeldExclusively()) - throw new IllegalMonitorStateException(); - - lastCondition = this.name; - - try { - return object.await(time, unit); - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException { - 
if(!isHeldExclusively()) - throw new IllegalMonitorStateException(); - - lastCondition = this.name; - - try { - return object.awaitUntil(deadline); - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public void signal() { - if(!isHeldExclusively() || interruptAll) - throw new IllegalMonitorStateException(); - - addOutgoingSignal(name); - } - - /** {@inheritDoc} */ - @Override public void signalAll() { - if(!isHeldExclusively() || interruptAll) - throw new IllegalMonitorStateException(); - - addOutgoingSignalAll(name); - } - } - } - - /** - * Constructor. - * - * @param name Reentrant lock name. - * @param key Reentrant lock key. - * @param lockView Reentrant lock projection. - * @param ctx Cache context. - */ - public GridCacheReentrantLockImpl(String name, - GridCacheInternalKey key, - IgniteInternalCache<GridCacheInternalKey, GridCacheReentrantLockState> lockView, - GridCacheContext ctx) { - assert name != null; - assert key != null; - assert ctx != null; - assert lockView != null; - - this.name = name; - this.key = key; - this.lockView = lockView; - this.ctx = ctx; - - log = ctx.logger(getClass()); - } - - /** - * @throws IgniteCheckedException If operation failed. 
- */ - private void initializeReentrantLock() throws IgniteCheckedException { - if (initGuard.compareAndSet(false, true)) { - try { - sync = CU.outTx( - retryTopologySafe(new Callable<Sync>() { - @Override public Sync call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheReentrantLockState val = lockView.get(key); - - if (val == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find reentrant lock with given name: " + name); - - return null; - } - - tx.rollback(); - - return new Sync(val); - } - } - }), - ctx - ); - - if (log.isDebugEnabled()) - log.debug("Initialized internal sync structure: " + sync); - } - finally { - initLatch.countDown(); - } - } - else { - U.await(initLatch); - - if (sync == null) - throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized."); - } - } - - /** {@inheritDoc} */ - @Override public void onUpdate(GridCacheReentrantLockState val) { - // Called only on initialization, so it's safe to ignore update. - if (sync == null) - return; - - try { - updateLock.lock(); - - // Check if update came from this node. - boolean local = sync.isLockedLocally(val.getId()); - - // Process any incoming signals. - boolean incomingSignals = sync.checkIncomingSignals(val); - - // Update owner's node id. - sync.setCurrentOwnerNode(val.getId()); - - // Update owner's thread id. - sync.setCurrentOwnerThread(val.getThreadId()); - - // Update permission count. - sync.setPermits(val.get()); - - // Remove all processed failed nodes. - sync.clearFailedNodes(val.getNodes()); - - // Check if any threads waiting on this node need to be notified. - if ((incomingSignals || sync.getPermits() == 0) && !local) { - // Try to notify any waiting threads. 
- sync.release(0); - } - - } finally{ - updateLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void onNodeRemoved(UUID nodeId) { - try { - updateLock.lock(); - - sync.registerFailedNode(nodeId); - - if (nodeId.equals(sync.getOwnerNode())){ - sync.setBroken(true); - - if(!sync.failoverSafe){ - sync.interruptAll(); - } - - // Try to notify any waiting threads. - sync.release(0); - } - } finally{ - updateLock.unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public void lock() { - try{ - initializeReentrantLock(); - - sync.lock(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public void lockInterruptibly() throws IgniteInterruptedException { - try { - initializeReentrantLock(); - - sync.acquireInterruptibly(1); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - catch (InterruptedException e) { - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public boolean tryLock() { - try{ - initializeReentrantLock(); - - return sync.nonfairTryAcquire(1); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException { - try{ - initializeReentrantLock(); - - return sync.tryAcquireNanos(1, unit.toNanos(timeout)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - catch (InterruptedException e){ - throw new IgniteInterruptedException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public void unlock() { - try{ - initializeReentrantLock(); - - sync.release(1); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - 
sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public IgniteCondition newCondition(String name) { - try{ - initializeReentrantLock(); - - return sync.newCondition(name); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public int getHoldCount() { - try{ - initializeReentrantLock(); - - return sync.getHoldCount(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public boolean isHeldByCurrentThread() { - if(sync == null) - return false; - - return sync.isHeldExclusively(); - } - - /** {@inheritDoc} */ - @Override public boolean isLocked() { - try{ - if(sync != null && !sync.failoverSafe && sync.isBroken()) - return false; - - initializeReentrantLock(); - - return sync.isLocked(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasQueuedThreads() { - if(sync != null ) - return false; - - return sync.hasQueuedThreads(); - } - - /** {@inheritDoc} */ - @Override public boolean hasQueuedThread(Thread thread) { - try{ - initializeReentrantLock(); - - return sync.isQueued(thread); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public int getQueueLength() { - try{ - initializeReentrantLock(); - - return sync.getQueueLength(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasWaiters(IgniteCondition condition) { - try{ - initializeReentrantLock(); - - AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); - - if(c == null) - throw new IllegalArgumentException(); - - return sync.hasWaiters(c); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - 
sync.validate(); - } - } - - /** {@inheritDoc} */ - @Override public int getWaitQueueLength(IgniteCondition condition) { - try{ - initializeReentrantLock(); - - AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name()); - - if(c == null) - throw new IllegalArgumentException(); - - return sync.getWaitQueueLength(c); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - sync.validate(); - } - } - - @Override public boolean isFailoverSafe() { - return sync.failoverSafe; - } - - /** {@inheritDoc} */ - @Override public boolean isBroken() { - if(sync == null) - return false; - - return sync.isBroken(); - } - - /** {@inheritDoc} */ - @Override public GridCacheInternalKey key() { - return key; - } - - /** {@inheritDoc} */ - @Override public boolean removed() { - return rmvd; - } - - /** {@inheritDoc} */ - @Override public boolean onRemoved() { - return rmvd = true; - } - - /** {@inheritDoc} */ - @Override public void needCheckNotRemoved() { - - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx.kernalContext()); - out.writeUTF(name); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - IgniteBiTuple<GridKernalContext, String> t = stash.get(); - - t.set1((GridKernalContext)in.readObject()); - t.set2(in.readUTF()); - } - - /** {@inheritDoc} */ - @Override public void close() { - if (!rmvd) { - try { - boolean force = sync != null ? sync.isBroken() && !sync.failoverSafe : false; - - ctx.kernalContext().dataStructures().removeReentrantLock(name, force); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheReentrantLockImpl.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java deleted file mode 100644 index e9af65f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockState.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.datastructures; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.ignite.internal.processors.cache.GridCacheInternal; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Grid cache reentrant lock state. - */ -public final class GridCacheReentrantLockState implements GridCacheInternal, Externalizable, Cloneable { - /** */ - private static final long serialVersionUID = 0L; - - /** Count. */ - private int cnt; - - /** Owner thread local id. */ - private long threadId; - - /** Owner node ID. */ - private UUID id; - - /** FailoverSafe flag. */ - private boolean failoverSafe; - - /** Map containing state for each condition object associated with this lock. */ - @GridToStringInclude - private Map<String, LinkedList<UUID>> conditionMap; - - /** Map containing unprocessed signals for condition objects that are associated with this lock. */ - @GridToStringInclude - private Map<UUID, LinkedList<String>> signals; - - /** Set containing nodes that are using this lock. */ - @GridToStringInclude - private Set<UUID> nodes; - - /** - * Constructor. - * - * @param cnt Initial count. - * @param id UUID of owning node. - */ - public GridCacheReentrantLockState(int cnt, UUID id, long threadID, boolean failoverSafe) { - assert cnt >= 0; - - this.id = id; - - this.threadId = threadID; - - conditionMap = new HashMap(); - - signals = null; - - nodes = new LinkedHashSet<>(); - - nodes.add(id); - - this.failoverSafe = failoverSafe; - } - - /** - * Empty constructor required for {@link Externalizable}. 
- */ - public GridCacheReentrantLockState() { - // No-op. - } - - /** - * @param cnt New count. - */ - public void set(int cnt) { - this.cnt = cnt; - } - - /** - * @return Current count. - */ - public int get() { - return cnt; - } - - /** - * @return Current owner thread id. - */ - public long getThreadId() { - return threadId; - } - - /** - * @param threadId New thread owner id. - */ - public void setThreadId(long threadId) { - this.threadId = threadId; - } - - /** - * @return Current owner node id. - */ - public UUID getId() { - return id; - } - - /** - * @return New owner node id. - */ - public void setId(UUID id) { - this.id = id; - } - - public boolean isFailoverSafe() { - return failoverSafe; - } - - public int condtionCount(){ - return conditionMap.size(); - } - - public Map<String, LinkedList<UUID>> getConditionMap() { - return conditionMap; - } - - public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) { - this.conditionMap = conditionMap; - } - - public Map<UUID, LinkedList<String>> getSignals() { - return signals; - } - - public void setSignals(Map<UUID, LinkedList<String>> signals) { - this.signals = signals; - } - - public Set<UUID> getNodes() { - return nodes; - } - - public void setNodes(Set<UUID> nodes) { - this.nodes = nodes; - } - - /** {@inheritDoc} */ - @Override public Object clone() throws CloneNotSupportedException { - return super.clone(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(cnt); - out.writeLong(threadId); - U.writeUuid(out, id); - - out.writeBoolean(failoverSafe); - - out.writeBoolean(conditionMap != null); - - if (conditionMap != null) { - out.writeInt(conditionMap.size()); - - for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) { - U.writeString(out, e.getKey()); - - out.writeInt(e.getValue().size()); - - for(UUID uuid:e.getValue()){ - U.writeUuid(out,uuid); - } - } - } - - out.writeBoolean(signals != null); - - if 
(signals != null) { - out.writeInt(signals.size()); - - for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) { - U.writeUuid(out, e.getKey()); - - out.writeInt(e.getValue().size()); - - for(String condition:e.getValue()){ - U.writeString(out, condition); - } - } - } - - out.writeBoolean(nodes != null); - - if (nodes != null) { - out.writeInt(nodes.size()); - - for (UUID uuid: nodes) { - U.writeUuid(out, uuid); - } - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException { - cnt = in.readInt(); - threadId = in.readLong(); - id = U.readUuid(in); - - failoverSafe = in.readBoolean(); - - if (in.readBoolean()) { - int size = in.readInt(); - - conditionMap = U.newLinkedHashMap(size); - - for (int i = 0; i < size; i++) { - String key = U.readString(in); - - int size1 = in.readInt(); - - LinkedList<UUID> list = new LinkedList(); - - for (int j = 0; j < size1; j++) { - list.add(U.readUuid(in)); - } - - conditionMap.put(key, list); - } - } - - if(in.readBoolean()) { - assert (conditionMap != null); - - int size = in.readInt(); - - signals = U.newLinkedHashMap(size); - - for (int i = 0; i < size; i++) { - UUID node = U.readUuid(in); - - int size1 = in.readInt(); - - LinkedList<String> list = new LinkedList(); - - for (int j = 0; j < size1; j++) { - list.add(U.readString(in)); - } - - signals.put(node, list); - } - } - else{ - signals = null; - } - - if (in.readBoolean()) { - int size = in.readInt(); - - nodes = U.newLinkedHashSet(size); - - for (int i = 0; i < size; i++) { - nodes.add(U.readUuid(in)); - } - } - else{ - nodes = null; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheReentrantLockState.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 1d9d445..42529be 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1009,8 +1009,8 @@ org.apache.ignite.internal.processors.datastructures.GridCacheQueueItemKey org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreImpl org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreState -org.apache.ignite.internal.processors.datastructures.GridCacheReentrantLockImpl -org.apache.ignite.internal.processors.datastructures.GridCacheReentrantLockState +org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl +org.apache.ignite.internal.processors.datastructures.GridCacheLockState org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$CollocatedItemKey http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index af10c2a..c3ee082 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal; import java.util.Collection; import java.util.Collections; import java.util.concurrent.Callable; -import 
java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; @@ -29,10 +28,9 @@ import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCountDownLatch; -import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; /** @@ -788,11 +786,11 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr Ignite srv = clientRouter(client); - IgniteReentrantLock clientLock = client.reentrantLock("lock1", true, true); + IgniteLock clientLock = client.reentrantLock("lock1", true, true); assertEquals(false, clientLock.isLocked()); - final IgniteReentrantLock srvLock = srv.reentrantLock("lock1", true, true); + final IgniteLock srvLock = srv.reentrantLock("lock1", true, true); assertEquals(false, srvLock.isLocked()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index df0a146..f626411 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -36,8 +36,8 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.AtomicConfiguration; @@ -572,10 +572,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig * @throws Exception If failed. */ private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, boolean failoverSafe) throws Exception { - try (IgniteReentrantLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, true)) { + try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, true)) { IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() { @Override public Object apply(Ignite ignite) { - IgniteReentrantLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); + IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, false); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 62b66b7..06b11a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -24,8 +24,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteReentrantLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.CollectionConfiguration; @@ -346,7 +346,7 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA assertNull(creator.reentrantLock("lock1", true, false)); assertNull(other.reentrantLock("lock1", true, false)); - try (IgniteReentrantLock lock = creator.reentrantLock("lock1", true, true)) { + try (IgniteLock lock = creator.reentrantLock("lock1", true, true)) { assertNotNull(lock); assertFalse(lock.isLocked()); @@ -355,7 +355,7 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - IgniteReentrantLock lock0 = other.reentrantLock("lock1", true, false); + IgniteLock lock0 = other.reentrantLock("lock1", true, false); lock0.lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index b31ca8b..e45e252 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -30,7 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteReentrantLock; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheAtomicityMode; @@ -369,7 +369,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res instanceof IgniteQueue || res instanceof IgniteSet || res instanceof IgniteSemaphore || - res instanceof IgniteReentrantLock); + res instanceof IgniteLock); log.info("Data structure created: " + dataStructure); http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java new file mode 100644 index 0000000..da8ab34 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import 
org.junit.rules.ExpectedException; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * Cache reentrant lock self test. + */ +public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTest + implements Externalizable { + /** */ + private static final int NODES_CNT = 4; + + /** */ + protected static final int THREADS_CNT = 5; + + /** */ + private static final Random RND = new Random(); + + /** */ + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODES_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLock() throws Exception { + checkReentrantLock(); + } + + /** + * @throws Exception If failed. + */ + public void testFailover() throws Exception { + if (atomicsCacheMode() == LOCAL) + return; + + checkFailover(true); + checkFailover(false); + } + + /** + * @param failoverSafe Failover safe flag. + * @throws Exception + */ + private void checkFailover(boolean failoverSafe) throws Exception { + IgniteEx g = startGrid(NODES_CNT + 1); + + // For vars locality. + { + // Ensure not exists. 
+ assert g.semaphore("sem", 2, failoverSafe, false) == null; + + IgniteSemaphore sem = g.semaphore( + "sem", + 2, + failoverSafe, + true); + + sem.acquire(2); + + assert !sem.tryAcquire(); + assertEquals( + 0, + sem.availablePermits()); + } + + Ignite g0 = grid(0); + + final IgniteSemaphore sem0 = g0.semaphore( + "sem", + -10, + false, + false); + + assert !sem0.tryAcquire(); + assertEquals(0, sem0.availablePermits()); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + sem0.acquire(); + + info("Acquired in separate thread."); + + return null; + } + }, + 1); + + Thread.sleep(100); + + g.close(); + + try { + fut.get(500); + } + catch (IgniteCheckedException e) { + if (!failoverSafe && e.hasCause(InterruptedException.class)) + info("Ignored expected exception: " + e); + else + throw e; + } + + sem0.close(); + } + + /** + * @throws Exception If failed. + */ + private void checkReentrantLock() throws Exception { + // Test API. + checkLock(); + + checkFailoverSafe(); + + // Test main functionality. + IgniteLock lock1 = grid(0).reentrantLock("lock", true, true); + + assertFalse(lock1.isLocked()); + + lock1.lock(); + + IgniteCompute comp = grid(0).compute().withAsync(); + + comp.call(new IgniteCallable<Object>() { + @IgniteInstanceResource + private Ignite ignite; + + @LoggerResource + private IgniteLogger log; + + @Nullable @Override public Object call() throws Exception { + // Test reentrant lock in multiple threads on each node. 
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteLock lock = ignite.reentrantLock("lock", true, true); + + assert lock != null; + + log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + log.info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + 5, + "test-thread" + ); + + fut.get(); + + return null; + } + }); + + IgniteFuture<Object> fut = comp.future(); + + Thread.sleep(3000); + + assert lock1.isHeldByCurrentThread() == true; + + assert lock1.getHoldCount() == 1; + + lock1.lock(); + + assert lock1.isHeldByCurrentThread() == true; + + assert lock1.getHoldCount() == 2; + + lock1.unlock(); + + lock1.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed semaphore. + lock1.close(); + + checkRemovedReentrantLock(lock1); + } + + /** + * @param lock IgniteLock. + * @throws Exception If failed. + */ + protected void checkRemovedReentrantLock(final IgniteLock lock) throws Exception { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lock.removed(); + } + }, 5000); + + assert lock.removed(); + } + + /** + * This method only checks if parameter of new reentrant lock is initialized properly. + * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. + * + * @throws Exception Exception. + */ + private void checkFailoverSafe() throws Exception { + // Checks only if reentrant lock is initialized properly + IgniteLock lock = createReentrantLock("rmv", true); + + assert lock.isFailoverSafe(); + + removeReentrantLock("rmv"); + + IgniteLock lock1 = createReentrantLock("rmv1", false); + + assert !lock1.isFailoverSafe(); + + removeReentrantLock("rmv1"); + } + + /** + * @throws Exception Exception. 
+ */ + private void checkLock() throws Exception { + // Check only 'false' cases here. Successful lock is tested over the grid. + final IgniteLock lock = createReentrantLock("acquire", false); + + lock.lock(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + assertNotNull(lock); + + assert !lock.tryLock(); + + assert !lock.tryLock(10, MICROSECONDS); + + return null; + } + }); + + fut.get(); + + lock.unlock(); + + removeReentrantLock("acquire"); + } + + /** + * @param lockName Reentrant lock name. + * @param failoverSafe Fairness flag. + * @return New distributed reentrant lock. + * @throws Exception If failed. + */ + private IgniteLock createReentrantLock(String lockName, boolean failoverSafe) + throws Exception { + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, true); + + // Test initialization. + assert lockName.equals(lock.name()); + assert lock.isLocked() == false; + assert lock.isFailoverSafe() == failoverSafe; + + return lock; + } + + /** + * @param lockName Reentrant lock name. + * @throws Exception If failed. + */ + private void removeReentrantLock(String lockName) + throws Exception { + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + + assert lock != null; + + // Remove lock on random node. + IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); + + assertNotNull(lock0); + + lock0.close(); + + // Ensure reentrant lock is removed on all nodes. + for (Ignite g : G.allGrids()) + assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, false)); + + checkRemovedReentrantLock(lock); + } + + /** + * @throws Exception If failed. 
+ */ + public void testReentrantLockMultinode1() throws Exception { + if (gridCount() == 1) + return; + + IgniteLock lock = grid(0).reentrantLock("s1", true, true); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteLock lock = ignite.reentrantLock("s1", true, false); + + assertNotNull(lock); + + IgniteCondition cond1 = lock.getOrCreateCondition("c1"); + + IgniteCondition cond2 = lock.getOrCreateCondition("c2"); + + try { + boolean wait = lock.tryLock(30_000, MILLISECONDS); + + assertTrue(wait); + + cond2.signal(); + + cond1.await(); + } + finally { + lock.unlock(); + } + + return null; + } + })); + } + + boolean done = false; + + while(!done) { + done = true; + + for (IgniteInternalFuture<?> fut : futs){ + if(!fut.isDone()) + done = false; + } + + try{ + lock.lock(); + + lock.getOrCreateCondition("c1").signal(); + + lock.getOrCreateCondition("c2").await(10,MILLISECONDS); + } + finally { + lock.unlock(); + } + } + + for (IgniteInternalFuture<?> fut : futs) + fut.get(30_000); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java deleted file mode 100644 index 6a441bb..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteReentrantLockAbstractSelfTest.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.datastructures; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; -import org.apache.ignite.IgniteCondition; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteReentrantLock; -import org.apache.ignite.IgniteSemaphore; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.PA; -import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; -import org.junit.Rule; -import org.junit.rules.ExpectedException; - -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.apache.ignite.cache.CacheMode.LOCAL; - -/** - * Cache reentrant lock self test. 
- */ -public abstract class IgniteReentrantLockAbstractSelfTest extends IgniteAtomicsAbstractTest - implements Externalizable { - /** */ - private static final int NODES_CNT = 4; - - /** */ - protected static final int THREADS_CNT = 5; - - /** */ - private static final Random RND = new Random(); - - /** */ - @Rule - public final ExpectedException exception = ExpectedException.none(); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return NODES_CNT; - } - - /** - * @throws Exception If failed. - */ - public void testReentrantLock() throws Exception { - checkReentrantLock(); - } - - /** - * @throws Exception If failed. - */ - public void testFailover() throws Exception { - if (atomicsCacheMode() == LOCAL) - return; - - checkFailover(true); - checkFailover(false); - } - - /** - * @param failoverSafe Failover safe flag. - * @throws Exception - */ - private void checkFailover(boolean failoverSafe) throws Exception { - IgniteEx g = startGrid(NODES_CNT + 1); - - // For vars locality. - { - // Ensure not exists. - assert g.semaphore("sem", 2, failoverSafe, false) == null; - - IgniteSemaphore sem = g.semaphore( - "sem", - 2, - failoverSafe, - true); - - sem.acquire(2); - - assert !sem.tryAcquire(); - assertEquals( - 0, - sem.availablePermits()); - } - - Ignite g0 = grid(0); - - final IgniteSemaphore sem0 = g0.semaphore( - "sem", - -10, - false, - false); - - assert !sem0.tryAcquire(); - assertEquals(0, sem0.availablePermits()); - - IgniteInternalFuture<?> fut = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - sem0.acquire(); - - info("Acquired in separate thread."); - - return null; - } - }, - 1); - - Thread.sleep(100); - - g.close(); - - try { - fut.get(500); - } - catch (IgniteCheckedException e) { - if (!failoverSafe && e.hasCause(InterruptedException.class)) - info("Ignored expected exception: " + e); - else - throw e; - } - - sem0.close(); - } - - /** - * @throws Exception If failed. 
- */ - private void checkReentrantLock() throws Exception { - // Test API. - checkLock(); - - checkFailoverSafe(); - - // Test main functionality. - IgniteReentrantLock lock1 = grid(0).reentrantLock("lock", true, true); - - assertFalse(lock1.isLocked()); - - lock1.lock(); - - IgniteCompute comp = grid(0).compute().withAsync(); - - comp.call(new IgniteCallable<Object>() { - @IgniteInstanceResource - private Ignite ignite; - - @LoggerResource - private IgniteLogger log; - - @Nullable @Override public Object call() throws Exception { - // Test reentrant lock in multiple threads on each node. - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( - new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - IgniteReentrantLock lock = ignite.reentrantLock("lock", true, true); - - assert lock != null; - - log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName()); - - assert lock.tryLock(1, MINUTES); - - log.info("Thread is again runnable: " + Thread.currentThread().getName()); - - lock.unlock(); - - return null; - } - }, - 5, - "test-thread" - ); - - fut.get(); - - return null; - } - }); - - IgniteFuture<Object> fut = comp.future(); - - Thread.sleep(3000); - - assert lock1.isHeldByCurrentThread() == true; - - assert lock1.getHoldCount() == 1; - - lock1.lock(); - - assert lock1.isHeldByCurrentThread() == true; - - assert lock1.getHoldCount() == 2; - - lock1.unlock(); - - lock1.unlock(); - - // Ensure there are no hangs. - fut.get(); - - // Test operations on removed semaphore. - lock1.close(); - - checkRemovedReentrantLock(lock1); - } - - /** - * @param lock IgniteReentrantLock. - * @throws Exception If failed. 
- */ - protected void checkRemovedReentrantLock(final IgniteReentrantLock lock) throws Exception { - assert GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return lock.removed(); - } - }, 5000); - - assert lock.removed(); - } - - /** - * This method only checks if parameter of new reentrant lock is initialized properly. - * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. - * - * @throws Exception Exception. - */ - private void checkFailoverSafe() throws Exception { - /* - // Checks only if reentrant lock is initialized properly - IgniteReentrantLock lock = createReentrantLock("rmv", true); - - assert lock.isFailoverSafe(); - - removeReentrantLock("rmv"); - - IgniteReentrantLock lock1 = createReentrantLock("rmv1", false); - - assert !lock1.isFailoverSafe(); - - removeReentrantLock("rmv1"); - */ - } - - /** - * @throws Exception Exception. - */ - private void checkLock() throws Exception { - // Check only 'false' cases here. Successful lock is tested over the grid. - final IgniteReentrantLock lock = createReentrantLock("acquire", false); - - lock.lock(); - - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - assertNotNull(lock); - - assert !lock.tryLock(); - - assert !lock.tryLock(10, MICROSECONDS); - - return null; - } - }); - - fut.get(); - - lock.unlock(); - - removeReentrantLock("acquire"); - } - - /** - * @param lockName Reentrant lock name. - * @param failoverSafe Failover-safe flag. - * @return New distributed reentrant lock. - * @throws Exception If failed. - */ - private IgniteReentrantLock createReentrantLock(String lockName, boolean failoverSafe) - throws Exception { - IgniteReentrantLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, true); - - // Test initialization. 
- assert lockName.equals(lock.name()); - assert lock.isLocked() == false; - assert lock.getQueueLength() == 0; - assert lock.isFailoverSafe() == failoverSafe; - - return lock; - } - - /** - * @param lockName Reentrant lock name. - * @throws Exception If failed. - */ - private void removeReentrantLock(String lockName) - throws Exception { - IgniteReentrantLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); - - assert lock != null; - - // Remove lock on random node. - IgniteReentrantLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, true); - - assertNotNull(lock0); - - lock0.close(); - - // Ensure reentrant lock is removed on all nodes. - for (Ignite g : G.allGrids()) - assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, false)); - - checkRemovedReentrantLock(lock); - } - - /** - * @throws Exception If failed. - */ - public void testReentrantLockMultinode1() throws Exception { - if (gridCount() == 1) - return; - - IgniteReentrantLock lock = grid(0).reentrantLock("s1", true, true); - - List<IgniteInternalFuture<?>> futs = new ArrayList<>(); - - for (int i = 0; i < gridCount(); i++) { - final Ignite ignite = grid(i); - - futs.add(GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - IgniteReentrantLock lock = ignite.reentrantLock("s1", true, false); - - assertNotNull(lock); - - IgniteCondition cond1 = lock.newCondition("c1"); - - IgniteCondition cond2 = lock.newCondition("c2"); - - try { - boolean wait = lock.tryLock(30_000, MILLISECONDS); - - assertTrue(wait); - - cond2.signal(); - - cond1.await(); - } - finally { - lock.unlock(); - } - - return null; - } - })); - } - - boolean done = false; - - while(!done) { - done = true; - - for (IgniteInternalFuture<?> fut : futs){ - if(!fut.isDone()) - done = false; - } - - try{ - lock.lock(); - - lock.newCondition("c1").signal(); - - lock.newCondition("c2").await(10,MILLISECONDS); - } - finally { - 
lock.unlock(); - } - } - - for (IgniteInternalFuture<?> fut : futs) - fut.get(30_000); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java new file mode 100644 index 0000000..543655c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.datastructures.local; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * Reentrant lock self-test that runs with the {@code LOCAL} atomics cache mode on a single grid node. + */ +public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void testReentrantLock() throws Exception { + // Test main functionality. + IgniteLock lock = grid(0).reentrantLock("lock", true, true); + + assertNotNull(lock); + + assertEquals(0, lock.getHoldCount()); + + lock.lock(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteLock lock = grid(0).reentrantLock("lock", true, true); + + assert lock != null; + + info("Thread is going to wait on lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + THREADS_CNT, + "test-thread" + ); + + Thread.sleep(3000); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.lock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 2; + + lock.unlock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Close the lock and check that it is reported as removed. 
+ IgniteLock lock0 = grid(0).reentrantLock("lock", true, false); + + assertNotNull(lock0); + + lock0.close(); + + checkRemovedReentrantLock(lock0); + } +}
