Repository: ignite Updated Branches: refs/heads/master 7ee1683e1 -> 2030bb531
IGNITE-8080 Avoid updating tx entries twice if near entry update throws EntryRemovedException. - Fixes #3721. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2030bb53 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2030bb53 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2030bb53 Branch: refs/heads/master Commit: 2030bb5317a106b1de2d726f5b01fe8f4f988213 Parents: 7ee1683 Author: Pavel Kovalenko <[email protected]> Authored: Mon Apr 2 15:26:23 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Apr 2 15:27:15 2018 +0300 ---------------------------------------------------------------------- .../transactions/IgniteTxLocalAdapter.java | 152 +++++++++++++------ 1 file changed, 104 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2030bb53/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8916796..dac4e09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -538,13 +538,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (cached.detached()) break; - GridCacheEntryEx nearCached = null; + boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer); boolean metrics = true; - if (updateNearCache(cacheCtx, txEntry.key(), topVer)) - nearCached = cacheCtx.dht().near().peekEx(txEntry.key()); - else if (cacheCtx.isNear() && txEntry.locallyMapped()) + if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped()) metrics = false; boolean evt = !isNearLocallyMapped(txEntry, false); @@ -681,29 +679,34 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); - if (nearCached != null && updRes.success()) { - nearCached.innerSet( - null, - eventNodeId(), - nodeId, - val, - false, - false, - txEntry.ttl(), - false, - metrics, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - CU.empty0(), - DR_NONE, - txEntry.conflictExpireTime(), - null, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - null); + if (updRes.success() && updateNearCache) { + final CacheObject val0 = val; + final boolean metrics0 = metrics; + final GridCacheVersion dhtVer0 = dhtVer; + + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet( + null, + eventNodeId(), + nodeId, + val0, + false, + false, + txEntry.ttl(), + false, + metrics0, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + CU.empty0(), + DR_NONE, + txEntry.conflictExpireTime(), + null, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer0, + null) + ); } } else if (op == DELETE) { @@ -732,32 +735,36 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); - if (nearCached != null && updRes.success()) { - nearCached.innerRemove( - null, - eventNodeId(), - nodeId, - false, - false, - metrics, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - CU.empty0(), - DR_NONE, - null, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - null); + if (updRes.success() && updateNearCache) { + final boolean metrics0 = metrics; + final GridCacheVersion dhtVer0 = dhtVer; + + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove( + null, + eventNodeId(), + nodeId, + false, + false, + metrics0, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + CU.empty0(), + DR_NONE, + null, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer0, + null) + ); } } else if (op == RELOAD) { cached.innerReload(); - if (nearCached != null) - nearCached.innerReload(); + if (updateNearCache) + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload()); } else if (op == READ) { CacheGroupContext grp = cacheCtx.group(); @@ -906,6 +913,40 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** + * Safely performs {@code updateClojure} operation on near cache entry with given {@code entryKey}. + * In case of {@link GridCacheEntryRemovedException} operation will be retried. + * + * @param cacheCtx Cache context. + * @param entryKey Entry key. + * @param updateClojure Near entry update clojure. + * @throws IgniteCheckedException If update is failed. + */ + private void updateNearEntrySafely( + GridCacheContext cacheCtx, + KeyCacheObject entryKey, + NearEntryUpdateClojure<GridCacheEntryEx> updateClojure + ) throws IgniteCheckedException { + while (true) { + GridCacheEntryEx nearCached = cacheCtx.dht().near().peekEx(entryKey); + + if (nearCached == null) + break; + + try { + updateClojure.apply(nearCached); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during transaction commit (will retry): " + nearCached); + + cacheCtx.dht().near().removeEntry(nearCached); + } + } + } + + /** * Commits transaction to transaction manager. Used for one-phase commit transactions only. * * @param commit If {@code true} commits transaction, otherwise rollbacks. @@ -1786,4 +1827,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ protected abstract IgniteInternalFuture<T> postMiss(T t) throws IgniteCheckedException; } + + /** + * Clojure to perform operations with near cache entries. + */ + @FunctionalInterface + private interface NearEntryUpdateClojure<E extends GridCacheEntryEx> { + /** + * Apply clojure to given {@code entry}. + * + * @param entry Entry. + * @throws IgniteCheckedException If operation is failed. + * @throws GridCacheEntryRemovedException If entry is removed. + */ + public void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException; + } }
