Repository: ignite Updated Branches: refs/heads/ignite-2523-1-dht [created] 581f544f6
WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/581f544f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/581f544f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/581f544f Branch: refs/heads/ignite-2523-1-dht Commit: 581f544f6f8606dc28b25a631cbe225b23603b57 Parents: 96cc54c Author: vozerov-gridgain <[email protected]> Authored: Mon Apr 25 09:01:53 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Apr 25 09:01:53 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 205 ++++++++++++------- .../GridNearAtomicAbstractUpdateRequest.java | 16 ++ .../GridNearAtomicSingleUpdateRequest.java | 12 ++ .../dht/atomic/GridNearAtomicUpdateRequest.java | 10 + 4 files changed, 167 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index f412dfa..ac3f73f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1484,9 +1484,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); - List<KeyCacheObject> 
keys = req.keys(); - - assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1); + assert !req.returnValue() || (req.operation() == TRANSFORM || req.keysCount() == 1); GridDhtAtomicUpdateFuture dhtFut = null; @@ -1499,9 +1497,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. - List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion()); + List<GridDhtCacheEntry> lockedEntries; + GridDhtCacheEntry lockedEntry; + + if (req.keysCount() == 1) { + lockedEntries = null; + lockedEntry = lockEntry(req); + } + else { + lockedEntries = lockEntries(req); + lockedEntry = null; + } - Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; + Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deletedEntries = null; + IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> deletedEntry = null; try { GridDhtPartitionTopology top = topology(); @@ -1510,7 +1519,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (top.stopping()) { - res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + + // TODO: Add this method to request instead. + res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " + "(cache is stopped): " + name())); completionCb.apply(req, res); @@ -1558,7 +1568,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... + if (req.keysCount() > 1 && // Several keys ... writeThrough() && !req.skipStore() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... !ctx.dr().receiveEnabled() // and no DR. 
@@ -1568,7 +1578,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { hasNear, req, res, - locked, + lockedEntries, ver, dhtFut, completionCb, @@ -1577,7 +1587,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { expiry, sndPrevVal); - deleted = updRes.deleted(); + deletedEntries = updRes.deleted(); dhtFut = updRes.dhtFuture(); if (req.operation() == TRANSFORM) @@ -1588,7 +1598,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { hasNear, req, res, - locked, + lockedEntries, ver, dhtFut, completionCb, @@ -1598,7 +1608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { sndPrevVal); retVal = updRes.returnValue(); - deleted = updRes.deleted(); + deletedEntries = updRes.deleted(); dhtFut = updRes.dhtFuture(); } @@ -1627,15 +1637,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { e.printStackTrace(); } finally { - if (locked != null) - unlockEntries(locked, req.topologyVersion()); + if (lockedEntry != null) + unlockEntry(lockedEntry, req.topologyVersion()); + + if (lockedEntries != null) + unlockEntries(lockedEntries, req.topologyVersion()); // Enqueue if necessary after locks release. - if (deleted != null) { - assert !deleted.isEmpty(); + if (deletedEntries != null) { + assert !deletedEntries.isEmpty(); assert ctx.deferredDelete() : this; - for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) + for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deletedEntries) ctx.onDeferredDelete(e.get1(), e.get2()); } } @@ -1653,7 +1666,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // an attempt to use cleaned resources. 
U.error(log, "Unexpected exception during cache update", e); - res.addFailedKeys(keys, e); + res.addFailedKeys(req.keys(), e); completionCb.apply(req, res); @@ -1666,7 +1679,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (remap) { assert dhtFut == null; - res.remapKeys(keys); + res.remapKeys(req.keys()); completionCb.apply(req, res); } @@ -2137,8 +2150,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; - List<KeyCacheObject> keys = req.keys(); - AffinityTopologyVersion topVer = req.topologyVersion(); boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); @@ -2148,8 +2159,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; // Avoid iterator creation. - for (int i = 0; i < keys.size(); i++) { - KeyCacheObject k = keys.get(i); + for (int i = 0; i < req.keysCount(); i++) { + KeyCacheObject k = req.key(i); GridCacheOperation op = req.operation(); @@ -2284,7 +2295,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.removeVersion() != null) { if (deleted == null) - deleted = new ArrayList<>(keys.size()); + deleted = new ArrayList<>(req.keysCount()); deleted.add(F.t(entry, updRes.removeVersion())); } @@ -2597,88 +2608,130 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Acquires java-level locks on cache entries. Returns collection of locked entries. * - * @param keys Keys to lock. - * @param topVer Topology version to lock on. + * @param req Request. * @return Collection of locked entries. * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown, * locks are released. 
*/ @SuppressWarnings("ForLoopReplaceableByForEach") - private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer) + private GridDhtCacheEntry lockEntry(GridNearAtomicAbstractUpdateRequest req) throws GridDhtInvalidPartitionException { - if (keys.size() == 1) { - KeyCacheObject key = keys.get(0); + while (true) { + try { + GridDhtCacheEntry entry = entryExx(req.key(0), req.topologyVersion()); - while (true) { - try { - GridDhtCacheEntry entry = entryExx(key, topVer); + GridUnsafe.monitorEnter(entry); - GridUnsafe.monitorEnter(entry); + if (entry.obsolete()) + GridUnsafe.monitorExit(entry); + else + return entry; + } + catch (GridDhtInvalidPartitionException e) { + // Ignore invalid partition exception in CLOCK ordering mode. + if (ctx.config().getAtomicWriteOrderMode() == CLOCK) + return null; + else + throw e; + } + } + } - if (entry.obsolete()) - GridUnsafe.monitorExit(entry); - else - return Collections.singletonList(entry); + /** + * Acquires java-level locks on cache entries. Returns collection of locked entries. + * + * @param req Request. + * @return Collection of locked entries. + * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown, + * locks are released. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req) + throws GridDhtInvalidPartitionException { + List<GridDhtCacheEntry> locked = new ArrayList<>(req.keysCount()); + + while (true) { + for (int i = 0; i < req.keysCount(); i++) { + KeyCacheObject key = req.key(i); + + try { + GridDhtCacheEntry entry = entryExx(key, req.topologyVersion()); + + locked.add(entry); } catch (GridDhtInvalidPartitionException e) { // Ignore invalid partition exception in CLOCK ordering mode. 
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) - return Collections.singletonList(null); + locked.add(null); else throw e; } } - } - else { - List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size()); - while (true) { - for (KeyCacheObject key : keys) { - try { - GridDhtCacheEntry entry = entryExx(key, topVer); + boolean retry = false; - locked.add(entry); - } - catch (GridDhtInvalidPartitionException e) { - // Ignore invalid partition exception in CLOCK ordering mode. - if (ctx.config().getAtomicWriteOrderMode() == CLOCK) - locked.add(null); - else - throw e; - } - } + for (int i = 0; i < locked.size(); i++) { + GridCacheMapEntry entry = locked.get(i); - boolean retry = false; + if (entry == null) + continue; - for (int i = 0; i < locked.size(); i++) { - GridCacheMapEntry entry = locked.get(i); + GridUnsafe.monitorEnter(entry); - if (entry == null) - continue; + if (entry.obsolete()) { + // Unlock all locked. + for (int j = 0; j <= i; j++) { + if (locked.get(j) != null) + GridUnsafe.monitorExit(locked.get(j)); + } - GridUnsafe.monitorEnter(entry); + // Clear entries. + locked.clear(); - if (entry.obsolete()) { - // Unlock all locked. - for (int j = 0; j <= i; j++) { - if (locked.get(j) != null) - GridUnsafe.monitorExit(locked.get(j)); - } + // Retry. + retry = true; - // Clear entries. - locked.clear(); + break; + } + } - // Retry. - retry = true; + if (!retry) + return locked; + } + } - break; - } - } + /** + * Releases java-level locks on cache entries. + * + * @param locked Locked entries. + * @param topVer Topology version. + */ + private void unlockEntry(GridDhtCacheEntry locked, AffinityTopologyVersion topVer) { + // Process deleted entries before locks release. + assert ctx.deferredDelete() : this; - if (!retry) - return locked; - } + // Entries to skip eviction manager notification for. + // Enqueue entries while holding locks. 
+ boolean skip = false; + + try { + if (locked.deleted()) + skip = true; } + finally { + // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is + // an attempt to use cleaned resources. + // That's why releasing locks in the finally block. + GridUnsafe.monitorExit(locked); + } + + // Try evict partitions. + locked.onUnlock(); + + // Must touch all entries since update may have deleted entries. + // Eviction manager will remove empty entries. + if (!skip) + ctx.evicts().touch(locked, topVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 37d9e45..cbf4a7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -131,6 +131,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa public abstract boolean keepBinary(); /** + * Get key at the given index. + * + * @param idx Index. + * @return Key. + */ + public abstract KeyCacheObject key(int idx); + + /** * @return Keys for this update request. */ // TODO @@ -226,4 +234,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @param clearKeys If {@code true} clears keys. */ public abstract void cleanup(boolean clearKeys); + + /** + * Whether this is single-key request. 
+ * + * @return {@code True} if this is a single-key request. + */ + // TODO: Looks like we can remove it. + public abstract boolean isSingle(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 37d86d9..f62f512 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -347,6 +347,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd } /** {@inheritDoc} */ + @Override public KeyCacheObject key(int idx) { + assert idx == 0; + + return key; + } + + /** {@inheritDoc} */ @Override public List<KeyCacheObject> keys() { return Collections.singletonList(key); } @@ -494,6 +501,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd } /** {@inheritDoc} */ + @Override public boolean isSingle() { + return true; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index f061868..ed7d595 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -411,6 +411,11 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq } /** {@inheritDoc} */ + @Override public KeyCacheObject key(int idx) { + return keys.get(idx); + } + + /** {@inheritDoc} */ @Override public List<KeyCacheObject> keys() { return keys; } @@ -593,6 +598,11 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq } /** {@inheritDoc} */ + @Override public boolean isSingle() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf);
