Fixed conflict resolver API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e61602e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e61602e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e61602e Branch: refs/heads/sql-store Commit: 4e61602eca679bf3689bb23f2bc1c9e58b4eb8dc Parents: 1945b98 Author: nikolay_tikhonov <[email protected]> Authored: Mon Jan 25 19:16:47 2016 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Mon Feb 1 07:43:59 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheOperationContext.java | 43 +++++-- .../processors/cache/GridCacheAdapter.java | 8 +- .../processors/cache/GridCacheProxyImpl.java | 11 +- .../processors/cache/IgniteCacheProxy.java | 43 ++++++- .../dht/atomic/GridDhtAtomicCache.java | 104 +++++++++++++---- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../processors/cache/dr/GridCacheDrInfo.java | 49 +++++++- .../transactions/IgniteTxLocalAdapter.java | 81 +++++++++----- .../cache/version/GridCacheVersionManager.java | 23 ++-- .../testframework/junits/GridAbstractTest.java | 3 + parent/pom.xml | 111 +++++++++++-------- pom.xml | 16 --- 12 files changed, 351 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index 21934d0..f39a09d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -48,6 +48,9 @@ public class CacheOperationContext implements Serializable { /** Expiry policy. */ private final ExpiryPolicy expiryPlc; + /** Data center Id. */ + private final Byte dataCenterId; + /** * Constructor with default values. */ @@ -61,6 +64,8 @@ public class CacheOperationContext implements Serializable { expiryPlc = null; noRetries = false; + + dataCenterId = null; } /** @@ -68,13 +73,15 @@ public class CacheOperationContext implements Serializable { * @param subjId Subject ID. * @param keepBinary Keep binary flag. * @param expiryPlc Expiry policy. + * @param dataCenterId Data center id. */ public CacheOperationContext( boolean skipStore, @Nullable UUID subjId, boolean keepBinary, @Nullable ExpiryPolicy expiryPlc, - boolean noRetries) { + boolean noRetries, + @Nullable Byte dataCenterId) { this.skipStore = skipStore; this.subjId = subjId; @@ -84,6 +91,8 @@ public class CacheOperationContext implements Serializable { this.expiryPlc = expiryPlc; this.noRetries = noRetries; + + this.dataCenterId = dataCenterId; } /** @@ -94,6 +103,13 @@ public class CacheOperationContext implements Serializable { } /** + * @return {@code True} if data center id is set otherwise {@code false}. + */ + public boolean hasDataCenterId() { + return dataCenterId != null; + } + + /** * See {@link IgniteInternalCache#keepBinary()}. * * @return New instance of CacheOperationContext with keep binary flag. @@ -104,7 +120,8 @@ public class CacheOperationContext implements Serializable { subjId, true, expiryPlc, - noRetries); + noRetries, + dataCenterId); } /** @@ -117,6 +134,15 @@ public class CacheOperationContext implements Serializable { } /** + * Gets data center ID. + * + * @return Client ID. + */ + @Nullable public Byte dataCenterId() { + return dataCenterId; + } + + /** * See {@link IgniteInternalCache#forSubjectId(UUID)}. * * @param subjId Subject id. @@ -128,7 +154,8 @@ public class CacheOperationContext implements Serializable { subjId, keepBinary, expiryPlc, - noRetries); + noRetries, + dataCenterId); } /** @@ -150,7 +177,8 @@ public class CacheOperationContext implements Serializable { subjId, keepBinary, expiryPlc, - noRetries); + noRetries, + dataCenterId); } /** @@ -172,7 +200,8 @@ public class CacheOperationContext implements Serializable { subjId, true, plc, - noRetries); + noRetries, + dataCenterId); } /** @@ -185,8 +214,8 @@ public class CacheOperationContext implements Serializable { subjId, keepBinary, expiryPlc, - noRetries - ); + noRetries, + dataCenterId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3081cfb..9fd65e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -447,7 +447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { - CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -459,14 +459,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { - CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false); + CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() { - CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false); + CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null); return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); } @@ -483,7 +483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert !CU.isAtomicsCache(ctx.name()); assert !CU.isMarshallerCache(ctx.name()); - CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false); + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null); return new GridCacheProxyImpl<>(ctx, this, opCtx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 8ffd273..3a53942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte /** {@inheritDoc} */ @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false)); + opCtx != null ? opCtx.forSubjectId(subjId) : + new CacheOperationContext(false, subjId, false, null, false, null)); } /** {@inheritDoc} */ @@ -221,7 +222,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return this; return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false)); + opCtx != null ? opCtx.setSkipStore(skipStore) : + new CacheOperationContext(true, null, false, null, false, null)); } finally { gate.leave(prev); @@ -236,7 +238,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)delegate, - opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false)); + opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false, null)); } /** {@inheritDoc} */ @@ -1608,7 +1610,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte try { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false)); + opCtx != null ? opCtx.withExpiryPolicy(plc) : + new CacheOperationContext(false, null, false, plc, false, null)); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b64c69c..9e66d4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -307,7 +307,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) : - new CacheOperationContext(false, null, false, plc, false); + new CacheOperationContext(false, null, false, plc, false, null); return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock); } @@ -339,7 +339,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return this; CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) : - new CacheOperationContext(false, null, false, null, true); + new CacheOperationContext(false, null, false, null, true, null); return new IgniteCacheProxy<>(ctx, delegate, @@ -1788,7 +1788,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V opCtx != null ? opCtx.subjectId() : null, true, opCtx != null ? opCtx.expiry() : null, - opCtx != null && opCtx.noRetries()); + opCtx != null && opCtx.noRetries(), + opCtx != null ? opCtx.dataCenterId() : null); return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)delegate, @@ -1802,6 +1803,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * @return Projection for data center id. + */ + @SuppressWarnings("unchecked") + public IgniteCache<K, V> withDataCenterId(byte dataCenterId) { + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + Byte prevDataCenterId = opCtx != null ? opCtx.dataCenterId() : null; + + if (prevDataCenterId != null && dataCenterId == prevDataCenterId) + return this; + + CacheOperationContext opCtx0 = + new CacheOperationContext( + opCtx != null && opCtx.skipStore(), + opCtx != null ? opCtx.subjectId() : null, + opCtx != null && opCtx.isKeepBinary(), + opCtx != null ? opCtx.expiry() : null, + opCtx != null && opCtx.noRetries(), + dataCenterId); + + return new IgniteCacheProxy<>(ctx, + delegate, + opCtx0, + isAsync(), + lock); + } + finally { + onLeave(gate, prev); + } + } + + /** * @return Cache with skip store enabled. */ public IgniteCache<K, V> skipStore() { @@ -1820,7 +1854,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V opCtx != null ? opCtx.subjectId() : null, opCtx != null && opCtx.isKeepBinary(), opCtx != null ? opCtx.expiry() : null, - opCtx != null && opCtx.noRetries()); + opCtx != null && opCtx.noRetries(), + opCtx != null ? opCtx.dataCenterId() : null); return new IgniteCacheProxy<>(ctx, delegate, http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index aa79cfa..6b23550 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; @@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; @@ -448,7 +448,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false, filter, - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -464,7 +465,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, filter, - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -479,7 +481,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false, ctx.noValArray(), - false).get(); + false, + UPDATE).get(); } /** {@inheritDoc} */ @@ -571,7 +574,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, true, ctx.equalsValArray(oldVal), - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -589,7 +593,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, CU.empty0(), - true).chain(RET2NULL); + true, + UPDATE).chain(RET2NULL); } /** {@inheritDoc} */ @@ -610,7 +615,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -790,7 +796,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) @@ -846,7 +853,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { @@ -882,7 +890,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); } /** @@ -901,15 +910,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("ConstantConditions") private IgniteInternalFuture updateAllAsync0( - @Nullable final Map<? extends K, ? extends V> map, - @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, + @Nullable Map<? extends K, ? extends V> map, + @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, @Nullable Object[] invokeArgs, - @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, - @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, + @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, + @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, final boolean retval, final boolean rawRetval, @Nullable final CacheEntryPredicate[] filter, - final boolean waitTopFut + final boolean waitTopFut, + final GridCacheOperation op ) { assert ctx.updatesAllowed(); @@ -918,7 +928,47 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_PUT); - CacheOperationContext opCtx = ctx.operationContextPerCall(); + final CacheOperationContext opCtx = ctx.operationContextPerCall(); + + if (opCtx != null && opCtx.hasDataCenterId()) { + assert conflictPutMap == null : conflictPutMap; + assert conflictRmvMap == null : conflictRmvMap; + + if (op == GridCacheOperation.TRANSFORM) { + assert invokeMap != null : invokeMap; + + conflictPutMap = F.viewReadOnly((Map)invokeMap, + new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { + @Override public GridCacheDrInfo apply(EntryProcessor o) { + return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId())); + } + }); + + invokeMap = null; + } + else if (op == GridCacheOperation.DELETE) { + assert map != null : map; + + conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() { + @Override public GridCacheVersion apply(V o) { + return ctx.versions().next(opCtx.dataCenterId()); + } + }); + + map = null; + } + else { + assert map != null : map; + + conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() { + @Override public GridCacheDrInfo apply(V o) { + return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId())); + } + }); + + map = null; + } + } UUID subjId = ctx.subjectIdPerCall(null, opCtx); @@ -928,7 +978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx, this, ctx.config().getWriteSynchronizationMode(), - invokeMap != null ? TRANSFORM : UPDATE, + op, map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ? conflictPutMap.keySet() : conflictRmvMap.keySet(), map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, @@ -966,8 +1016,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Completion future. */ private IgniteInternalFuture removeAllAsync0( - @Nullable final Collection<? extends K> keys, - @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap, + @Nullable Collection<? extends K> keys, + @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap, final boolean retval, boolean rawRetval, @Nullable final CacheEntryPredicate[] filter @@ -985,12 +1035,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - CacheOperationContext opCtx = ctx.operationContextPerCall(); + final CacheOperationContext opCtx = ctx.operationContextPerCall(); UUID subjId = ctx.subjectIdPerCall(null, opCtx); int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); + Collection<GridCacheVersion> drVers = null; + + if (opCtx != null && keys != null && opCtx.hasDataCenterId()) { + assert conflictMap == null : conflictMap; + + drVers = F.transform(keys, new C1<K, GridCacheVersion>() { + @Override public GridCacheVersion apply(K k) { + return ctx.versions().next(opCtx.dataCenterId()); + } + }); + } + final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, @@ -1000,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, null, null, - keys != null ? null : conflictMap.values(), + drVers != null ? drVers : (keys != null ? null : conflictMap.values()), retval, rawRetval, (filter != null && opCtx != null) ? opCtx.expiry() : null, http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 3c86083..c9e1a11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -1034,7 +1034,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else if (conflictPutVals != null) { GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); - val = conflictPutVal.value(); + val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); @@ -1142,7 +1142,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Conflict PUT. GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - val = conflictPutVal.value(); + val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index 8635fe2..02bc6b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable { /** Value. */ private CacheObject val; + /** Entry processor. */ + private EntryProcessor proc; + /** DR version. */ private GridCacheVersion ver; @@ -61,6 +65,29 @@ public class GridCacheDrInfo implements Externalizable { } /** + * Constructor. + * + * @param ver Version. + */ + public GridCacheDrInfo(GridCacheVersion ver) { + this.ver = ver; + } + + /** + * Constructor. + * + * @param proc Entry processor. + * @param ver Version. + */ + public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) { + assert proc != null; + assert ver != null; + + this.proc = proc; + this.ver = ver; + } + + /** * @return Value. */ public CacheObject value() { @@ -68,6 +95,20 @@ public class GridCacheDrInfo implements Externalizable { } /** + * @return Entry processor. + */ + public EntryProcessor entryProcessor() { + return proc; + } + + /** + * @return Value (entry processor or cache object. + */ + public Object valueEx() { + return val == null ? proc : val; + } + + /** * @return Version. */ public GridCacheVersion version() { @@ -88,13 +129,13 @@ public class GridCacheDrInfo implements Externalizable { return CU.EXPIRE_TIME_ETERNAL; } - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { assert false; } - @Override - public void writeExternal(ObjectOutput out) throws IOException { + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { assert false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 926eaf2..aad9841 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1977,8 +1977,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheDrInfo> drMap ) { + Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { + @Override public Object apply(GridCacheDrInfo val) { + return val.value(); + } + }); + return this.<Object, Object>putAllAsync0(cacheCtx, - null, + map, null, null, drMap, @@ -2055,7 +2061,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final GridCacheReturn ret, boolean skipStore, final boolean singleRmv, - boolean keepBinary) { + boolean keepBinary, + Byte dataCenterId) { try { addActiveCache(cacheCtx); @@ -2066,6 +2073,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (entryProcessor != null) transform = true; + GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null; + boolean loadMissed = enlistWriteEntry(cacheCtx, cacheKey, val, @@ -2075,7 +2084,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig retval, lockOnly, filter, - /*drVer*/null, + /*drVer*/drVer, /*drTtl*/-1L, /*drExpireTime*/-1L, ret, @@ -2125,6 +2134,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @param keepBinary Keep binary flag. + * @param dataCenterId Optional data center ID. * @return Future for missing values loading. */ private <K, V> IgniteInternalFuture<Void> enlistWrite( @@ -2143,7 +2154,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap, boolean skipStore, final boolean singleRmv, - final boolean keepBinary + final boolean keepBinary, + Byte dataCenterId ) { assert retval || invokeMap == null; @@ -2197,6 +2209,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig drTtl = -1L; drExpireTime = -1L; } + else if (dataCenterId != null) { + drVer = cctx.versions().next(dataCenterId); + drTtl = -1L; + drExpireTime = -1L; + } else { drVer = null; drTtl = -1L; @@ -2938,6 +2955,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null; + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); @@ -2955,7 +2974,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig ret, opCtx != null && opCtx.skipStore(), /*singleRmv*/false, - keepBinary); + keepBinary, + dataCenterId); if (pessimistic()) { assert loadFut == null || loadFut.isDone() : loadFut; @@ -3053,7 +3073,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @Nullable Map<? extends K, ? extends V> map, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, @Nullable final Object[] invokeArgs, - @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap, + @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap, final boolean retval, @Nullable final CacheEntryPredicate[] filter ) { @@ -3066,25 +3086,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig return new GridFinishedFuture(e); } - // Cached entry may be passed only from entry wrapper. - final Map<?, ?> map0; - final Map<?, EntryProcessor<K, V, Object>> invokeMap0; + final CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - if (drMap != null) { - assert map == null; + final Byte dataCenterId; - map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); + if (opCtx != null && opCtx.hasDataCenterId()) { + assert drMap == null : drMap; + assert map != null || invokeMap != null; - invokeMap0 = null; - } - else { - map0 = map; - invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; + dataCenterId = opCtx.dataCenterId(); } + else + dataCenterId = null; + + // Cached entry may be passed only from entry wrapper. + final Map<?, ?> map0 = map; + final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; if (log.isDebugEnabled()) log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); @@ -3110,8 +3127,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size()); - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); final IgniteInternalFuture<Void> loadFut = enlistWrite( @@ -3130,7 +3145,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig null, opCtx != null && opCtx.skipStore(), false, - keepBinary); + keepBinary, + dataCenterId); if (pessimistic()) { assert loadFut == null || loadFut.isDone() : loadFut; @@ -3334,6 +3350,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig else keys0 = keys; + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + final Byte dataCenterId; + + if (opCtx != null && opCtx.hasDataCenterId()) { + assert drMap == null : drMap; + + dataCenterId = opCtx.dataCenterId(); + } + else + dataCenterId = null; + assert keys0 != null; if (log.isDebugEnabled()) { @@ -3367,8 +3395,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final Collection<KeyCacheObject> enlisted = new ArrayList<>(); - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - ExpiryPolicy plc; if (!F.isEmpty(filter)) @@ -3394,7 +3420,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig drMap, opCtx != null && opCtx.skipStore(), singleRmv, - keepBinary + keepBinary, + dataCenterId ); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 68d03cd..166c713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on current topology. */ public GridCacheVersion next() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, false); + return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId); + } + + /** + * @param dataCenterId Data center id. + * @return Next version based on current topology with given data center id. + */ + public GridCacheVersion next(byte dataCenterId) { + return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId); } /** @@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given topology version. */ public GridCacheVersion next(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, false); + return next(topVer.topologyVersion(), true, false, dataCenterId); } /** @@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, true); + return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId); } /** @@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, true); + return next(topVer.topologyVersion(), true, true, dataCenterId); } /** @@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, true); + return next(ver.topologyVersion(), false, true, dataCenterId); } /** @@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given cache version. */ public GridCacheVersion next(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, false); + return next(ver.topologyVersion(), false, false, dataCenterId); } /** @@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @param topVer Topology version for which new version should be obtained. * @param addTime If {@code true} then adds to the given topology version number of seconds * from the start time of the first grid node. + * @param dataCenterId Data center id. * @return New lock order. */ - private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) { + private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 99d1a42..8bf877a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1026,6 +1026,9 @@ public abstract class GridAbstractTest extends TestCase { protected IgniteConfiguration loadConfiguration(String springCfgPath) throws IgniteCheckedException { URL cfgLocation = U.resolveIgniteUrl(springCfgPath); + if (cfgLocation == null) + cfgLocation = U.resolveIgniteUrl(springCfgPath, false); + assert cfgLocation != null; ApplicationContext springCtx; http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 21d8c69..c0f49c8 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -214,13 +214,6 @@ <version>4.11</version> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.ignite</groupId> - <artifactId>ignite-apache-license-gen</artifactId> - <version>${project.version}</version> - <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor --> - </dependency> </dependencies> <build> @@ -715,48 +708,6 @@ </execution> </executions> </plugin> - - <plugin><!-- generates dependencies licenses --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-remote-resources-plugin</artifactId> - <executions> - <execution> - <id>ignite-dependencies</id> - <goals> - <goal>process</goal> - </goals> - <configuration> - <resourceBundles> - <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}</resourceBundle> - </resourceBundles> - <excludeTransitive>true</excludeTransitive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>licenses-file-rename</id> - <goals> - <goal>run</goal> - </goals> - <phase>compile</phase> - <configuration> - <target> - <!-- moving licenses generated by "ignite-dependencies" --> - <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/> - </target> - <failOnError>false</failOnError> - </configuration> - </execution> - </executions> - </plugin> - </plugins> </build> @@ -998,5 +949,67 @@ </dependency> </dependencies> </profile> + + <profile> + <id>release</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-apache-license-gen</artifactId> + <version>${project.version}</version> + <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor --> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin><!-- generates dependencies licenses --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-remote-resources-plugin</artifactId> + <executions> + <execution> + <id>ignite-dependencies</id> + <goals> + <goal>process</goal> + </goals> + <configuration> + <resourceBundles> + <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version} + </resourceBundle> + </resourceBundles> + <excludeTransitive>true</excludeTransitive> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>licenses-file-rename</id> + <goals> + <goal>run</goal> + </goals> + <phase>compile</phase> + <configuration> + <target> + <!-- moving licenses generated by "ignite-dependencies" --> + <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/> + </target> + <failOnError>false</failOnError> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4f797e8..bead3ae 100644 --- a/pom.xml +++ b/pom.xml @@ -918,22 +918,6 @@ </execution> </executions> </plugin> - - <plugin><!-- skipping generates dependencies licenses --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-remote-resources-plugin</artifactId> - <executions> - <execution> - <id>ignite-dependencies</id> - <goals> - <goal>process</goal> - </goals> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> </project>
