Repository: ignite Updated Branches: refs/heads/master 7868e6679 -> 7414c9956
IGNITE-10130 Added an option to disable cache interceptor trigger in case of conflicts - Fixes #5251. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7414c995 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7414c995 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7414c995 Branch: refs/heads/master Commit: 7414c9956d9f00e6514a508035156f2605df286a Parents: 7868e66 Author: Sergey Antonov <antonovserge...@gmail.com> Authored: Mon Dec 24 14:33:06 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Dec 24 14:47:53 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 + .../processors/cache/GridCacheContext.java | 12 ++ .../processors/cache/GridCacheMapEntry.java | 168 +++++++++++-------- 3 files changed, 120 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 9979ee1..4cfb361 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1042,6 +1042,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = "IGNITE_RECOVERY_VERBOSE_LOGGING"; /** + * Disables cache interceptor triggering in case of conflicts. + * + * Default is {@code false}. 
+ */ + public static final String IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT = "IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT"; + + /** * Sets default {@link CacheConfiguration#setDiskPageCompression disk page compression}. */ public static final String IGNITE_DEFAULT_DISK_PAGE_COMPRESSION = "IGNITE_DEFAULT_DISK_PAGE_COMPRESSION"; http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 19c01b2..2f73218 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -109,6 +109,7 @@ import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.plugin.security.SecurityPermission; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_READ_LOAD_BALANCING; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -276,6 +277,10 @@ public class GridCacheContext<K, V> implements Externalizable { /** Recovery mode flag. */ private volatile boolean recoveryMode; + /** */ + private final boolean disableTriggeringCacheInterceptorOnConflict = + Boolean.parseBoolean(System.getProperty(IGNITE_DISABLE_TRIGGERING_CACHE_INTERCEPTOR_ON_CONFLICT, "false")); + /** * Empty constructor required for {@link Externalizable}. 
*/ @@ -861,6 +866,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if cache interceptor should be skipped in case of conflicts. + */ + public boolean disableTriggeringCacheInterceptorOnConflict() { + return disableTriggeringCacheInterceptorOnConflict; + } + + /** * @return Local node. */ public ClusterNode localNode() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7414c995/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 881e868..899417d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -1157,17 +1158,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert !invoke || res.invokeResult() != null; - if(invoke) // No-op invoke happened. + if (invoke) // No-op invoke happened. 
updRes.invokeResult(res.invokeResult()); updRes.filtered(true); - if(retVal) + if (retVal) updRes.prevValue(res.oldValue()); return updRes; } - else if(noCreate && !invoke && res.resultType() == ResultType.PREV_NULL) + else if (noCreate && !invoke && res.resultType() == ResultType.PREV_NULL) return new GridCacheUpdateTxResult(false); else if (res.resultType() == ResultType.LOCKED) { unlockEntry(); @@ -1262,7 +1263,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updRes.newValue(res.newValue()); - if(invoke) { + if (invoke) { assert res.invokeResult() != null; updRes.invokeResult(res.invokeResult()); @@ -1369,7 +1370,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : new GridCacheUpdateTxResult(false, logPtr); - if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) + if (retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) updRes.prevValue(res.oldValue()); if (needOldVal && compareIgnoreOpCounter(res.resultVersion(), mvccVer) != 0 && @@ -1520,17 +1521,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme old = oldValPresent ? 
oldVal : this.val; + if (intercept) + intercept = !skipInterceptor(explicitVer); + if (intercept) { val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false); CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, keepBinary); - Object interceptorVal = cctx.config().getInterceptor().onBeforePut( - new CacheLazyEntry(cctx, key, old, keepBinary), - val0); - key0 = e.key(); + Object interceptorVal = cctx.config().getInterceptor().onBeforePut(e, val0); + if (interceptorVal == null) return new GridCacheUpdateTxResult(false, logPtr); else if (interceptorVal != val0) @@ -1749,6 +1751,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme old = oldValPresent ? oldVal : val; + if (intercept) + intercept = !skipInterceptor(explicitVer); + if (intercept) { entry0 = new CacheLazyEntry(cctx, key, old, keepBinary); @@ -2072,12 +2077,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else updated = (CacheObject)writeObj; - op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; + op = updated == null ? DELETE : UPDATE; if (intercept) { CacheLazyEntry e; - if (op == GridCacheOperation.UPDATE) { + if (op == UPDATE) { updated0 = value(updated0, updated, keepBinary, false); e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary); @@ -2110,7 +2115,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long ttl = CU.TTL_ETERNAL; long expireTime = CU.EXPIRE_TIME_ETERNAL; - if (op == GridCacheOperation.UPDATE) { + if (op == UPDATE) { if (expiryPlc != null) { ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); @@ -2128,14 +2133,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (ttl == CU.TTL_ZERO) { - op = GridCacheOperation.DELETE; + op = DELETE; //If time expired no transformation needed. transformOp = false; } // Try write-through. 
- if (op == GridCacheOperation.UPDATE) { + if (op == UPDATE) { // Detach value before index update. updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx); @@ -2229,7 +2234,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (intercept) { - if (op == GridCacheOperation.UPDATE) + if (op == UPDATE) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L)); else cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L)); @@ -2322,7 +2327,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme conflictVer, conflictResolve, intercept, - updateCntr); + updateCntr, + cctx.disableTriggeringCacheInterceptorOnConflict() + ); key.valueBytes(cctx.cacheObjectContext()); @@ -2443,7 +2450,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme keepBinary); } - if (c.op == GridCacheOperation.UPDATE) { + if (c.op == UPDATE) { updateVal = val; assert updateVal != null : c; @@ -2474,7 +2481,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else { - assert c.op == GridCacheOperation.DELETE : c.op; + assert c.op == DELETE : c.op; clearReaders(); @@ -2523,29 +2530,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme topVer); } - if (intercept) { - if (c.op == GridCacheOperation.UPDATE) { - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( - cctx, - key, - null, - updateVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } - else { - assert c.op == GridCacheOperation.DELETE : c.op; + if (intercept && c.wasIntercepted) { + assert c.op == UPDATE || c.op == DELETE : c.op; - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( - cctx, - key, - null, - oldVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } + Cache.Entry<?,?> entry = new 
CacheLazyEntry<>( + cctx, + key, + null, + c.op == UPDATE ? updateVal : oldVal, + null, + keepBinary, + c.updateRes.updateCounter() + ); + + if (c.op == UPDATE) + cctx.config().getInterceptor().onAfterPut(entry); + else + cctx.config().getInterceptor().onAfterRemove(entry); } } finally { @@ -3288,6 +3289,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return val != null; } + /** + * Checks whether the changes were received via DR. + * + * @param explicitVer - Explicit version (if any). + * @return {@code true} if changes were got by DR and {@code false} otherwise. + */ + private boolean isRemoteDrUpdate(@Nullable GridCacheVersion explicitVer) { + return explicitVer != null && explicitVer.dataCenterId() != cctx.dr().dataCenterId(); + } + + /** + * Checks whether the cache interceptor should be skipped. + * <p> + * By default, it is expected that Interceptor methods ({@link CacheInterceptor#onBeforePut(Cache.Entry, + * Object)}, {@link CacheInterceptor#onAfterPut(Cache.Entry)}, {@link CacheInterceptor#onBeforeRemove(Cache.Entry)} + * and {@link CacheInterceptor#onAfterRemove(Cache.Entry)}) will be called, but not {@link + * CacheInterceptor#onGet(Object, Object)}. This can even make DR-update flow broken in case of non-idempotent + * Interceptor and force users to call onGet manually as the only workaround. Also, user may want to skip + * Interceptor to avoid redundant entry transformation for DR updates and exchange with internal data between data + * centres which is a normal case. + * + * @param explicitVer - Explicit version (if any). + * @return {@code true} if cache interceptor should be skipped and {@code false} otherwise. 
+ */ + private boolean skipInterceptor(@Nullable GridCacheVersion explicitVer) { + return isRemoteDrUpdate(explicitVer) && cctx.disableTriggeringCacheInterceptorOnConflict(); + } + /** {@inheritDoc} */ @Override public CacheObject rawPut(CacheObject val, long ttl) { lockEntry(); @@ -3470,7 +3499,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.cacheId(), key, val, - val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE, + val == null ? DELETE : GridCacheOperation.CREATE, null, ver, expireTime, @@ -3483,7 +3512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.cacheId(), key, val, - val == null ? GridCacheOperation.DELETE : GridCacheOperation.CREATE, + val == null ? DELETE : GridCacheOperation.CREATE, null, ver, expireTime, @@ -4347,9 +4376,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked() GridCacheOperation op; if (val == null) - op = GridCacheOperation.DELETE; + op = DELETE; else - op = this.val == null ? GridCacheOperation.CREATE : GridCacheOperation.UPDATE; + op = this.val == null ? GridCacheOperation.CREATE : UPDATE; return cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), @@ -4383,9 +4412,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (tx.local()) { // For remote tx we log all updates in batch: GridDistributedTxRemoteAdapter.commitIfLocked() GridCacheOperation op; if (val == null) - op = GridCacheOperation.DELETE; + op = DELETE; else - op = this.val == null ? GridCacheOperation.CREATE : GridCacheOperation.UPDATE; + op = this.val == null ? 
GridCacheOperation.CREATE : UPDATE; return cctx.shared().wal().log(new MvccDataRecord(new MvccDataEntry( cctx.cacheId(), @@ -4959,7 +4988,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme */ private void updateMetrics(GridCacheOperation op, boolean metrics, boolean transformed, boolean hasOldVal) { if (metrics && cctx.statisticsEnabled()) { - if (op == GridCacheOperation.DELETE) { + if (op == DELETE) { cctx.cache().metrics0().onRemove(); if (transformed) @@ -5535,7 +5564,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updRes.filtered(true); - if(needVal) + if (needVal) updRes.prevValue(res.oldValue()); resFut.onDone(updRes); @@ -5619,7 +5648,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : new GridCacheUpdateTxResult(false, logPtr); - if(invoke) { + if (invoke) { assert res.invokeResult() != null; updRes.invokeResult(res.invokeResult()); @@ -5894,6 +5923,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final Long updateCntr; /** */ + private final boolean skipInterceptorOnConflict; + + /** */ private GridCacheUpdateAtomicResult updateRes; /** */ @@ -5906,8 +5938,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private CacheDataRow oldRow; /** OldRow expiration flag. */ - private boolean oldRowExpiredFlag = false; + private boolean oldRowExpiredFlag; + + /** Disable interceptor invocation onAfter* methods flag. 
*/ + private boolean wasIntercepted; + /** */ AtomicCacheUpdateClosure( GridCacheMapEntry entry, AffinityTopologyVersion topVer, @@ -5927,7 +5963,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable GridCacheVersion conflictVer, boolean conflictResolve, boolean intercept, - @Nullable Long updateCntr) { + @Nullable Long updateCntr, + boolean skipInterceptorOnConflict) { assert op == UPDATE || op == DELETE || op == TRANSFORM : op; this.entry = entry; @@ -5949,6 +5986,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.conflictResolve = conflictResolve; this.intercept = intercept; this.updateCntr = updateCntr; + this.skipInterceptorOnConflict = skipInterceptorOnConflict; switch (op) { case UPDATE: @@ -6306,7 +6344,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme newExpireTime = entry.expireTimeExtras(); } else if (newSysTtl == CU.TTL_ZERO) { - op = GridCacheOperation.DELETE; + op = DELETE; writeObj = null; @@ -6326,18 +6364,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme newSysExpireTime = newExpireTime = conflictCtx.expireTime(); } - if (intercept) { + if (intercept && (conflictVer == null || !skipInterceptorOnConflict)) { Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false); - CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx, - entry.key, - null, - oldVal, - null, - keepBinary); + CacheLazyEntry<Object, Object> interceptEntry = + new CacheLazyEntry<>(cctx, entry.key, null, oldVal, null, keepBinary); Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0); + wasIntercepted = true; + if (interceptorVal == null) { treeOp = IgniteTree.OperationType.NOOP; @@ -6441,16 +6477,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteBiTuple<Boolean, Object> interceptRes = null; - if (intercept) { - 
CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx, - entry.key, - null, - oldVal, - null, - keepBinary); + if (intercept && (conflictVer == null || !skipInterceptorOnConflict)) { + CacheLazyEntry<Object, Object> intercepEntry = + new CacheLazyEntry<>(cctx, entry.key, null, oldVal, null, keepBinary); interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry); + wasIntercepted = true; + if (cctx.cancelRemove(interceptRes)) { treeOp = IgniteTree.OperationType.NOOP; @@ -6614,7 +6648,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert conflictCtx.isUseNew(); // Update value is known at this point, so update operation type. - op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + op = writeObj != null ? UPDATE : DELETE; } return conflictCtx;