Repository: ignite Updated Branches: refs/heads/ignite-1607 de2bdc4e9 -> ca8d0364e
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca8d0364 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca8d0364 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca8d0364 Branch: refs/heads/ignite-1607 Commit: ca8d0364ea186416533e63d73888ee8fdaf6d835 Parents: de2bdc4 Author: sboikov <[email protected]> Authored: Wed Oct 21 12:01:13 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 21 12:01:13 2015 +0300 ---------------------------------------------------------------------- .../distributed/near/GridNearCacheEntry.java | 27 +++---- .../distributed/near/GridNearGetFuture.java | 77 ++++++++++---------- .../transactions/IgniteTxLocalAdapter.java | 9 +-- .../cache/CacheNearReaderUpdateTest.java | 2 +- ...niteCacheClientNodeChangingTopologyTest.java | 3 + 5 files changed, 57 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 7bfd979..0ad236c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -402,8 +402,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { throws IgniteCheckedException, GridCacheEntryRemovedException { assert dhtVer != null; - boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); - GridCacheVersion enqueueVer = null; try { @@ -418,28 +416,25 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { CacheObject old = this.val; boolean hasVal = hasValueUnlocked(); - if (isNew() || !valid) { + if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { primaryNode(primaryNodeId, topVer); - // Change entry only if dht version has changed. - if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { - update(val, expireTime, ttl, ver); + update(val, expireTime, ttl, ver); - if (cctx.deferredDelete() && !isInternal()) { - boolean deleted = val == null; + if (cctx.deferredDelete() && !isInternal()) { + boolean deleted = val == null; - if (deleted != deletedUnlocked()) { - deletedUnlocked(deleted); + if (deleted != deletedUnlocked()) { + deletedUnlocked(deleted); - if (deleted) - enqueueVer = ver; - } + if (deleted) + enqueueVer = ver; } + } - this.dhtVer = dhtVer; + this.dhtVer = dhtVer; - ret = true; - } + ret = true; } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 7384a89..8dc273c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -268,16 +268,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(affNodes.size()); - Map<KeyCacheObject, GridNearCacheEntry> savedVers = null; + Map<KeyCacheObject, GridNearCacheEntry> savedEntries = null; // Assign keys to primary nodes. for (KeyCacheObject key : keys) - savedVers = map(key, mappings, topVer, mapped, savedVers); + savedEntries = map(key, mappings, topVer, mapped, savedEntries); if (isDone()) return; - final Map<KeyCacheObject, GridNearCacheEntry> saved = savedVers; + final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != null ? savedEntries : + Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap(); final int keysSize = keys.size(); @@ -335,9 +336,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return Collections.emptyMap(); } - finally { - releaseEvictions(mappedKeys.keySet(), saved, topVer); - } } })); } @@ -385,7 +383,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param key Key to map. * @param topVer Topology version * @param mapped Previously mapped. - * @param savedVers Saved versions. + * @param saved Reserved near cache entries. * @return Map. */ private Map<KeyCacheObject, GridNearCacheEntry> map( @@ -393,16 +391,16 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, - Map<KeyCacheObject, GridNearCacheEntry> savedVers + Map<KeyCacheObject, GridNearCacheEntry> saved ) { final GridNearCacheAdapter near = cache(); // Allow to get cached value from the local node. boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); - GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null; - while (true) { + GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null; + try { CacheObject v = null; GridCacheVersion ver = null; @@ -506,7 +504,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); - return savedVers; + return saved; } if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) @@ -557,7 +555,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); - return savedVers; + return saved; } } @@ -569,20 +567,22 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); - return savedVers; + return saved; } } - GridNearCacheEntry nearEntry = near.entryExx(key, topVer); + if (!cctx.affinity().localNode(key, topVer)) { + GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer); - nearEntry.reserveEviction(); + nearEntry.reserveEviction(); - entry = null; + entry = null; - if (savedVers == null) - savedVers = U.newHashMap(3); + if (saved == null) + saved = U.newHashMap(3); - savedVers.put(key, nearEntry); + saved.put(key, nearEntry); + } // Don't add reader if transaction acquires lock anyway to avoid deadlock. boolean addRdr = tx == null || tx.optimistic(); @@ -606,7 +606,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap break; } catch (GridCacheEntryRemovedException ignored) { - entry = allowLocRead ? near.peekEx(key) : null; + // Retry. } finally { if (entry != null && tx == null) @@ -614,7 +614,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - return savedVers; + return saved; } /** @@ -657,7 +657,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param nodeId Node id. * @param keys Keys. * @param infos Entry infos. - * @param savedVers Saved versions. + * @param savedEntries Saved entries. * @param topVer Topology version * @return Result map. */ @@ -665,7 +665,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap UUID nodeId, Collection<KeyCacheObject> keys, Collection<GridCacheEntryInfo> infos, - Map<KeyCacheObject, GridNearCacheEntry> savedVers, + Map<KeyCacheObject, GridNearCacheEntry> savedEntries, AffinityTopologyVersion topVer ) { boolean empty = F.isEmpty(keys); @@ -683,9 +683,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap // Entries available locally in DHT should not be loaded into near cache for reading. if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) { - GridNearCacheEntry entry = savedVers.get(info.key()); + GridNearCacheEntry entry = savedEntries.get(info.key()); - assert entry != null : info.key(); + if (entry == null) + entry = cache().entryExx(info.key(), topVer); // Load entry into cache. entry.loadedValue(tx, @@ -743,12 +744,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap for (KeyCacheObject key : keys) { GridNearCacheEntry entry = saved.get(key); - assert entry != null : key; + if (entry != null) { + entry.releaseEviction(); - entry.releaseEviction(); - - if (tx == null) - cctx.evicts().touch(entry, topVer); + if (tx == null) + cctx.evicts().touch(entry, topVer); + } } } @@ -791,7 +792,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap private LinkedHashMap<KeyCacheObject, Boolean> keys; /** Saved entry versions. */ - private Map<KeyCacheObject, GridNearCacheEntry> savedVers; + private Map<KeyCacheObject, GridNearCacheEntry> savedEntries; /** Topology version on which this future was mapped. */ private AffinityTopologyVersion topVer; @@ -802,18 +803,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap /** * @param node Node. * @param keys Keys. - * @param savedVers Saved entry versions. + * @param savedEntries Saved entries. * @param topVer Topology version. */ MiniFuture( ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, - Map<KeyCacheObject, GridNearCacheEntry> savedVers, + Map<KeyCacheObject, GridNearCacheEntry> savedEntries, AffinityTopologyVersion topVer ) { this.node = node; this.keys = keys; - this.savedVers = savedVers; + this.savedEntries = savedEntries; this.topVer = topVer; } @@ -852,7 +853,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) { if (super.onDone(res, err)) { - releaseEvictions(keys.keySet(), savedVers, topVer); + releaseEvictions(keys.keySet(), savedEntries, topVer); return true; } @@ -954,7 +955,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap }), F.t(node, keys), topVer); // It is critical to call onDone after adding futures to compound list. - onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer)); + onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); return; } @@ -974,12 +975,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer)); // It is critical to call onDone after adding futures to compound list. - onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer)); + onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); } }); } else - onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer)); + onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fd807c9..d495103 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -999,6 +999,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.conflictVersion(explicitVer); } + if (dhtVer == null) + dhtVer = explicitVer != null ? explicitVer : writeVersion(); + if (op == CREATE || op == UPDATE) { GridCacheUpdateTxResult updRes = cached.innerSet( this, @@ -1020,9 +1023,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter dhtVer); if (nearCached != null && updRes.success()) { - if (dhtVer == null) - dhtVer = explicitVer != null ? explicitVer : writeVersion(); - nearCached.innerSet( null, eventNodeId(), @@ -1061,9 +1061,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter dhtVer); if (nearCached != null && updRes.success()) { - if (dhtVer == null) - dhtVer = explicitVer != null ? explicitVer : writeVersion(); - nearCached.innerRemove( null, eventNodeId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java index aed2db8..c2f9fab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java @@ -115,7 +115,7 @@ public class CacheNearReaderUpdateTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 5 * 60_000; + return 10 * 60_000; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ca8d0364/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index f0d7582..cb83798 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -805,6 +805,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + for (int i = 0; i < 100; i++) + primaryCache(i, null).put(i, -1); + final Map<Integer, Integer> map = new HashMap<>(); for (int i = 0; i < 100; i++)
