Repository: ignite
Updated Branches:
  refs/heads/master 7868e6679 -> 7414c9956


IGNITE-10130 Added an option to disable cache interceptor trigger in case of 
conflicts - Fixes #5251.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7414c995
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7414c995
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7414c995

Branch: refs/heads/master
Commit: 7414c9956d9f00e6514a508035156f2605df286a
Parents: 7868e66
Author: Sergey Antonov <antonovserge...@gmail.com>
Authored: Mon Dec 24 14:33:06 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Dec 24 14:47:53 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   7 +
 .../processors/cache/GridCacheContext.java      |  12 ++
 .../processors/cache/GridCacheMapEntry.java     | 168 +++++++++++--------
 3 files changed, 120 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 9979ee1..4cfb361 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1042,6 +1042,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = 
"IGNITE_RECOVERY_VERBOSE_LOGGING";
 
     /**
+     * Disables cache interceptor triggering in case of conflicts.
+     *
+     * Default is {@code false}.
+     */
+    public static final String 
IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT = 
"IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT";
+
+    /**
      * Sets default {@link CacheConfiguration#setDiskPageCompression disk page 
compression}.
      */
     public static final String IGNITE_DEFAULT_DISK_PAGE_COMPRESSION = 
"IGNITE_DEFAULT_DISK_PAGE_COMPRESSION";

http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 19c01b2..2f73218 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -109,6 +109,7 @@ import org.apache.ignite.plugin.security.SecurityException;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -276,6 +277,10 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /** Recovery mode flag. */
     private volatile boolean recoveryMode;
 
+    /** */
+    private final boolean disableTriggeringCacheInterceptorOnConflict =
+        
Boolean.parseBoolean(System.getProperty(IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT,
 "false"));
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -861,6 +866,13 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * @return {@code True} if cache interceptor should be skipped in case of 
conflicts.
+     */
+    public boolean disableTriggeringCacheInterceptorOnConflict() {
+        return disableTriggeringCacheInterceptorOnConflict;
+    }
+
+    /**
      * @return Local node.
      */
     public ClusterNode localNode() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 881e868..899417d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -1157,17 +1158,17 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 assert !invoke || res.invokeResult() != null;
 
-                if(invoke) // No-op invoke happened.
+                if (invoke) // No-op invoke happened.
                     updRes.invokeResult(res.invokeResult());
 
                 updRes.filtered(true);
 
-                if(retVal)
+                if (retVal)
                     updRes.prevValue(res.oldValue());
 
                 return updRes;
             }
