Repository: ignite
Updated Branches:
  refs/heads/master 7ee1683e1 -> 2030bb531


IGNITE-8080 Avoid updating tx entries twice if near entry update throws 
EntryRemovedException. - Fixes #3721.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2030bb53
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2030bb53
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2030bb53

Branch: refs/heads/master
Commit: 2030bb5317a106b1de2d726f5b01fe8f4f988213
Parents: 7ee1683
Author: Pavel Kovalenko <[email protected]>
Authored: Mon Apr 2 15:26:23 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Apr 2 15:27:15 2018 +0300

----------------------------------------------------------------------
 .../transactions/IgniteTxLocalAdapter.java      | 152 +++++++++++++------
 1 file changed, 104 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2030bb53/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8916796..dac4e09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -538,13 +538,11 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                     if (cached.detached())
                                         break;
 
-                                    GridCacheEntryEx nearCached = null;
+                                    boolean updateNearCache = 
updateNearCache(cacheCtx, txEntry.key(), topVer);
 
                                     boolean metrics = true;
 
-                                    if (updateNearCache(cacheCtx, 
txEntry.key(), topVer))
-                                        nearCached = 
cacheCtx.dht().near().peekEx(txEntry.key());
-                                    else if (cacheCtx.isNear() && 
txEntry.locallyMapped())
+                                    if (!updateNearCache && cacheCtx.isNear() 
&& txEntry.locallyMapped())
                                         metrics = false;
 
                                     boolean evt = 
!isNearLocallyMapped(txEntry, false);
@@ -681,29 +679,34 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                         if (updRes.loggedPointer() != null)
                                             ptr = updRes.loggedPointer();
 
-                                        if (nearCached != null && 
updRes.success()) {
-                                            nearCached.innerSet(
-                                                null,
-                                                eventNodeId(),
-                                                nodeId,
-                                                val,
-                                                false,
-                                                false,
-                                                txEntry.ttl(),
-                                                false,
-                                                metrics,
-                                                txEntry.keepBinary(),
-                                                txEntry.hasOldValue(),
-                                                txEntry.oldValue(),
-                                                topVer,
-                                                CU.empty0(),
-                                                DR_NONE,
-                                                txEntry.conflictExpireTime(),
-                                                null,
-                                                CU.subjectId(this, cctx),
-                                                resolveTaskName(),
-                                                dhtVer,
-                                                null);
+                                        if (updRes.success() && 
updateNearCache) {
+                                            final CacheObject val0 = val;
+                                            final boolean metrics0 = metrics;
+                                            final GridCacheVersion dhtVer0 = 
dhtVer;
+
+                                            updateNearEntrySafely(cacheCtx, 
txEntry.key(), entry -> entry.innerSet(
+                                                    null,
+                                                    eventNodeId(),
+                                                    nodeId,
+                                                    val0,
+                                                    false,
+                                                    false,
+                                                    txEntry.ttl(),
+                                                    false,
+                                                    metrics0,
+                                                    txEntry.keepBinary(),
+                                                    txEntry.hasOldValue(),
+                                                    txEntry.oldValue(),
+                                                    topVer,
+                                                    CU.empty0(),
+                                                    DR_NONE,
+                                                    
txEntry.conflictExpireTime(),
+                                                    null,
+                                                    CU.subjectId(this, cctx),
+                                                    resolveTaskName(),
+                                                    dhtVer0,
+                                                    null)
+                                            );
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -732,32 +735,36 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                                         if (updRes.loggedPointer() != null)
                                             ptr = updRes.loggedPointer();
 
-                                        if (nearCached != null && 
updRes.success()) {
-                                            nearCached.innerRemove(
-                                                null,
-                                                eventNodeId(),
-                                                nodeId,
-                                                false,
-                                                false,
-                                                metrics,
-                                                txEntry.keepBinary(),
-                                                txEntry.hasOldValue(),
-                                                txEntry.oldValue(),
-                                                topVer,
-                                                CU.empty0(),
-                                                DR_NONE,
-                                                null,
-                                                CU.subjectId(this, cctx),
-                                                resolveTaskName(),
-                                                dhtVer,
-                                                null);
+                                        if (updRes.success() && 
updateNearCache) {
+                                            final boolean metrics0 = metrics;
+                                            final GridCacheVersion dhtVer0 = 
dhtVer;
+
+                                            updateNearEntrySafely(cacheCtx, 
txEntry.key(), entry -> entry.innerRemove(
+                                                    null,
+                                                    eventNodeId(),
+                                                    nodeId,
+                                                    false,
+                                                    false,
+                                                    metrics0,
+                                                    txEntry.keepBinary(),
+                                                    txEntry.hasOldValue(),
+                                                    txEntry.oldValue(),
+                                                    topVer,
+                                                    CU.empty0(),
+                                                    DR_NONE,
+                                                    null,
+                                                    CU.subjectId(this, cctx),
+                                                    resolveTaskName(),
+                                                    dhtVer0,
+                                                    null)
+                                            );
                                         }
                                     }
                                     else if (op == RELOAD) {
                                         cached.innerReload();
 
-                                        if (nearCached != null)
-                                            nearCached.innerReload();
+                                        if (updateNearCache)
+                                            updateNearEntrySafely(cacheCtx, 
txEntry.key(), entry -> entry.innerReload());
                                     }
                                     else if (op == READ) {
                                         CacheGroupContext grp = 
cacheCtx.group();
@@ -906,6 +913,40 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     }
 
     /**
+     * Safely performs {@code updateClojure} operation on near cache entry 
with given {@code entryKey}.
+     * In case of {@link GridCacheEntryRemovedException} operation will be 
retried.
+     *
+     * @param cacheCtx Cache context.
+     * @param entryKey Entry key.
+     * @param updateClojure Near entry update clojure.
+     * @throws IgniteCheckedException If update is failed.
+     */
+    private void updateNearEntrySafely(
+        GridCacheContext cacheCtx,
+        KeyCacheObject entryKey,
+        NearEntryUpdateClojure<GridCacheEntryEx> updateClojure
+    ) throws IgniteCheckedException {
+        while (true) {
+            GridCacheEntryEx nearCached = 
cacheCtx.dht().near().peekEx(entryKey);
+
+            if (nearCached == null)
+                break;
+
+            try {
+                updateClojure.apply(nearCached);
+
+                break;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Got removed entry during transaction commit 
(will retry): " + nearCached);
+
+                cacheCtx.dht().near().removeEntry(nearCached);
+            }
+        }
+    }
+
+    /**
      * Commits transaction to transaction manager. Used for one-phase commit 
transactions only.
      *
      * @param commit If {@code true} commits transaction, otherwise rollbacks.
@@ -1786,4 +1827,19 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
          */
         protected abstract IgniteInternalFuture<T> postMiss(T t) throws 
IgniteCheckedException;
     }
+
+    /**
+     * Clojure to perform operations with near cache entries.
+     */
+    @FunctionalInterface
+    private interface NearEntryUpdateClojure<E extends GridCacheEntryEx> {
+        /**
+         * Apply clojure to given {@code entry}.
+         *
+         * @param entry Entry.
+         * @throws IgniteCheckedException If operation is failed.
+         * @throws GridCacheEntryRemovedException If entry is removed.
+         */
+        public void apply(E entry) throws IgniteCheckedException, 
GridCacheEntryRemovedException;
+    }
 }

Reply via email to