IGNITE-3339 - Fixed entries eviction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb7a6728 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb7a6728 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb7a6728 Branch: refs/heads/ignite-1232 Commit: bb7a6728bbd886677a4e7f0c7ad92161dd51122b Parents: 8ce29a9 Author: Alexey Goncharuk <[email protected]> Authored: Wed Jun 22 06:34:57 2016 -0700 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jun 22 06:42:38 2016 -0700 ---------------------------------------------------------------------- .../distributed/near/GridNearGetFuture.java | 8 +- .../transactions/IgniteTxLocalAdapter.java | 12 +- .../cache/transactions/IgniteTxManager.java | 16 +- .../IgniteCacheTxIteratorSelfTest.java | 241 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite5.java | 2 + 5 files changed, 266 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb7a6728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 290c08e..b7fcbbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -62,8 +62,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; - /** * */ @@ -636,7 +634,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap catch (GridCacheEntryRemovedException ignored) { // Retry. } - catch (GridDhtInvalidPartitionException e) { + catch (GridDhtInvalidPartitionException ignored) { return false; } catch (IgniteCheckedException e) { @@ -645,7 +643,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return false; } finally { - if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) + if (dhtEntry != null) + // Near cache is enabled, so near entry will be enlisted in the transaction. + // Always touch DHT entry in this case. dht.context().evicts().touch(dhtEntry, topVer); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bb7a6728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d51d873..d9aca4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1395,11 +1395,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); } finally { - if (cacheCtx.isNear() && entry != null && readCommitted()) { - if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) { - if (entry.markObsolete(xidVer)) - cacheCtx.cache().removeEntry(entry); + if (entry != null && readCommitted()) { + if (cacheCtx.isNear()) { + if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) { + if (entry.markObsolete(xidVer)) + cacheCtx.cache().removeEntry(entry); + } } + else + entry.context().evicts().touch(entry, topVer); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bb7a6728/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4ec280f..e8d20b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1236,7 +1236,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param tx Transaction to finish. * @param commit {@code True} if transaction is committed, {@code false} if rolled back. */ - public void fastFinishTx(IgniteInternalTx tx, boolean commit) { + public void fastFinishTx(GridNearTxLocal tx, boolean commit) { assert tx != null; assert tx.writeMap().isEmpty(); assert tx.optimistic() || tx.readMap().isEmpty(); @@ -1247,16 +1247,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 1. Notify evictions. notifyEvitions(tx); - // 2. Remove obsolete entries. + // 2. Evict near entries. + if (!tx.readMap().isEmpty()) { + for (IgniteTxEntry entry : tx.readMap().values()) + tx.evictNearEntry(entry, false); + } + + // 3. Remove obsolete entries. removeObsolete(tx); - // 3. Remove from per-thread storage. + // 4. Remove from per-thread storage. clearThreadMap(tx); - // 4. Clear context. + // 5. Clear context. resetContext(); - // 5. Update metrics. + // 6. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) { if (commit) http://git-wip-us.apache.org/repos/asf/ignite/blob/bb7a6728/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java new file mode 100644 index 0000000..769a5f6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import javax.cache.Cache; + +/** + * + */ +public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE_NAME = "testCache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + final TransactionConfiguration txCfg = new TransactionConfiguration(); + + txCfg.setDefaultTxIsolation(TransactionIsolation.READ_COMMITTED); + + cfg.setTransactionConfiguration(txCfg); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration<String, TestClass> cacheConfiguration( + CacheMode mode, + CacheAtomicityMode atomMode, + CacheMemoryMode memMode, + boolean nearEnabled, + boolean useEvictPlc + ) { + final CacheConfiguration<String, TestClass> ccfg = new CacheConfiguration<>(CACHE_NAME); + + ccfg.setAtomicityMode(atomMode); + ccfg.setCacheMode(mode); + ccfg.setMemoryMode(memMode); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + if (nearEnabled) + ccfg.setNearConfiguration(new NearCacheConfiguration<String, TestClass>()); + + if (memMode == CacheMemoryMode.ONHEAP_TIERED && useEvictPlc) { + ccfg.setOffHeapMaxMemory(10 * 1024 * 1024); + ccfg.setEvictionPolicy(new FifoEvictionPolicy(50)); + } + + return ccfg; + } + + /** + * @throws Exception if failed. + */ + public void testModesSingleNode() throws Exception { + checkModes(1); + } + + /** + * @throws Exception if failed. + */ + public void testModesMultiNode() throws Exception { + checkModes(3); + } + + /** + * @throws Exception if failed. + */ + public void checkModes(int gridCnt) throws Exception { + startGrids(gridCnt); + + try { + for (CacheMode mode : CacheMode.values()) { + for (CacheAtomicityMode atomMode : CacheAtomicityMode.values()) { + for (CacheMemoryMode memMode : CacheMemoryMode.values()) { + if (mode == CacheMode.PARTITIONED) { + // Near cache makes sense only for partitioned cache. + checkTxCache(CacheMode.PARTITIONED, atomMode, memMode, true, false); + } + + if (memMode == CacheMemoryMode.ONHEAP_TIERED) + checkTxCache(mode, atomMode, CacheMemoryMode.ONHEAP_TIERED, false, true); + + checkTxCache(mode, atomMode, memMode, false, false); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + private void checkTxCache( + CacheMode mode, + CacheAtomicityMode atomMode, + CacheMemoryMode memMode, + boolean nearEnabled, + boolean useEvicPlc + ) throws Exception { + final Ignite ignite = grid(0); + + final CacheConfiguration<String, TestClass> ccfg = cacheConfiguration( + mode, + atomMode, + memMode, + nearEnabled, + useEvicPlc); + + final IgniteCache<String, TestClass> cache = ignite.createCache(ccfg); + + info("Checking cache [mode=" + mode + ", atomMode=" + atomMode + ", memMode=" + memMode + + ", near=" + nearEnabled + ']'); + + try { + for (int i = 0; i < 30; i++) { + final TestClass val = new TestClass("data"); + final String key = "key-" + i; + + cache.put(key, val); + + assertEquals(i + 1, cache.size()); + + for (TransactionIsolation iso : TransactionIsolation.values()) { + for (TransactionConcurrency con : TransactionConcurrency.values()) { + try (Transaction transaction = ignite.transactions().txStart(con, iso)) { + assertEquals(val, cache.get(key)); + + transaction.commit(); + } + + int cnt = iterateOverKeys(cache); + + assertEquals("Failed [con=" + con + ", iso=" + iso + ']', i + 1, cnt); + + assertEquals("Failed [con=" + con + ", iso=" + iso + ']', i + 1, cache.size()); + } + } + } + } + finally { + grid(0).destroyCache(CACHE_NAME); + } + } + + /** + * @param cache Cache. + */ + @SuppressWarnings("TypeMayBeWeakened") + private int iterateOverKeys(final IgniteCache<String, TestClass> cache) { + int cnt = 0; + + for (final Cache.Entry<String, TestClass> ignore : cache) + cnt++; + + return cnt; + } + + /** + * + */ + private static class TestClass { + /** */ + private String data; + + /** + * @param data Data. + */ + private TestClass(String data) { + this.data = data; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + final TestClass testCls = (TestClass)o; + + return data != null ? data.equals(testCls.data) : testCls.data == null; + + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return data != null ? data.hashCode() : 0; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return S.toString(TestClass.class, this); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bb7a6728/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 41e0ed1..7582f5c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinity import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheSyncRebalanceModeSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; /** @@ -56,6 +57,7 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(IgniteCacheSyncRebalanceModeSelfTest.class); suite.addTest(IgniteCacheReadThroughEvictionsVariationsSuite.suite()); + suite.addTestSuite(IgniteCacheTxIteratorSelfTest.class); return suite; }
