Repository: ignite Updated Branches: refs/heads/ignite-971 52dea6d36 -> ee0041f8b
ignite-971 Fix offheap to swap eviction. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee0041f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee0041f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee0041f8 Branch: refs/heads/ignite-971 Commit: ee0041f8b0c4de7c5965538b81ae9c26e4a10cbd Parents: 52dea6d Author: sboikov <[email protected]> Authored: Fri Sep 11 17:27:40 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 11 17:40:49 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 2 +- .../processors/cache/GridCacheSwapManager.java | 5 +- .../GridCacheAbstractRemoveFailureTest.java | 109 +++++++++++-------- 3 files changed, 65 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee0041f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5a7cc42..f2bb646 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3023,7 +3023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); - if (isNew() || (!preload && deletedUnlocked())) { + if ((isNew() && !cctx.swap().containsKey(key, partition())) || (!preload && deletedUnlocked())) { long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee0041f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index b5f754a..389d63d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -525,17 +525,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param key Key to check. + * @param part Partition. * @return {@code True} if key is contained. * @throws IgniteCheckedException If failed. */ - public boolean containsKey(KeyCacheObject key) throws IgniteCheckedException { + public boolean containsKey(KeyCacheObject key, int part) throws IgniteCheckedException { if (!offheapEnabled && !swapEnabled) return false; checkIteratorQueue(); - int part = cctx.affinity().partition(key); - // First check off-heap store. if (offheapEnabled) { boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee0041f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java index 3b4af81..647746e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java @@ -21,17 +21,15 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -66,7 +64,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr private static final int GRID_CNT = 3; /** Keys count. */ - private static final int KEYS_CNT = 10000; + private static final int KEYS_CNT = 10_000; /** Test duration. */ private static final long DUR = 90 * 1000L; @@ -80,20 +78,8 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr /** Start delay. */ private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000); - /** Node kill lock (used to prevent killing while cache data is compared). */ - private final Lock killLock = new ReentrantLock(); - - /** */ - private CountDownLatch assertLatch; - - /** */ - private CountDownLatch updateLatch; - - /** Caches comparison request flag. */ - private volatile boolean cmp; - /** */ - private String sizePropVal; + private static String sizePropVal; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -170,6 +156,20 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr } /** + * @throws Exception If failed. + */ + public void testPutAndRemoveOffheapEvict() throws Exception { + putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT); + } + + /** + * @throws Exception If failed. + */ + public void testPutAndRemoveOffheapEvictSwap() throws Exception { + putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT_SWAP); + } + + /** * @param duration Test duration. * @param memMode Memory mode. * @throws Exception If failed. @@ -179,7 +179,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr grid(0).destroyCache(null); - CacheConfiguration ccfg = new CacheConfiguration(); + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); ccfg.setWriteSynchronizationMode(FULL_SYNC); @@ -205,8 +205,12 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr // Expected values in cache. final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap8<>(); + final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>(); + IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { + Thread.currentThread().setName("update-thread"); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); while (!stop.get()) { @@ -243,10 +247,14 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr cntr.addAndGet(100); - if (cmp) { - assertLatch.countDown(); + CyclicBarrier barrier = cmp.get(); + + if (barrier != null) { + log.info("Wait data check."); + + barrier.await(60_000, TimeUnit.MILLISECONDS); - updateLatch.await(); + log.info("Finished wait data check."); } } @@ -256,16 +264,21 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { + Thread.currentThread().setName("restart-thread"); + while (!stop.get()) { U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2())); - killLock.lock(); + killAndRestart(stop); - try { - killAndRestart(stop); - } - finally { - killLock.unlock(); + CyclicBarrier barrier = cmp.get(); + + if (barrier != null) { + log.info("Wait data check."); + + barrier.await(60_000, TimeUnit.MILLISECONDS); + + log.info("Finished wait data check."); } } @@ -294,31 +307,34 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr log.info("Operations/second: " + opsPerSecond); if (U.currentTimeMillis() >= nextAssert) { - updateLatch = new CountDownLatch(1); + CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { + @Override public void run() { + try { + cmp.set(null); - assertLatch = new CountDownLatch(1); + log.info("Checking cache content."); - cmp = true; + assertCacheContent(expVals); - killLock.lock(); + log.info("Finished check cache content."); + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); - try { - if (!assertLatch.await(60_000, TimeUnit.MILLISECONDS)) - throw new IgniteCheckedException("Failed to suspend thread executing updates."); + throw e; + } + } + }); - log.info("Checking cache content."); + log.info("Start cache content check."); - assertCacheContent(expVals); + cmp.set(barrier); - nextAssert = System.currentTimeMillis() + ASSERT_FREQ; - } - finally { - killLock.unlock(); + barrier.await(60_000, TimeUnit.MILLISECONDS); - updateLatch.countDown(); + log.info("Cache content check done."); - U.sleep(500); - } + nextAssert = System.currentTimeMillis() + ASSERT_FREQ; } } } @@ -337,7 +353,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr * @param stop Stop flag. * @throws Exception If failed. */ - void killAndRestart(AtomicBoolean stop) throws Exception { + private void killAndRestart(AtomicBoolean stop) throws Exception { if (stop.get()) return; @@ -361,10 +377,9 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstr /** * @param expVals Expected values in cache. - * @throws Exception If failed. */ @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"}) - private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) throws Exception { + private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) { assert !expVals.isEmpty(); Collection<Integer> failedKeys = new HashSet<>();
