http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java new file mode 100644 index 0000000..8fb9049 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java @@ -0,0 +1,1629 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteCondition; +import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * Cache reentrant lock self test. + */ +public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTest + implements Externalizable { + /** */ + private static final int NODES_CNT = 4; + + /** */ + protected static final int THREADS_CNT = 5; + + /** */ + private static final Random RND = new Random(); + + /** */ + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODES_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLock() throws Exception { + checkReentrantLock(false); + + checkReentrantLock(true); + } + + /** + * @throws Exception If failed. + */ + public void testFailover() throws Exception { + if (atomicsCacheMode() == LOCAL) + return; + + checkFailover(true, false); + + checkFailover(false, false); + + checkFailover(true, true); + + checkFailover(false, true); + } + + /** + * @param failoverSafe Failover safe flag. + * @throws Exception + */ + private void checkFailover(final boolean failoverSafe, final boolean fair) throws Exception { + IgniteEx g = startGrid(NODES_CNT + 1); + + // For vars locality. + { + // Ensure not exists. + assert g.reentrantLock("lock", failoverSafe, fair, false) == null; + + IgniteLock lock = g.reentrantLock("lock", failoverSafe, fair, true); + + lock.lock(); + + assert lock.tryLock(); + + assertEquals(2, lock.getHoldCount()); + } + + Ignite g0 = grid(0); + + final IgniteLock lock0 = g0.reentrantLock("lock", false, fair, false); + + assert !lock0.tryLock(); + + assertEquals(0, lock0.getHoldCount()); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + try { + lock0.lock(); + + info("Acquired in separate thread."); + + // Lock is acquired silently only in failoverSafe mode. + assertTrue(failoverSafe); + + lock0.unlock(); + + info("Released lock in separate thread."); + } + catch (IgniteException e) { + if (!failoverSafe) + info("Ignored expected exception: " + e); + else + throw e; + } + return null; + } + }, + 1); + + Thread.sleep(100); + + g.close(); + + fut.get(500); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + private void checkReentrantLock(final boolean fair) throws Exception { + // Test API. + checkLock(fair); + + checkFailoverSafe(fair); + + // Test main functionality. + IgniteLock lock1 = grid(0).reentrantLock("lock", true, fair, true); + + assertFalse(lock1.isLocked()); + + lock1.lock(); + + IgniteCompute comp = grid(0).compute().withAsync(); + + comp.call(new IgniteCallable<Object>() { + @IgniteInstanceResource + private Ignite ignite; + + @LoggerResource + private IgniteLogger log; + + @Nullable @Override public Object call() throws Exception { + // Test reentrant lock in multiple threads on each node. + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteLock lock = ignite.reentrantLock("lock", true, fair, true); + + assert lock != null; + + log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + log.info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + 5, + "test-thread" + ); + + fut.get(); + + return null; + } + }); + + IgniteFuture<Object> fut = comp.future(); + + Thread.sleep(3000); + + assert lock1.isHeldByCurrentThread(); + + assert lock1.getHoldCount() == 1; + + lock1.lock(); + + assert lock1.isHeldByCurrentThread(); + + assert lock1.getHoldCount() == 2; + + lock1.unlock(); + + lock1.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed lock. + lock1.close(); + + checkRemovedReentrantLock(lock1); + } + + /** + * @param lock IgniteLock. + * @throws Exception If failed. + */ + protected void checkRemovedReentrantLock(final IgniteLock lock) throws Exception { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lock.removed(); + } + }, 5000); + + assert lock.removed(); + } + + /** + * This method only checks if parameter of new reentrant lock is initialized properly. + * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. + * + * @throws Exception Exception. + */ + private void checkFailoverSafe(final boolean fair) throws Exception { + // Checks only if reentrant lock is initialized properly + IgniteLock lock = createReentrantLock("rmv", true, fair); + + assert lock.isFailoverSafe(); + + removeReentrantLock("rmv", fair); + + IgniteLock lock1 = createReentrantLock("rmv1", false, fair); + + assert !lock1.isFailoverSafe(); + + removeReentrantLock("rmv1", fair); + } + + /** + * @throws Exception Exception. + */ + private void checkLock(final boolean fair) throws Exception { + // Check only 'false' cases here. Successful lock is tested over the grid. + final IgniteLock lock = createReentrantLock("acquire", false, fair); + + lock.lock(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + assertNotNull(lock); + + assert !lock.tryLock(); + + assert !lock.tryLock(10, MICROSECONDS); + + return null; + } + }); + + fut.get(); + + lock.unlock(); + + removeReentrantLock("acquire", fair); + } + + /** + * @param lockName Reentrant lock name. + * @param failoverSafe FailoverSafe flag. + * @param fair Fairness flag. + * @return New distributed reentrant lock. + * @throws Exception If failed. + */ + private IgniteLock createReentrantLock(String lockName, boolean failoverSafe, boolean fair) + throws Exception { + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, fair, true); + + // Test initialization. + assert lockName.equals(lock.name()); + assert !lock.isLocked(); + assert lock.isFailoverSafe() == failoverSafe; + assert lock.isFair() == fair; + + return lock; + } + + /** + * @param lockName Reentrant lock name. + * @throws Exception If failed. + */ + private void removeReentrantLock(String lockName, final boolean fair) + throws Exception { + IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true); + + assert lock != null; + + // Remove lock on random node. + IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true); + + assertNotNull(lock0); + + lock0.close(); + + // Ensure reentrant lock is removed on all nodes. + for (Ignite g : G.allGrids()) + assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, fair, false)); + + checkRemovedReentrantLock(lock); + } + + /** + * @throws Exception If failed. + */ + public void testLockSerialization() throws Exception { + final IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + info("Lock created: " + lock); + + lock.isFailoverSafe(); + lock.isFair(); + + grid(ThreadLocalRandom.current().nextInt(G.allGrids().size())).compute().broadcast(new IgniteCallable<Object>() { + @Nullable @Override public Object call() throws Exception { + Thread.sleep(1000); + + lock.lock(); + + try { + info("Inside lock: " + lock.getHoldCount()); + } + finally { + lock.unlock(); + } + + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testInitialization() throws Exception { + // Test #name() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertEquals("lock", lock.name()); + + lock.close(); + } + + // Test #isFailoverSafe() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + info("Lock created: " + lock); + + assertTrue(lock.isFailoverSafe()); + + lock.close(); + } + + // Test #isFair() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertTrue(lock.isFair()); + + lock.close(); + } + + // Test #isBroken() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.isBroken()); + + lock.close(); + } + + // Test #getOrCreateCondition(String ) method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertNotNull(lock.getOrCreateCondition("condition")); + + lock.close(); + } + + // Test #getHoldCount() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertEquals(0, lock.getHoldCount()); + + lock.close(); + } + + // Test #isHeldByCurrentThread() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.isHeldByCurrentThread()); + + lock.close(); + } + + // Test #isLocked() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.isLocked()); + + lock.close(); + } + + // Test #hasQueuedThreads() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.hasQueuedThreads()); + + lock.close(); + } + + // Test #hasQueuedThread(Thread ) method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.hasQueuedThread(Thread.currentThread())); + + lock.close(); + } + + // Test #hasWaiters(IgniteCondition ) method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + try { + IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond"); + + lock.hasWaiters(cond); + + fail("Condition not associated with this lock passed as argument."); + } + catch (IllegalArgumentException e) { + info("IllegalArgumentException thrown as it should be."); + } + + try { + IgniteCondition cond = lock.getOrCreateCondition("condition"); + + lock.hasWaiters(cond); + + fail("This method should throw exception when lock is not held."); + } + catch (IllegalMonitorStateException e) { + info("IllegalMonitorStateException thrown as it should be."); + } + + lock.close(); + } + + // Test #getWaitQueueLength(IgniteCondition ) method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + try { + IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond"); + + lock.getWaitQueueLength(cond); + + fail("Condition not associated with this lock passed as argument."); + } + catch (IllegalArgumentException e) { + info("IllegalArgumentException thrown as it should be."); + } + + try { + IgniteCondition cond = lock.getOrCreateCondition("condition"); + + lock.getWaitQueueLength(cond); + + fail("This method should throw exception when lock is not held."); + } + catch (IllegalMonitorStateException e) { + info("IllegalMonitorStateException thrown as it should be."); + } + + lock.close(); + } + + // Test #lock() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + lock.lock(); + + lock.unlock(); + + lock.close(); + } + + // Test #lockInterruptibly() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + lock.lockInterruptibly(); + + lock.unlock(); + + lock.close(); + } + + // Test #tryLock() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + boolean success = lock.tryLock(); + + assertTrue(success); + + lock.unlock(); + + lock.close(); + } + + // Test #tryLock(long, TimeUnit) method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + boolean success = lock.tryLock(1, MILLISECONDS); + + assertTrue(success); + + lock.unlock(); + + lock.close(); + } + + // Test #unlock() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + try { + lock.unlock(); + + fail("This method should throw exception when lock is not held."); + } + catch (IllegalMonitorStateException e) { + info("IllegalMonitorStateException thrown as it should be."); + } + + lock.close(); + } + + // Test #removed() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + assertFalse(lock.removed()); + + lock.close(); + + assertTrue(lock.removed()); + } + + // Test #close() method. + { + IgniteLock lock = grid(0).reentrantLock("lock", true, true, true); + + lock.close(); + + assertTrue(lock.removed()); + } + } + + /** + * @throws Exception If failed. + */ + public void testReentrantLockMultinode1() throws Exception { + testReentrantLockMultinode1(false); + + testReentrantLockMultinode1(true); + } + + /** + * @throws Exception If failed. + */ + private void testReentrantLockMultinode1(final boolean fair) throws Exception { + if (gridCount() == 1) + return; + + IgniteLock lock = grid(0).reentrantLock("s1", true, fair, true); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteLock lock = ignite.reentrantLock("s1", true, fair, false); + + assertNotNull(lock); + + IgniteCondition cond1 = lock.getOrCreateCondition("c1"); + + IgniteCondition cond2 = lock.getOrCreateCondition("c2"); + + try { + boolean wait = lock.tryLock(30_000, MILLISECONDS); + + assertTrue(wait); + + cond2.signal(); + + cond1.await(); + } + finally { + lock.unlock(); + } + + return null; + } + })); + } + + boolean done = false; + + while(!done) { + done = true; + + for (IgniteInternalFuture<?> fut : futs){ + if(!fut.isDone()) + done = false; + } + + try{ + lock.lock(); + + lock.getOrCreateCondition("c1").signal(); + + lock.getOrCreateCondition("c2").await(10,MILLISECONDS); + } + finally { + lock.unlock(); + } + } + + for (IgniteInternalFuture<?> fut : futs) + fut.get(30_000); + } + + /** + * @throws Exception If failed. + */ + public void testLockInterruptibly() throws Exception { + testLockInterruptibly(false); + + testLockInterruptibly(true); + } + + /** + * @throws Exception If failed. + */ + private void testLockInterruptibly(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + try { + lock0.lockInterruptibly(); + } + catch (IgniteInterruptedException e) { + assertFalse(Thread.currentThread().isInterrupted()); + + isInterrupted = true; + } + finally { + // Assert that thread was interrupted. + assertTrue(isInterrupted); + + // Assert that locked is still owned by main thread. + assertTrue(lock0.isLocked()); + + // Assert that this thread doesn't own the lock. + assertFalse(lock0.isHeldByCurrentThread()); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(1000); + } + + for (Thread t : startedThreads) + t.interrupt(); + + fut.get(); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testLockInterruptiblyMultinode() throws Exception { + testLockInterruptiblyMultinode(false); + + testLockInterruptiblyMultinode(true); + } + + /** + * @throws Exception If failed. + */ + private void testLockInterruptiblyMultinode(final boolean fair) throws Exception { + if (gridCount() == 1) + return; + + // Initialize reentrant lock. + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + lock0.lock(); + + // Number of threads, one per node. + final int threadCount = gridCount(); + + final AtomicLong threadCounter = new AtomicLong(0); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + final int localNodeId = (int)threadCounter.getAndIncrement(); + + final Ignite grid = grid(localNodeId); + + IgniteClosure<Ignite, Void> closure = new IgniteClosure<Ignite, Void>() { + @Override public Void apply(Ignite ignite) { + final IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + final AtomicReference<Thread> thread = new AtomicReference<>(); + + final AtomicBoolean done = new AtomicBoolean(false); + + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + + final IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try{ + thread.set(Thread.currentThread()); + + l.lockInterruptibly(); + } + catch(IgniteInterruptedException e){ + exceptionThrown.set(true); + } + finally { + done.set(true); + } + + return null; + } + }); + + // Wait until l.lock() has been called. + while(!l.hasQueuedThreads()){ + // No-op. + } + + latch.countDown(); + + latch.await(); + + thread.get().interrupt(); + + while(!done.get()){ + // No-op. + } + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + fail(e.getMessage()); + + throw new RuntimeException(e); + } + + assertTrue(exceptionThrown.get()); + + return null; + } + }; + + closure.apply(grid); + + return null; + } + }, threadCount); + + fut.get(); + + lock0.unlock(); + + info("Checking if interrupted threads are removed from global waiting queue..."); + + // Check if interrupted threads are removed from global waiting queue. + boolean locked = lock0.tryLock(1000, MILLISECONDS); + + info("Interrupted threads successfully removed from global waiting queue. "); + + assertTrue(locked); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testLock() throws Exception { + testLock(false); + + testLock(true); + } + + /** + * @throws Exception If failed. + */ + private void testLock(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + try { + lock0.lock(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + + fail("Lock() method is uninterruptible."); + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that interrupted flag is set and clear it in order to call unlock(). + assertTrue(Thread.interrupted()); + + // Assert that lock is still owned by this thread. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertTrue(lock0.isHeldByCurrentThread()); + + // Release lock. + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + for (Thread t : startedThreads) + t.interrupt(); + + lock0.unlock(); + + fut.get(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testTryLock() throws Exception { + testTryLock(false); + + testTryLock(true); + } + + /** + * @throws Exception If failed. + */ + private void testTryLock(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + boolean locked = false; + + try { + locked = lock0.tryLock(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + + fail("tryLock() method is uninterruptible."); + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that lock is locked. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertEquals(locked, lock0.isHeldByCurrentThread()); + + // Release lock. + if (locked) + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + for (Thread t : startedThreads) + t.interrupt(); + + fut.get(); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testTryLockTimed() throws Exception { + testTryLockTimed(false); + + testTryLockTimed(true); + } + + /** + * @throws Exception If failed. + */ + private void testTryLockTimed(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + lock0.lock(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + boolean locked = false; + + try { + locked = lock0.tryLock(100, TimeUnit.MILLISECONDS); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that tryLock returned false. + assertFalse(locked); + + // Assert that lock is still owned by main thread. + assertTrue(lock0.isLocked()); + + // Assert that this thread doesn't own the lock. + assertFalse(lock0.isHeldByCurrentThread()); + + // Release lock. + if (locked) + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + fut.get(); + + lock0.unlock(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testConditionAwaitUninterruptibly() throws Exception { + testConditionAwaitUninterruptibly(false); + + testConditionAwaitUninterruptibly(true); + } + + /** + * @throws Exception If failed. + */ + private void testConditionAwaitUninterruptibly(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + lock0.lock(); + + IgniteCondition cond = lock0.getOrCreateCondition("cond"); + + try { + cond.awaitUninterruptibly(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + } + finally { + // Assert that thread was not interrupted. + assertFalse(isInterrupted); + + // Assert that lock is still locked. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertTrue(lock0.isHeldByCurrentThread()); + + // Clear interrupt flag. + assertTrue(Thread.interrupted()); + + // Release lock. + if (lock0.isHeldByCurrentThread()) + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + lock0.lock(); + + for (Thread t : startedThreads) { + t.interrupt(); + + lock0.getOrCreateCondition("cond").signal(); + } + + lock0.unlock(); + + fut.get(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testConditionInterruptAwait() throws Exception { + testConditionInterruptAwait(false); + + testConditionInterruptAwait(true); + } + + /** + * @throws Exception If failed. + */ + private void testConditionInterruptAwait(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 2; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + boolean isInterrupted = false; + + lock0.lock(); + + IgniteCondition cond = lock0.getOrCreateCondition("cond"); + + try { + cond.await(); + } + catch (IgniteInterruptedException e) { + isInterrupted = true; + } + finally { + // Assert that thread was interrupted. + assertTrue(isInterrupted); + + // Assert that lock is still locked. + assertTrue(lock0.isLocked()); + + // Assert that this thread does own the lock. + assertTrue(lock0.isHeldByCurrentThread()); + + // Release lock. + if (lock0.isHeldByCurrentThread()) + lock0.unlock(); + } + + return null; + } + }, totalThreads); + + // Wait for all threads to attempt to acquire lock. + while (startedThreads.size() != totalThreads) { + Thread.sleep(500); + } + + for (Thread t : startedThreads) + t.interrupt(); + + fut.get(); + + assertFalse(lock0.isLocked()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testHasQueuedThreads() throws Exception { + testHasQueuedThreads(false); + + testHasQueuedThreads(true); + } + + /** + * @throws Exception If failed. + */ + private void testHasQueuedThreads(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 5; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + final Set<Thread> finishedThreads = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + startedThreads.add(Thread.currentThread()); + + lock0.lock(); + + // Wait until every thread tries to lock. + do { + Thread.sleep(1000); + } + while (startedThreads.size() != totalThreads); + + try { + info("Acquired in separate thread. "); + + assertTrue(lock0.isHeldByCurrentThread()); + + assertFalse(lock0.hasQueuedThread(Thread.currentThread())); + + finishedThreads.add(Thread.currentThread()); + + if (startedThreads.size() != finishedThreads.size()) { + assertTrue(lock0.hasQueuedThreads()); + } + + for (Thread t : startedThreads) { + assertTrue(lock0.hasQueuedThread(t) != finishedThreads.contains(t)); + } + } + finally { + lock0.unlock(); + + assertFalse(lock0.isHeldByCurrentThread()); + } + + return null; + } + }, totalThreads); + + fut.get(); + + assertFalse(lock0.hasQueuedThreads()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * @throws Exception If failed. + */ + public void testHasConditionQueuedThreads() throws Exception { + testHasConditionQueuedThreads(false); + + testHasConditionQueuedThreads(true); + } + + /** + * @throws Exception If failed. + */ + private void testHasConditionQueuedThreads(final boolean fair) throws Exception { + final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true); + + assertEquals(0, lock0.getHoldCount()); + + assertFalse(lock0.hasQueuedThreads()); + + final int totalThreads = 5; + + final Set<Thread> startedThreads = new GridConcurrentHashSet<>(); + + final Set<Thread> finishedThreads = new GridConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + assertFalse(lock0.isHeldByCurrentThread()); + + IgniteCondition cond = lock0.getOrCreateCondition("cond"); + + lock0.lock(); + + startedThreads.add(Thread.currentThread()); + + // Wait until every thread tries to lock. + do { + cond.await(); + + Thread.sleep(1000); + } + while (startedThreads.size() != totalThreads); + + try { + info("Acquired in separate thread. Number of threads waiting on condition: " + + lock0.getWaitQueueLength(cond)); + + assertTrue(lock0.isHeldByCurrentThread()); + + assertFalse(lock0.hasQueuedThread(Thread.currentThread())); + + finishedThreads.add(Thread.currentThread()); + + if (startedThreads.size() != finishedThreads.size()) { + assertTrue(lock0.hasWaiters(cond)); + } + + for (Thread t : startedThreads) { + if (!finishedThreads.contains(t)) + assertTrue(lock0.hasWaiters(cond)); + } + + assertTrue(lock0.getWaitQueueLength(cond) == (startedThreads.size() - finishedThreads.size())); + } + finally { + cond.signal(); + + lock0.unlock(); + + assertFalse(lock0.isHeldByCurrentThread()); + } + + return null; + } + }, totalThreads); + + IgniteCondition cond = lock0.getOrCreateCondition("cond"); + + lock0.lock(); + + try { + // Wait until all threads are waiting on condition. + while (lock0.getWaitQueueLength(cond) != totalThreads) { + lock0.unlock(); + + Thread.sleep(1000); + + lock0.lock(); + } + + // Signal once to get things started. + cond.signal(); + } + finally { + lock0.unlock(); + } + + fut.get(); + + assertFalse(lock0.hasQueuedThreads()); + + for (Thread t : startedThreads) + assertFalse(lock0.hasQueuedThread(t)); + + lock0.close(); + } + + /** + * Tests if lock is evenly acquired among nodes when fair flag is set on. + * Since exact ordering of lock acquisitions cannot be guaranteed because it also depends + * on the OS thread scheduling, certain deviation from uniform distribution is tolerated. + * @throws Exception If failed. + */ + public void testFairness() throws Exception { + if (gridCount() == 1) + return; + + // Total number of ops. + final long opsCount = 10000; + + // Allowed deviation from uniform distribution. + final double tolerance = 0.05; + + // Shared counter. + final String OPS_COUNTER = "ops_counter"; + + // Number of threads, one per node. + final int threadCount = gridCount(); + + final AtomicLong threadCounter = new AtomicLong(0); + + Ignite ignite = startGrid(gridCount()); + + // Initialize reentrant lock. + IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + // Initialize OPS_COUNTER. + ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, (long)0); + + final Map<Integer, Long> counts = new ConcurrentHashMap<>(); + + IgniteInternalFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + final int localNodeId = (int)threadCounter.getAndIncrement(); + + final Ignite grid = grid(localNodeId); + + IgniteClosure<Ignite, Long> closure = new IgniteClosure<Ignite, Long>() { + @Override public Long apply(Ignite ignite) { + IgniteLock l = ignite.reentrantLock("lock", true, true, true); + + long localCount = 0; + + IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true); + + latch.countDown(); + + latch.await(); + + while(true){ + l.lock(); + + try { + long opsCounter = (long) ignite.getOrCreateCache(OPS_COUNTER).get(OPS_COUNTER); + + if(opsCounter == opsCount) + break; + + ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, ++opsCounter); + + localCount++; + + if(localCount > 1000){ + assertTrue(localCount < (1 + tolerance) * opsCounter / threadCount); + + assertTrue(localCount > (1 - tolerance) * opsCounter / threadCount); + } + + if(localCount % 100 == 0) { + info("Node [id=" +ignite.cluster().localNode().id() + "] acquired " + + localCount + " times. " + "Total ops count: " + + opsCounter + "/" + opsCount +"]"); + } + } + finally { + l.unlock(); + } + } + + return localCount; + } + }; + + long localCount = closure.apply(grid); + + counts.put(localNodeId, localCount); + + return null; + } + }, threadCount); + + fut.get(); + + long totalSum = 0; + + for(int i=0; i<gridCount(); i++){ + + totalSum += counts.get(i); + + info("Node " + grid(i).localNode().id() + " acquired the lock " + counts.get(i) + " times. "); + } + + assertEquals(totalSum, opsCount); + + l.close(); + + ignite.close(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java index e60aed3..200e276 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java @@ -77,6 +77,7 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr */ public void testSemaphore() throws Exception { checkSemaphore(); + checkSemaphoreSerialization(); } /** @@ -231,6 +232,36 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr } /** + * @throws Exception If failed. + */ + private void checkSemaphoreSerialization() throws Exception { + final IgniteSemaphore sem = grid(0).semaphore("semaphore", -gridCount() + 1, true, true); + + assertEquals(-gridCount() + 1, sem.availablePermits()); + + grid(0).compute().broadcast(new IgniteCallable<Object>() { + @Nullable @Override public Object call() throws Exception { + sem.release(); + + return null; + } + }); + + assert sem.availablePermits() == 1; + + sem.acquire(); + + assert sem.availablePermits() == 0; + + sem.release(); + + // Test operations on removed semaphore. + sem.close(); + + checkRemovedSemaphore(sem); + } + + /** * @param semaphore Semaphore. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java new file mode 100644 index 0000000..7e1a11c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.local; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteLock; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * + */ +public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void testReentrantLock() throws Exception { + // Test main functionality. + IgniteLock lock = grid(0).reentrantLock("lock", true, false, true); + + assertNotNull(lock); + + assertEquals(0, lock.getHoldCount()); + + lock.lock(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteLock lock = grid(0).reentrantLock("lock", true, false, true); + + assert lock != null; + + info("Thread is going to wait on lock: " + Thread.currentThread().getName()); + + assert lock.tryLock(1, MINUTES); + + info("Thread is again runnable: " + Thread.currentThread().getName()); + + lock.unlock(); + + return null; + } + }, + THREADS_CNT, + "test-thread" + ); + + Thread.sleep(3000); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.lock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 2; + + lock.unlock(); + + assert lock.isLocked(); + + assert lock.getHoldCount() == 1; + + lock.unlock(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed lock. + IgniteLock lock0 = grid(0).reentrantLock("lock", true, false, false); + + assertNotNull(lock0); + + lock0.close(); + + checkRemovedReentrantLock(lock0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java new file mode 100644 index 0000000..787f1e3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgnitePartitionedLockSelfTest extends IgniteLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java new file mode 100644 index 0000000..00bb0fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.replicated; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class IgniteReplicatedLockSelfTest extends IgniteLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java index ae591bd..43acc41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java @@ -573,4 +573,4 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java index d4ca9a5..18537c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSystemProperties; @@ -64,6 +65,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad /** Semaphore name. */ private static final String TEST_SEMAPHORE_NAME = "test-semaphore"; + /** Reentrant lock name. */ + private static final String TEST_REENTRANT_LOCK_NAME = "test-reentrant-lock"; + /** */ private static final CollectionConfiguration colCfg = new CollectionConfiguration(); @@ -98,6 +102,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad private static final boolean SEMAPHORE = true; /** */ + private static final boolean REENTRANTLOCK = true; + + /** */ private GridCacheDataStructuresLoadTest() { // No-op } @@ -347,6 +354,44 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad } }; + + /** Reentrant lock read closure. */ + private final CIX1<Ignite> reentrantLockReadClos = + new CIX1<Ignite>() { + @Override public void applyx(Ignite ignite) { + IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true); + + for (int i = 0; i < operationsPerTx; i++) { + r.isLocked(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; + + /** Reentrant lock write closure. */ + private final CIX1<Ignite> reentrantLockWriteClos = + new CIX1<Ignite>() { + @Override public void applyx(Ignite ignite) { + IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true); + + for (int i = 0; i < operationsPerTx; i++) { + if ((i % 2) == 0) + r.lock(); + else + r.unlock(); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + /** * @param args Arguments. * @throws IgniteCheckedException In case of error. @@ -417,6 +462,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos); } + + System.gc(); + + if (REENTRANTLOCK) { + info("Testing reentrant lock..."); + + test.loadTestIgnite(test.reentrantLockWriteClos, test.reentrantLockReadClos); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index c49c730..c9859fc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; @@ -367,6 +368,15 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ + @Nullable @Override public IgniteLock reentrantLock(String name, + boolean failoverSafe, + boolean fair, + boolean create) + { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg) http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index a2e0d5a..2598bc5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -42,6 +42,7 @@ import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteServices; @@ -545,6 +546,12 @@ public class IgniteProcessProxy implements IgniteEx { } /** {@inheritDoc} */ + @Override public IgniteLock reentrantLock(String name, boolean failoverSafe, + boolean fair, boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg) throws IgniteException { throw new UnsupportedOperationException("Operation isn't supported yet."); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index e663a99..59a18e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.local.GridCach import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalSetSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalAtomicLongApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalCountDownLatchSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalLockSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalSemaphoreSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicOffheapQueueApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.partitioned.Gr import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedAtomicLongApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedCountDownLatchSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedLockSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedQueueNoBackupsTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedSemaphoreSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedSetNoBackupsSelfTest; @@ -80,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Gri import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSetSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedAtomicLongApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedCountDownLatchSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedLockSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedSemaphoreSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; @@ -107,6 +110,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCacheLocalAtomicQueueApiSelfTest.class)); suite.addTest(new TestSuite(IgniteLocalCountDownLatchSelfTest.class)); suite.addTest(new TestSuite(IgniteLocalSemaphoreSelfTest.class)); + suite.addTest(new TestSuite(IgniteLocalLockSelfTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedSequenceApiSelfTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedSequenceMultiNodeSelfTest.class)); @@ -117,6 +121,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCacheReplicatedDataStructuresFailoverSelfTest.class)); suite.addTest(new TestSuite(IgniteReplicatedCountDownLatchSelfTest.class)); suite.addTest(new TestSuite(IgniteReplicatedSemaphoreSelfTest.class)); + suite.addTest(new TestSuite(IgniteReplicatedLockSelfTest.class)); suite.addTest(new TestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedSequenceApiSelfTest.class)); @@ -139,6 +144,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IgnitePartitionedCountDownLatchSelfTest.class)); suite.addTest(new TestSuite(IgniteDataStructureWithJobTest.class)); suite.addTest(new TestSuite(IgnitePartitionedSemaphoreSelfTest.class)); + suite.addTest(new TestSuite(IgnitePartitionedLockSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedSetFailoverSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedOffheapSetFailoverSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/f8bf93a7/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 92d8c1d..03c7b0e 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -425,6 +425,17 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ + @Nullable @Override public IgniteLock reentrantLock(String name, + boolean failoverSafe, + boolean fair, + boolean create) + { + assert g != null; + + return g.reentrantLock(name, failoverSafe, create, fair); + } + + /** {@inheritDoc} */ @Nullable @Override public <T> IgniteQueue<T> queue(String name, int cap, CollectionConfiguration cfg)
