Repository: ignite Updated Branches: refs/heads/ignite-gg-10837 [created] 93656982b
IGNITE-GG-10837 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a011bb72 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a011bb72 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a011bb72 Branch: refs/heads/ignite-gg-10837 Commit: a011bb725b3f2fa7be61c2bb480d222190c67327 Parents: c930e7d Author: nikolay_tikhonov <[email protected]> Authored: Mon Dec 14 15:48:12 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Dec 15 13:02:13 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 +-- .../cache/dr/GridCacheDrExpirationInfo.java | 16 ++++++++++ .../processors/cache/dr/GridCacheDrInfo.java | 33 +++++++++++++++++--- .../cache/version/GridCacheVersionManager.java | 25 ++++++++++----- 4 files changed, 65 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ba3d546..d9db2ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio import org.apache.ignite.internal.processors.cache.KeyCacheObject; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -987,7 +988,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else if (conflictPutVals != null) { GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); - val = conflictPutVal.value(); + val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); @@ -1090,7 +1091,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Conflict PUT. GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - val = conflictPutVal.value(); + val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java index 7293950..c5f645f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java @@ -17,6 +17,7 @@ package 
org.apache.ignite.internal.processors.cache.dr; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; @@ -56,6 +57,21 @@ public class GridCacheDrExpirationInfo extends GridCacheDrInfo { this.expireTime = expireTime; } + /** + * Constructor. + * + * @param proc Entry processor. + * @param ver Version. + * @param ttl TTL. + * @param expireTime Expire time. + */ + public GridCacheDrExpirationInfo(EntryProcessor proc, GridCacheVersion ver, long ttl, long expireTime) { + super(proc, ver); + + this.ttl = ttl; + this.expireTime = expireTime; + } + /** {@inheritDoc} */ @Override public long ttl() { return ttl; http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index 8635fe2..d37eb7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable { /** Value. */ private CacheObject val; + /** Entry processor. 
*/ + private EntryProcessor proc; + /** DR version. */ private GridCacheVersion ver; @@ -61,6 +65,20 @@ public class GridCacheDrInfo implements Externalizable { } /** + * Constructor. + * + * @param proc Entry processor. + * @param ver Version. + */ + public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) { + assert proc != null; + assert ver != null; + + this.proc = proc; + this.ver = ver; + } + + /** * @return Value. */ public CacheObject value() { @@ -68,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable { } /** + * @return Value (entry processor or cache object). + */ + public Object valueEx() { + return val == null ? proc : val; + } + + /** * @return Version. */ public GridCacheVersion version() { @@ -88,13 +113,13 @@ public class GridCacheDrInfo implements Externalizable { return CU.EXPIRE_TIME_ETERNAL; } - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { assert false; } - @Override - public void writeExternal(ObjectOutput out) throws IOException { + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { assert false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a011bb72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 68d03cd..b5fc4ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on current topology. */ public GridCacheVersion next() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, false); + return next(cctx.kernalContext().discovery().topologyVersion(), true, false, null); + } + + /** + * @param dataCenterId Data center id. + * @return Next version based on current topology with given data center id. + */ + public GridCacheVersion next(byte dataCenterId) { + return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId); } /** @@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given topology version. */ public GridCacheVersion next(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, false); + return next(topVer.topologyVersion(), true, false, null); } /** @@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, true); + return next(cctx.kernalContext().discovery().topologyVersion(), true, true, null); } /** @@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, true); + return next(topVer.topologyVersion(), true, true, null); } /** @@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. 
*/ public GridCacheVersion nextForLoad(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, true); + return next(ver.topologyVersion(), false, true, null); } /** @@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given cache version. */ public GridCacheVersion next(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, false); + return next(ver.topologyVersion(), false, false, null); } /** @@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @param topVer Topology version for which new version should be obtained. * @param addTime If {@code true} then adds to the given topology version number of seconds * from the start time of the first grid node. + * @param dataCenterId0 Data center id. * @return New lock order. */ - private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) { + private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, Byte dataCenterId0) { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); @@ -261,7 +270,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { globalTime, ord, locNodeOrder, - dataCenterId); + dataCenterId0 == null ? dataCenterId : dataCenterId0); last = next;