-            else if(noCreate && !invoke && res.resultType() == 
ResultType.PREV_NULL)
+            else if (noCreate && !invoke && res.resultType() == 
ResultType.PREV_NULL)
                 return new GridCacheUpdateTxResult(false);
             else if (res.resultType() == ResultType.LOCKED) {
                 unlockEntry();
@@ -1262,7 +1263,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         updRes.newValue(res.newValue());
 
-        if(invoke) {
+        if (invoke) {
             assert res.invokeResult() != null;
 
             updRes.invokeResult(res.invokeResult());
@@ -1369,7 +1370,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         GridCacheUpdateTxResult updRes = valid ? new 
GridCacheUpdateTxResult(true, 0L, logPtr) :
             new GridCacheUpdateTxResult(false, logPtr);
 
-        if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || 
res.resultType() == ResultType.VERSION_FOUND))
+        if (retVal && (res.resultType() == ResultType.PREV_NOT_NULL || 
res.resultType() == ResultType.VERSION_FOUND))
             updRes.prevValue(res.oldValue());
 
         if (needOldVal && compareIgnoreOpCounter(res.resultVersion(), mvccVer) 
!= 0 &&
@@ -1520,17 +1521,18 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             old = oldValPresent ? oldVal : this.val;
 
+            if (intercept)
+                intercept = !skipInterceptor(explicitVer);
+
             if (intercept) {
                 val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false);
 
                 CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, 
keepBinary);
 
-                Object interceptorVal = 
cctx.config().getInterceptor().onBeforePut(
-                    new CacheLazyEntry(cctx, key, old, keepBinary),
-                    val0);
-
                 key0 = e.key();
 
+                Object interceptorVal = 
cctx.config().getInterceptor().onBeforePut(e, val0);
+
                 if (interceptorVal == null)
                     return new GridCacheUpdateTxResult(false, logPtr);
                 else if (interceptorVal != val0)
@@ -1749,6 +1751,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             old = oldValPresent ? oldVal : val;
 
+            if (intercept)
+                intercept = !skipInterceptor(explicitVer);
+
             if (intercept) {
                 entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
 
@@ -2072,12 +2077,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             else
                 updated = (CacheObject)writeObj;
 
-            op = updated == null ? GridCacheOperation.DELETE : 
GridCacheOperation.UPDATE;
+            op = updated == null ? DELETE : UPDATE;
 
             if (intercept) {
                 CacheLazyEntry e;
 
-                if (op == GridCacheOperation.UPDATE) {
+                if (op == UPDATE) {
                     updated0 = value(updated0, updated, keepBinary, false);
 
                     e = new CacheLazyEntry(cctx, key, key0, old, old0, 
keepBinary);
@@ -2110,7 +2115,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             long ttl = CU.TTL_ETERNAL;
             long expireTime = CU.EXPIRE_TIME_ETERNAL;
 
-            if (op == GridCacheOperation.UPDATE) {
+            if (op == UPDATE) {
                 if (expiryPlc != null) {
                     ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : 
expiryPlc.getExpiryForCreation());
 
@@ -2128,14 +2133,14 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
 
             if (ttl == CU.TTL_ZERO) {
-                op = GridCacheOperation.DELETE;
+                op = DELETE;
 
                 //If time expired no transformation needed.
                 transformOp = false;
             }
 
             // Try write-through.
-            if (op == GridCacheOperation.UPDATE) {
+            if (op == UPDATE) {
                 // Detach value before index update.
                 updated = 
cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
 
@@ -2229,7 +2234,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
 
             if (intercept) {
-                if (op == GridCacheOperation.UPDATE)
+                if (op == UPDATE)
                     cctx.config().getInterceptor().onAfterPut(new 
CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L));
                 else
                     cctx.config().getInterceptor().onAfterRemove(new 
CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L));
@@ -2322,7 +2327,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 conflictVer,
                 conflictResolve,
                 intercept,
-                updateCntr);
+                updateCntr,
+                cctx.disableTriggeringCacheInterceptorOnConflict()
+            );
 
             key.valueBytes(cctx.cacheObjectContext());
 
@@ -2443,7 +2450,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (c.op == GridCacheOperation.UPDATE) {
+            if (c.op == UPDATE) {
                 updateVal = val;
 
                 assert updateVal != null : c;
@@ -2474,7 +2481,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 }
             }
             else {
-                assert c.op == GridCacheOperation.DELETE : c.op;
+                assert c.op == DELETE : c.op;
 
                 clearReaders();
 
@@ -2523,29 +2530,23 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     topVer);
             }
 
-            if (intercept) {
-                if (c.op == GridCacheOperation.UPDATE) {
-                    cctx.config().getInterceptor().onAfterPut(new 
CacheLazyEntry(
-                        cctx,
-                        key,
-                        null,
-                        updateVal,
-                        null,
-                        keepBinary,
-                        c.updateRes.updateCounter()));
-                }
-                else {
-                    assert c.op == GridCacheOperation.DELETE : c.op;
+            if (intercept && c.wasIntercepted) {
+                assert c.op == UPDATE || c.op == DELETE : c.op;
 
-                    cctx.config().getInterceptor().onAfterRemove(new 
CacheLazyEntry(
-                        cctx,
-                        key,
-                        null,
-                        oldVal,
-                        null,
-                        keepBinary,
-                        c.updateRes.updateCounter()));
-                }
+                Cache.Entry<?,?> entry = new CacheLazyEntry<>(
+                    cctx,
+                    key,
+                    null,
+                    c.op == UPDATE ? updateVal : oldVal,
+                    null,
+                    keepBinary,
+                    c.updateRes.updateCounter()
+                );
+
+                if (c.op == UPDATE)
+                    cctx.config().getInterceptor().onAfterPut(entry);
+                else
+                    cctx.config().getInterceptor().onAfterRemove(entry);
             }
         }
         finally {
@@ -3288,6 +3289,34 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         return val != null;
     }
 
+    /**
+     * Checks, that changes were got by DR.
+     *
+     * @param explicitVer – Explicit version (if any).
+     * @return {@code true} if changes were got by DR and {@code false} 
otherwise.
+     */
+    private boolean isRemoteDrUpdate(@Nullable GridCacheVersion explicitVer) {
+        return explicitVer != null && explicitVer.dataCenterId() != 
cctx.dr().dataCenterId();
+    }
+
+    /**
+     * Checks, that cache interceptor should be skipped.
+     * <p>
+     * It is expects by default behavior that Interceptor methods ({@link 
CacheInterceptor#onBeforePut(Cache.Entry,
+     * Object)}, {@link CacheInterceptor#onAfterPut(Cache.Entry)}, {@link 
CacheInterceptor#onBeforeRemove(Cache.Entry)}
+     * and {@link CacheInterceptor#onAfterRemove(Cache.Entry)}) will be 
called, but {@link
+     * CacheInterceptor#onGet(Object, Object)}. This can even make DR-update 
flow broken in case of non-idempotent
+     * Interceptor and force users to call onGet manually as the only 
workaround. Also, user may want to skip
+     * Interceptor to avoid redundant entry transformation for DR updates and 
exchange with internal data b/w data
+     * centres which is a normal case.
+     *
+     * @param explicitVer - Explicit version (if any).
+     * @return {@code true} if cache interceptor should be skipped and {@code 
false} otherwise.
+     */
+    private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) {
+        return isRemoteDrUpdate(explicitVer) && 
cctx.disableTriggeringCacheInterceptorOnConflict();
+    }
+
     /** {@inheritDoc} */
     @Override public CacheObject rawPut(CacheObject val, long ttl) {
         lockEntry();
@@ -3470,7 +3499,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             cctx.cacheId(),
                             key,
                             val,
-                            val == null ? GridCacheOperation.DELETE : 
GridCacheOperation.CREATE,
+                            val == null ? DELETE : GridCacheOperation.CREATE,
                             null,
                             ver,
                             expireTime,
@@ -3483,7 +3512,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             cctx.cacheId(),
                             key,
                             val,
-                            val == null ? GridCacheOperation.DELETE : 
GridCacheOperation.CREATE,
+                            val == null ? DELETE : GridCacheOperation.CREATE,
                             null,
                             ver,
                             expireTime,
@@ -4347,9 +4376,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (tx.local()) { // For remote tx we log all updates in batch: 
GridDistributedTxRemoteAdapter.commitIfLocked()
             GridCacheOperation op;
             if (val == null)
-                op = GridCacheOperation.DELETE;
+                op = DELETE;
             else
-                op = this.val == null ? GridCacheOperation.CREATE : 
GridCacheOperation.UPDATE;
+                op = this.val == null ? GridCacheOperation.CREATE : UPDATE;
 
             return cctx.shared().wal().log(new DataRecord(new DataEntry(
                 cctx.cacheId(),
@@ -4383,9 +4412,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (tx.local()) { // For remote tx we log all updates in batch: 
GridDistributedTxRemoteAdapter.commitIfLocked()
             GridCacheOperation op;
             if (val == null)
-                op = GridCacheOperation.DELETE;
+                op = DELETE;
             else
-                op = this.val == null ? GridCacheOperation.CREATE : 
GridCacheOperation.UPDATE;
+                op = this.val == null ? GridCacheOperation.CREATE : UPDATE;
 
             return cctx.shared().wal().log(new MvccDataRecord(new 
MvccDataEntry(
                 cctx.cacheId(),
@@ -4959,7 +4988,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      */
     private void updateMetrics(GridCacheOperation op, boolean metrics, boolean 
transformed, boolean hasOldVal) {
         if (metrics && cctx.statisticsEnabled()) {
-            if (op == GridCacheOperation.DELETE) {
+            if (op == DELETE) {
                 cctx.cache().metrics0().onRemove();
 
                 if (transformed)
@@ -5535,7 +5564,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                     updRes.filtered(true);
 
-                    if(needVal)
+                    if (needVal)
                         updRes.prevValue(res.oldValue());
 
                     resFut.onDone(updRes);
@@ -5619,7 +5648,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             GridCacheUpdateTxResult updRes = valid ? new 
GridCacheUpdateTxResult(true, 0L, logPtr)
                 : new GridCacheUpdateTxResult(false, logPtr);
 
-            if(invoke) {
+            if (invoke) {
                 assert res.invokeResult() != null;
 
                 updRes.invokeResult(res.invokeResult());
@@ -5894,6 +5923,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         private final Long updateCntr;
 
         /** */
+        private final boolean skipInterceptorOnConflict;
+
+        /** */
         private GridCacheUpdateAtomicResult updateRes;
 
         /** */
@@ -5906,8 +5938,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         private CacheDataRow oldRow;
 
         /** OldRow expiration flag. */
-        private boolean oldRowExpiredFlag = false;
+        private boolean oldRowExpiredFlag;
+
+        /** Disable interceptor invocation onAfter* methods flag. */
+        private boolean wasIntercepted;
 
+        /** */
         AtomicCacheUpdateClosure(
             GridCacheMapEntry entry,
             AffinityTopologyVersion topVer,
@@ -5927,7 +5963,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             @Nullable GridCacheVersion conflictVer,
             boolean conflictResolve,
             boolean intercept,
-            @Nullable Long updateCntr) {
+            @Nullable Long updateCntr,
+            boolean skipInterceptorOnConflict) {
             assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
 
             this.entry = entry;
@@ -5949,6 +5986,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             this.conflictResolve = conflictResolve;
             this.intercept = intercept;
             this.updateCntr = updateCntr;
+            this.skipInterceptorOnConflict = skipInterceptorOnConflict;
 
             switch (op) {
                 case UPDATE:
@@ -6306,7 +6344,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         newExpireTime = entry.expireTimeExtras();
                     }
                     else if (newSysTtl == CU.TTL_ZERO) {
-                        op = GridCacheOperation.DELETE;
+                        op = DELETE;
 
                         writeObj = null;
 
@@ -6326,18 +6364,16 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 newSysExpireTime = newExpireTime = conflictCtx.expireTime();
             }
 
-            if (intercept) {
+            if (intercept && (conflictVer == null || 
!skipInterceptorOnConflict)) {
                 Object updated0 = cctx.unwrapBinaryIfNeeded(updated, 
keepBinary, false);
 
-                CacheLazyEntry<Object, Object> interceptEntry = new 
CacheLazyEntry<>(cctx,
-                    entry.key,
-                    null,
-                    oldVal,
-                    null,
-                    keepBinary);
+                CacheLazyEntry<Object, Object> interceptEntry =
+                    new CacheLazyEntry<>(cctx, entry.key, null, oldVal, null, 
keepBinary);
 
                 Object interceptorVal = 
cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0);
 
+                wasIntercepted = true;
+
                 if (interceptorVal == null) {
                     treeOp = IgniteTree.OperationType.NOOP;
 
@@ -6441,16 +6477,14 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             IgniteBiTuple<Boolean, Object> interceptRes = null;
 
-            if (intercept) {
-                CacheLazyEntry<Object, Object> intercepEntry = new 
CacheLazyEntry<>(cctx,
-                    entry.key,
-                    null,
-                    oldVal,
-                    null,
-                    keepBinary);
+            if (intercept && (conflictVer == null || 
!skipInterceptorOnConflict)) {
+                CacheLazyEntry<Object, Object> intercepEntry =
+                    new CacheLazyEntry<>(cctx, entry.key, null, oldVal, null, 
keepBinary);
 
                 interceptRes = 
cctx.config().getInterceptor().onBeforeRemove(intercepEntry);
 
+                wasIntercepted = true;
+
                 if (cctx.cancelRemove(interceptRes)) {
                     treeOp = IgniteTree.OperationType.NOOP;
 
@@ -6614,7 +6648,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         assert conflictCtx.isUseNew();
 
                     // Update value is known at this point, so update 
operation type.
-                    op = writeObj != null ? GridCacheOperation.UPDATE : 
GridCacheOperation.DELETE;
+                    op = writeObj != null ? UPDATE : DELETE;
                 }
 
                 return conflictCtx;

Reply via email to