Repository: ignite Updated Branches: refs/heads/ignite-1607 2322eb3f1 -> 9f77d2723
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f77d272 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f77d272 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f77d272 Branch: refs/heads/ignite-1607 Commit: 9f77d2723ae5d097f920b8e12be43bf6e83f4115 Parents: 2322eb3 Author: sboikov <[email protected]> Authored: Thu Oct 22 16:03:02 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 22 16:03:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 19 ++-- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 36 ++++---- .../distributed/dht/GridDhtTxLocalAdapter.java | 7 -- .../distributed/dht/GridDhtTxPrepareFuture.java | 38 ++------ .../distributed/near/GridNearCacheEntry.java | 21 ----- ...arOptimisticSerializableTxPrepareFuture.java | 53 ++--------- .../near/GridNearTransactionalCache.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 59 +++++++------ .../datastreamer/DataStreamProcessor.java | 3 +- .../datastreamer/DataStreamerImpl.java | 37 ++++++-- .../datastreamer/DataStreamerRequest.java | 34 +++++++- .../datastreamer/DataStreamerUpdateJob.java | 33 ++++--- .../IgniteTxOptimisticCheckedException.java | 27 ------ .../CacheSerializableTransactionsTest.java | 92 ++++++++++++++++++++ .../processors/cache/GridCacheTestEntryEx.java | 8 +- 17 files changed, 261 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 400d76e..74951b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1680,9 +1680,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() { @Override public Map<K1, V1> call() throws Exception { ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() { - /** New version for all new entries. */ - private GridCacheVersion nextVer; - @Override public void apply(KeyCacheObject key, Object val) { GridCacheVersion ver = loadKeys.get(key); @@ -1694,10 +1691,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return; } - // Initialize next version. - if (nextVer == null) - nextVer = ctx.versions().next(); - loaded.add(key); CacheObject cacheVal = ctx.toCacheObject(val); @@ -1706,11 +1699,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheEntryEx entry = entryEx(key); try { - boolean set = entry.versionedValue(cacheVal, ver, nextVer); + GridCacheVersion verSet = entry.versionedValue(cacheVal, ver, null); + + boolean set = verSet != null; if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry [set=" + set + - ", curVer=" + ver + ", newVer=" + nextVer + ", " + + log.debug("Set value loaded from store into entry [" + + "set=" + set + + ", curVer=" + ver + + ", newVer=" + verSet + ", " + "entry=" + entry + ']'); // Don't put key-value pair into result map if value is null. @@ -1718,7 +1715,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (needVer) { assert keepCacheObjects; - map.put((K1)key, (V1)new T2<>(cacheVal, set ? nextVer : ver)); + map.put((K1)key, (V1)new T2<>(cacheVal, set ? verSet : ver)); } else { ctx.addResult(map, http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 0b5a859..3b636b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -696,11 +696,11 @@ public interface GridCacheEntryEx { * @param val New value. * @param curVer Version to match or {@code null} if match is not required. * @param newVer Version to set. - * @return {@code True} if versioned matched. + * @return Non null version if value was set. * @throws IgniteCheckedException If index could not be updated. * @throws GridCacheEntryRemovedException If entry was removed. */ - public boolean versionedValue(CacheObject val, + public GridCacheVersion versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, @Nullable GridCacheVersion newVer) throws IgniteCheckedException, GridCacheEntryRemovedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5608b4c..621ed99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3208,11 +3208,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public synchronized boolean versionedValue(CacheObject val, + @Override public synchronized GridCacheVersion versionedValue(CacheObject val, GridCacheVersion curVer, GridCacheVersion newVer) throws IgniteCheckedException, GridCacheEntryRemovedException { - assert newVer != null; checkObsolete(); @@ -3220,33 +3219,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheMvcc mvcc = mvccExtras(); if (mvcc != null && !mvcc.isEmpty()) - return false; + return null; - if (val != this.val) { - CacheObject old = rawGetOrUnmarshalUnlocked(false); + if (newVer == null) + newVer = cctx.versions().next(); - long ttl = ttlExtras(); + CacheObject old = rawGetOrUnmarshalUnlocked(false); - long expTime = CU.toExpireTime(ttl); + long ttl = ttlExtras(); - // Detach value before index update. - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + long expTime = CU.toExpireTime(ttl); - if (val != null) { - updateIndex(val, expTime, newVer, old); + // Detach value before index update. + val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (deletedUnlocked()) - deletedUnlocked(false); - } + if (val != null) { + updateIndex(val, expTime, newVer, old); - // Version does not change for load ops. - update(val, expTime, ttl, newVer); + if (deletedUnlocked()) + deletedUnlocked(false); } - return true; + // Version does not change for load ops. + update(val, expTime, ttl, newVer); + + return newVer; } - return false; + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index c4fda29..8c7d985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -189,13 +189,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** - * @return {@code True} if originating node has a near cache that participates in this transaction. - */ - public boolean nearOnOriginatingNode() { - return nearOnOriginatingNode; - } - - /** * @return {@code True} if explicit lock transaction. */ public boolean explicitLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 7d281d5..6f8ac2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -689,14 +689,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.implicitSingleResult(ret); } - else if (prepErr instanceof IgniteTxOptimisticCheckedException) { - IgniteTxOptimisticCheckedException err = (IgniteTxOptimisticCheckedException)prepErr; - - if (err.values() != null) { - for (Map.Entry<IgniteTxKey, CacheVersionedValue> e : err.values().entrySet()) - res.addOwnedValue(e.getKey(), e.getValue()); - } - } res.filterFailedKeys(filterFailedKeys); @@ -943,7 +935,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.cached().unswap(); if (!entry.cached().checkSerializableReadVersion(serReadVer)) - return versionCheckError(entry, null); + return versionCheckError(entry); } } } @@ -991,16 +983,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param entry Entry. - * @param vals Values. * @return Optimistic version check error. */ - private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry, - Map<IgniteTxKey, CacheVersionedValue> vals) { + private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry) { GridCacheContext cctx = entry.context(); return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + - ", cache=" + cctx.name() + ']', vals); + ", cache=" + cctx.name() + ']'); } /** @@ -1012,26 +1002,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteCheckedException err0 = null; try { - if (tx.nearOnOriginatingNode()) { - Map<IgniteTxKey, CacheVersionedValue> vals; - - vals = checkReadConflict(writes, null); - vals = checkReadConflict(reads, vals); - - if (vals != null) { - IgniteTxEntry entry = tx.entry(F.firstKey(vals)); + err0 = checkReadConflict(writes); - assert entry != null; - - err0 = versionCheckError(entry, vals); - } - } - else { - err0 = checkReadConflict(writes); - - if (err0 == null) - err0 = checkReadConflict(reads); - } + if (err0 == null) + err0 = checkReadConflict(reads); } catch (IgniteCheckedException e) { U.error(log, "Failed to check entry version: " + e, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 0ad236c..d558cc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -353,27 +353,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** - * @param primaryNodeId Primary node ID. - * @param topVer Topology version. - * @param val Value. - * @param dhtVer DHT version received from remote node. - * @throws GridCacheEntryRemovedException If entry was removed. - */ - synchronized public void loadedValue(UUID primaryNodeId, - AffinityTopologyVersion topVer, - CacheObject val, - GridCacheVersion dhtVer) - throws GridCacheEntryRemovedException { - checkObsolete(); - - if (recordDhtVersion(dhtVer)) { - primaryNode(primaryNodeId, topVer); - - value(val); - } - } - - /** * @param tx Transaction. * @param primaryNodeId Primary node ID. * @param val New value. http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 48e44e6..eb88149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -187,9 +187,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param m Failed mapping. * @param e Error. - * @param res Response. */ - private void onError(@Nullable GridDistributedTxMapping m, Throwable e, GridNearTxPrepareResponse res) { + private void onError(@Nullable GridDistributedTxMapping m, Throwable e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -201,44 +200,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } if (e instanceof IgniteTxOptimisticCheckedException) { - if (res != null) { - assert m != null; - - Map<IgniteTxKey, CacheVersionedValue> ownVals = res.ownedValues(); - - if (ownVals != null) { - for (Map.Entry<IgniteTxKey, CacheVersionedValue> val : ownVals.entrySet()) { - IgniteTxEntry txEntry = tx.entry(val.getKey()); - - assert txEntry != null : val.getKey(); - - GridCacheContext cctx = txEntry.context(); - - if (cctx.isNear()) { - GridNearCacheEntry entry = (GridNearCacheEntry)txEntry.cached(); - - assert entry != null; - - while (true) { - try { - CacheVersionedValue verVal = val.getValue(); - - entry.loadedValue(m.node().id(), - tx.topologyVersion(), - verVal.value(), - verVal.version()); - - break; - } - catch (GridCacheEntryRemovedException rmvErr) { - txEntry.cached(entry.context().cache().entryEx(entry.key())); - } - } - } - } - } - } - if (m != null) tx.removeMapping(m.node().id()); } @@ -324,14 +285,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (tx.setRollbackOnly()) { if (tx.timedOut()) onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this), null); + "was rolled back: " + this)); else onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']'), null); + "[state=" + tx.state() + ", tx=" + this + ']')); } else onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']'), null); + "prepare [state=" + tx.state() + ", tx=" + this + ']')); return; } @@ -745,7 +706,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim */ void onResult(Throwable e) { if (rcvRes.compareAndSet(false, true)) { - onError(m, e, null); + onError(m, e); if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -769,7 +730,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); - onError(null, e, null); + onError(null, e); onDone(e); } @@ -786,7 +747,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { // Fail the whole compound future. - onError(m, res.error(), res); + onError(m, res.error()); onDone(res.error()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 00b78b3..13743d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -186,7 +186,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, readThrough, - false, + /*force primary*/needVer, tx, CU.subjectId(tx, ctx.shared()), tx.resolveTaskName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index e856b24..67fd81e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -380,7 +380,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return cacheCtx.colocated().loadAsync( keys, readThrough, - /*force primary*/false, + /*force primary*/needVer, topologyVersion(), CU.subjectId(this, cctx), resolveTaskName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 6dee304..4d3ad72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -486,31 +486,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final Map<KeyCacheObject, GridCacheVersion> misses0 = misses; cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() { - private GridCacheVersion nextVer; - @Override public void apply(KeyCacheObject key, Object val) { GridCacheVersion ver = misses0.remove(key); assert ver != null : key; if (val != null) { - if (nextVer == null) - nextVer = cacheCtx.versions().next(); - CacheObject cacheVal = cacheCtx.toCacheObject(val); while (true) { GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); try { - boolean set = entry.versionedValue(cacheVal, ver, nextVer); + GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null); + + boolean set = setVer != null; if (set) - ver = nextVer; + ver = setVer; if (log.isDebugEnabled()) log.debug("Set value loaded from store into entry [set=" + set + - ", curVer=" + ver + ", newVer=" + nextVer + ", " + + ", curVer=" + ver + ", newVer=" + setVer + ", " + "entry=" + entry + ']'); break; @@ -1316,6 +1313,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param entry Entry. + * @return {@code True} if local node is current primary for given entry. + */ + private boolean primaryLocal(GridCacheEntryEx entry) { + return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE); + } + + /** * Checks if there is a cached or swapped value for * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. * @@ -1452,15 +1457,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; if (needReadVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, - /*swap*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc); + T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc) : null; if (res != null) { val = res.get1(); @@ -2129,15 +2135,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (optimistic() && !implicit()) { try { if (needReadVer) { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, - /*swap*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null); + T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null) : null; if (res != null) { old = res.get1(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 5150d83..20a013b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -289,7 +289,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { col, req.ignoreDeploymentOwnership(), req.skipStore(), - updater); + updater, + req.version()); Exception err = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index ab2a6e8..bf9dc78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -213,6 +213,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private int maxRemapCnt = DFLT_MAX_REMAP_CNT; + /** */ + private GridCacheVersion ver; + /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */ private static boolean isWarningPrinted; @@ -1242,8 +1245,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; if (isLocNode) { - fut = ctx.closure().callLocalSafe( - new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, rcvr), false); + DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx, + log, + cacheName, + entries, + false, + skipStore, + rcvr, + ver); + + fut = ctx.closure().callLocalSafe(job, false); locFuts.add(fut); @@ -1277,6 +1288,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed assert rcvr != null; updaterBytes = ctx.config().getMarshaller().marshal(rcvr); + + if (rcvr == ISOLATED_UPDATER) + ver = ctx.cache().context().versions().next(); } if (topicBytes == null) @@ -1337,7 +1351,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - topVer); + topVer, + ver); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); @@ -1537,7 +1552,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * Isolated receiver which only loads entry initial value. */ - private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, + static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>, DataStreamerCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; @@ -1545,6 +1560,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@inheritDoc} */ @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache, Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { + receive(cache, entries, null); + } + + /** + * @param cache Cache. + * @param entries Entries. + * @param ver Entries version. + */ + void receive(IgniteCache<KeyCacheObject, CacheObject> cache, + Collection<Map.Entry<KeyCacheObject, CacheObject>> entries, + GridCacheVersion ver) { IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); @@ -1556,7 +1582,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - GridCacheVersion ver = cctx.versions().next(topVer); + if (ver == null) + ver = cctx.versions().next(topVer); long ttl = CU.TTL_ETERNAL; long expiryTime = CU.EXPIRE_TIME_ETERNAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index c1a1528..59810ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -87,6 +88,9 @@ public class DataStreamerRequest implements Message { /** Topology version. */ private AffinityTopologyVersion topVer; + /** */ + private GridCacheVersion ver; + /** * {@code Externalizable} support. */ @@ -109,6 +113,7 @@ public class DataStreamerRequest implements Message { * @param clsLdrId Class loader ID. * @param forceLocDep Force local deployment. * @param topVer Topology version. + * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}. */ public DataStreamerRequest(long reqId, byte[] resTopicBytes, @@ -123,7 +128,8 @@ public class DataStreamerRequest implements Message { Map<UUID, IgniteUuid> ldrParticipants, IgniteUuid clsLdrId, boolean forceLocDep, - @NotNull AffinityTopologyVersion topVer) { + @NotNull AffinityTopologyVersion topVer, + @Nullable GridCacheVersion ver) { assert topVer != null; this.reqId = reqId; @@ -140,6 +146,14 @@ public class DataStreamerRequest implements Message { this.clsLdrId = clsLdrId; this.forceLocDep = forceLocDep; this.topVer = topVer; + this.ver = ver; + } + + /** + * @return Version. + */ + @Nullable public GridCacheVersion version() { + return ver; } /** @@ -341,6 +355,12 @@ public class DataStreamerRequest implements Message { writer.incrementState(); + case 14: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + } return true; @@ -470,6 +490,14 @@ public class DataStreamerRequest implements Message { reader.incrementState(); + case 14: + ver = reader.readMessage("ver"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(DataStreamerRequest.class); @@ -482,6 +510,6 @@ public class DataStreamerRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 42084a3..12eee88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -56,6 +57,9 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { /** */ private final StreamReceiver rcvr; + /** */ + private final GridCacheVersion ver; + /** * @param ctx Context. * @param log Log. @@ -64,6 +68,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { * @param ignoreDepOwnership {@code True} to ignore deployment ownership. * @param skipStore Skip store flag. * @param rcvr Updater. + * @param ver Entries version for {@link DataStreamerImpl.IsolatedUpdater}. */ DataStreamerUpdateJob( GridKernalContext ctx, @@ -72,7 +77,8 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { Collection<DataStreamerEntry> col, boolean ignoreDepOwnership, boolean skipStore, - StreamReceiver<?, ?> rcvr) { + StreamReceiver<?, ?> rcvr, + @Nullable GridCacheVersion ver) { this.ctx = ctx; this.log = log; @@ -84,6 +90,7 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { this.ignoreDepOwnership = ignoreDepOwnership; this.skipStore = skipStore; this.rcvr = rcvr; + this.ver = ver; } /** {@inheritDoc} */ @@ -119,17 +126,21 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { checkSecurityPermission(SecurityPermission.CACHE_REMOVE); } - if (unwrapEntries()) { - Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { - @Override public Map.Entry apply(DataStreamerEntry e) { - return e.toEntry(cctx); - } - }); - - rcvr.receive(cache, col0); + if (rcvr instanceof DataStreamerImpl.IsolatedUpdater) + ((DataStreamerImpl.IsolatedUpdater)rcvr).receive(cache, (Collection)col, ver); + else { + if (unwrapEntries()) { + Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() { + @Override public Map.Entry apply(DataStreamerEntry e) { + return e.toEntry(cctx); + } + }); + + rcvr.receive(cache, col0); + } + else + rcvr.receive(cache, col); } - else - rcvr.receive(cache, col); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java index 3324779..b2b0e2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java @@ -17,11 +17,7 @@ package org.apache.ignite.internal.transactions; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; /** * Exception thrown whenever grid transactions fail optimistically. @@ -30,10 +26,6 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L; - /** */ - @GridToStringExclude - private transient Map<IgniteTxKey, CacheVersionedValue> vals; - /** * Creates new optimistic exception with given error message. * @@ -44,25 +36,6 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException { } /** - * Creates new optimistic exception with given error message. - * - * @param msg Error message. - * @param vals Current values. - */ - public IgniteTxOptimisticCheckedException(String msg, Map<IgniteTxKey, CacheVersionedValue> vals) { - super(msg); - - this.vals = vals; - } - - /** - * @return Current values. - */ - public Map<IgniteTxKey, CacheVersionedValue> values() { - return vals; - } - - /** * Creates new optimistic exception with given error message and optional nested exception. * * @param msg Error message. http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 510f3e7..70ddfa0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -42,6 +42,7 @@ import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheMode; @@ -149,6 +150,97 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxStreamerLoad() throws Exception { + txStreamerLoad(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxStreamerLoadAllowOverwrite() throws Exception { + txStreamerLoad(true); + } + + /** + * @param allowOverwrite Streamer flag. + * @throws Exception If failed. + */ + private void txStreamerLoad(boolean allowOverwrite) throws Exception { + Ignite ignite0 = ignite(0); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + if (ccfg.getCacheStoreFactory() == null) + continue; + + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) + txStreamerLoad(ignite0, key, cache.getName(), allowOverwrite); + + txStreamerLoad(ignite(SRVS), 10_000, cache.getName(), allowOverwrite); + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @param ignite Node. + * @param key Key. + * @param cacheName Cache name. + * @param allowOverwrite Streamer flag. + * @throws Exception If failed. + */ + private void txStreamerLoad(Ignite ignite, + Integer key, + String cacheName, + boolean allowOverwrite) throws Exception { + IgniteCache<Integer, Integer> cache = ignite.cache(cacheName); + + log.info("Test key: " + key); + + Integer loadVal = -1; + + IgniteTransactions txs = ignite.transactions(); + + try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cache.getName())) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData(key, loadVal); + } + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(loadVal, val); + + tx.commit(); + } + + checkValue(key, loadVal, cache.getName()); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(loadVal, val); + + cache.put(key, 0); + + tx.commit(); + } + + checkValue(key, 0, cache.getName()); + } + + /** + * @throws Exception If failed. + */ public void testTxLoadFromStore() throws Exception { Ignite ignite0 = ignite(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f77d272/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index e074583..84cc572 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -670,8 +670,12 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public boolean versionedValue(CacheObject val, GridCacheVersion curVer, GridCacheVersion newVer) { - assert false; return false; + @Override public GridCacheVersion versionedValue(CacheObject val, + GridCacheVersion curVer, + GridCacheVersion newVer) { + assert false; + + return null; } /** @inheritDoc */
