ignite-1607 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4da8ac1f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4da8ac1f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4da8ac1f Branch: refs/heads/ignite-1607 Commit: 4da8ac1fab94d9aeea9428f82401acebc5167120 Parents: 11e6449 Author: sboikov <[email protected]> Authored: Tue Oct 20 17:43:48 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 20 17:43:48 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 53 ++++++--- .../distributed/dht/GridDhtTxLocalAdapter.java | 7 ++ .../distributed/dht/GridDhtTxPrepareFuture.java | 107 ++++++++++++++++--- .../distributed/near/CacheVersionedValue.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 79 +++++++++++--- .../distributed/near/GridNearGetFuture.java | 75 +++++++++---- ...arOptimisticSerializableTxPrepareFuture.java | 62 +++++++++-- .../near/GridNearTxPrepareFutureAdapter.java | 1 + .../near/GridNearTxPrepareResponse.java | 13 +++ .../transactions/IgniteTxLocalAdapter.java | 12 ++- .../IgniteTxOptimisticCheckedException.java | 27 +++++ .../CacheSerializableTransactionsTest.java | 83 ++++++++------ .../testsuites/IgniteCacheTestSuite4.java | 3 + 13 files changed, 416 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9271923..ce841f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1079,6 +1079,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); + if (isNear()) { + assert dhtVer != null; + + // It is possible that 'get' could load more recent value. + if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) + return new GridCacheUpdateTxResult(false, null); + } + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']'; @@ -1147,12 +1155,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(val, expireTime, ttl, newVer); - if (isNear()) { - assert dhtVer != null; - - ((GridNearCacheEntry)this).recordDhtVersion(dhtVer); - } - drReplicate(drType, val, newVer); recordNodeId(affNodeId, topVer); @@ -1245,6 +1247,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); + if (isNear()) { + assert dhtVer != null; + + // It is possible that 'get' could load more recent value. + if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) + return new GridCacheUpdateTxResult(false, null); + } + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; @@ -1282,12 +1292,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(null, 0, 0, newVer); - if (isNear()) { - assert dhtVer != null; - - ((GridNearCacheEntry)this).recordDhtVersion(dhtVer); - } - if (cctx.offheapTiered() && hadValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -2606,6 +2610,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @return {@code True} if this entry should not be evicted from cache. + */ + protected boolean evictionDisabled() { + return false; + } + + /** * <p> * Note that {@link #onMarkedObsolete()} should always be called after this method * returns {@code true}. @@ -2617,6 +2628,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme protected final boolean markObsolete0(GridCacheVersion ver, boolean clear) { assert Thread.holdsLock(this); + if (evictionDisabled()) { + assert !obsolete() : this; + + return false; + } + GridCacheVersion obsoleteVer = obsoleteVersionExtras(); if (ver != null) { @@ -3758,6 +3775,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { if (F.isEmptyOrNulls(filter)) { synchronized (this) { + if (evictionDisabled()) { + assert !obsolete(); + + return false; + } + if (obsoleteVersionExtras() != null) return true; @@ -3802,6 +3825,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return false; synchronized (this) { + if (evictionDisabled()) { + assert !obsolete(); + + return false; + } + if (obsoleteVersionExtras() != null) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8c7d985..c4fda29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -189,6 +189,13 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * @return {@code True} if originating node has a near cache that participates in this transaction. + */ + public boolean nearOnOriginatingNode() { + return nearOnOriginatingNode; + } + + /** * @return {@code True} if explicit lock transaction. */ public boolean explicitLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1e1a6b3..7d281d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -51,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -689,6 +689,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.implicitSingleResult(ret); } + else if (prepErr instanceof IgniteTxOptimisticCheckedException) { + IgniteTxOptimisticCheckedException err = (IgniteTxOptimisticCheckedException)prepErr; + + if (err.values() != null) { + for (Map.Entry<IgniteTxKey, CacheVersionedValue> e : err.values().entrySet()) + res.addOwnedValue(e.getKey(), e.getValue()); + } + } res.filterFailedKeys(filterFailedKeys); @@ -923,8 +931,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param entries Entries. * @return Not null exception if version check failed. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IgniteCheckedException checkReadConflict(Collection<IgniteTxEntry> entries) + throws IgniteCheckedException { + try { + for (IgniteTxEntry entry : entries) { + GridCacheVersion serReadVer = entry.serializableReadVersion(); + + if (serReadVer != null) { + entry.cached().unswap(); + + if (!entry.cached().checkSerializableReadVersion(serReadVer)) + return versionCheckError(entry, null); + } + } + } + catch (GridCacheEntryRemovedException e) { + assert false : "Got removed exception on entry with dht local candidate: " + entries; + } + + return null; + } + + /** + * @param entries Entries. + * @param vals Values map. + * @return Value map. + * @throws IgniteCheckedException If failed. */ - @Nullable private IgniteCheckedException checkReadConflict(Collection<IgniteTxEntry> entries) { + @Nullable private Map<IgniteTxKey, CacheVersionedValue> checkReadConflict(Collection<IgniteTxEntry> entries, + @Nullable Map<IgniteTxKey, CacheVersionedValue> vals) throws IgniteCheckedException { try { for (IgniteTxEntry entry : entries) { GridCacheVersion serReadVer = entry.serializableReadVersion(); @@ -933,25 +970,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.cached().unswap(); if (!entry.cached().checkSerializableReadVersion(serReadVer)) { - GridCacheContext cctx = entry.context(); + if (vals == null) + vals = U.newHashMap(3); + + GridCacheEntryInfo info = entry.cached().info(); - return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + - "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + - ", cache=" + cctx.name() + ']'); + assert info != null : entry.cached(); + + vals.put(entry.txKey(), new CacheVersionedValue(info.value(), info.version())); } } } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unswap entry: " + e, e); - - return e; - } catch (GridCacheEntryRemovedException e) { assert false : "Got removed exception on entry with dht local candidate: " + entries; } - return null; + return vals; + } + + /** + * @param entry Entry. + * @param vals Values. + * @return Optimistic version check error. + */ + private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry, + Map<IgniteTxKey, CacheVersionedValue> vals) { + GridCacheContext cctx = entry.context(); + + return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + + "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + + ", cache=" + cctx.name() + ']', vals); } /** @@ -960,10 +1009,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private void prepare0() { try { if (tx.optimistic() && tx.serializable()) { - IgniteCheckedException err0 = checkReadConflict(writes); + IgniteCheckedException err0 = null; + + try { + if (tx.nearOnOriginatingNode()) { + Map<IgniteTxKey, CacheVersionedValue> vals; - if (err0 == null) - err0 = checkReadConflict(reads); + vals = checkReadConflict(writes, null); + vals = checkReadConflict(reads, vals); + + if (vals != null) { + IgniteTxEntry entry = tx.entry(F.firstKey(vals)); + + assert entry != null; + + err0 = versionCheckError(entry, vals); + } + } + else { + err0 = checkReadConflict(writes); + + if (err0 == null) + err0 = checkReadConflict(reads); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to check entry version: " + e, e); + + err0 = e; + } if (err0 != null) { err.compareAndSet(null, err0); @@ -974,7 +1048,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) { try { - sendPrepareResponse(res); + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); } catch (IgniteCheckedException e) { U.error(log, "Failed to send prepare response for transaction: " + tx, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java index 1559a91..c14621a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java @@ -53,7 +53,7 @@ public class CacheVersionedValue implements Message { * @param val Cache value. * @param ver Cache version. */ - CacheVersionedValue(CacheObject val, GridCacheVersion ver) { + public CacheVersionedValue(CacheObject val, GridCacheVersion ver) { this.val = val; this.ver = ver; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 036ee6d..7bfd979 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"}) public class GridNearCacheEntry extends GridDistributedCacheEntry { /** */ - private static final int NEAR_SIZE_OVERHEAD = 36; + private static final int NEAR_SIZE_OVERHEAD = 36 + 16; /** Topology version at the moment when value was initialized from primary node. */ private volatile long topVer = -1L; @@ -58,6 +58,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** Partition. */ private int part; + /** */ + private short evictReservations; + /** * @param ctx Cache context. * @param key Cache key. @@ -316,13 +319,21 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { primaryNode(primaryNodeId, topVer); } - /** + /* * @param dhtVer DHT version to record. + * @return {@code False} if given version is lower then existing version. */ - public void recordDhtVersion(GridCacheVersion dhtVer) { + public final boolean recordDhtVersion(GridCacheVersion dhtVer) { + assert dhtVer != null; assert Thread.holdsLock(this); - this.dhtVer = dhtVer; + if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) <= 0) { + this.dhtVer = dhtVer; + + return true; + } + + return false; } /** {@inheritDoc} */ @@ -342,12 +353,32 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** + * @param primaryNodeId Primary node ID. + * @param topVer Topology version. + * @param val Value. + * @param dhtVer DHT version received from remote node. + * @throws GridCacheEntryRemovedException If entry was removed. + */ + synchronized public void loadedValue(UUID primaryNodeId, + AffinityTopologyVersion topVer, + CacheObject val, + GridCacheVersion dhtVer) + throws GridCacheEntryRemovedException { + checkObsolete(); + + if (recordDhtVersion(dhtVer)) { + primaryNode(primaryNodeId, topVer); + + value(val); + } + } + + /** * @param tx Transaction. * @param primaryNodeId Primary node ID. * @param val New value. * @param ver Version to use. * @param dhtVer DHT version received from remote node. - * @param expVer Optional version to match. * @param ttl Time to live. * @param expireTime Expiration time. * @param evt Event flag. @@ -363,13 +394,14 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { CacheObject val, GridCacheVersion ver, GridCacheVersion dhtVer, - @Nullable GridCacheVersion expVer, long ttl, long expireTime, boolean evt, AffinityTopologyVersion topVer, UUID subjId) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert dhtVer != null; + boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); GridCacheVersion enqueueVer = null; @@ -378,9 +410,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { synchronized (this) { checkObsolete(); - if (this.dhtVer != null && this.dhtVer.compareTo(dhtVer) >= 0) - return false; - if (cctx.cache().configuration().isStatisticsEnabled()) cctx.cache().metrics0().onRead(false); @@ -389,11 +418,11 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { CacheObject old = this.val; boolean hasVal = hasValueUnlocked(); - if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) { + if (isNew() || !valid) { primaryNode(primaryNodeId, topVer); // Change entry only if dht version has changed. - if (!dhtVer.equals(dhtVersion())) { + if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { update(val, expireTime, ttl, ver); if (cctx.deferredDelete() && !isInternal()) { @@ -407,7 +436,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } } - recordDhtVersion(dhtVer); + this.dhtVer = dhtVer; ret = true; } @@ -647,6 +676,32 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** + * @throws GridCacheEntryRemovedException If entry was removed. + */ + public synchronized void reserveEviction() throws GridCacheEntryRemovedException { + checkObsolete(); + + evictReservations++; + } + + /** + * + */ + public synchronized void releaseEviction() { + assert evictReservations > 0 : this; + assert !obsolete() : this; + + evictReservations--; + } + + /** {@inheritDoc} */ + @Override protected boolean evictionDisabled() { + assert Thread.holdsLock(this); + + return evictReservations > 0; + } + + /** * @param nodeId Primary node ID. * @param topVer Topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index b89d688..7384a89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -268,7 +268,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(affNodes.size()); - Map<KeyCacheObject, GridCacheVersion> savedVers = null; + Map<KeyCacheObject, GridNearCacheEntry> savedVers = null; // Assign keys to primary nodes. for (KeyCacheObject key : keys) @@ -277,7 +277,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (isDone()) return; - final Map<KeyCacheObject, GridCacheVersion> saved = savedVers; + final Map<KeyCacheObject, GridNearCacheEntry> saved = savedVers; final int keysSize = keys.size(); @@ -335,6 +335,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return Collections.emptyMap(); } + finally { + releaseEvictions(mappedKeys.keySet(), saved, topVer); + } } })); } @@ -385,12 +388,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param savedVers Saved versions. * @return Map. */ - private Map<KeyCacheObject, GridCacheVersion> map( + private Map<KeyCacheObject, GridNearCacheEntry> map( KeyCacheObject key, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, AffinityTopologyVersion topVer, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, - Map<KeyCacheObject, GridCacheVersion> savedVers + Map<KeyCacheObject, GridNearCacheEntry> savedVers ) { final GridNearCacheAdapter near = cache(); @@ -558,15 +561,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } - GridNearCacheEntry nearEntry = allowLocRead ? near.peekExx(key) : null; - - entry = nearEntry; - - if (savedVers == null) - savedVers = U.newHashMap(3); - - savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion()); - LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { @@ -579,6 +573,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } } + GridNearCacheEntry nearEntry = near.entryExx(key, topVer); + + nearEntry.reserveEviction(); + + entry = null; + + if (savedVers == null) + savedVers = U.newHashMap(3); + + savedVers.put(key, nearEntry); + // Don't add reader if transaction acquires lock anyway to avoid deadlock. boolean addRdr = tx == null || tx.optimistic(); @@ -660,7 +665,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap UUID nodeId, Collection<KeyCacheObject> keys, Collection<GridCacheEntryInfo> infos, - Map<KeyCacheObject, GridCacheVersion> savedVers, + Map<KeyCacheObject, GridNearCacheEntry> savedVers, AffinityTopologyVersion topVer ) { boolean empty = F.isEmpty(keys); @@ -678,9 +683,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap // Entries available locally in DHT should not be loaded into near cache for reading. if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) { - GridNearCacheEntry entry = cache().entryExx(info.key(), topVer); + GridNearCacheEntry entry = savedVers.get(info.key()); - GridCacheVersion saved = savedVers.get(info.key()); + assert entry != null : info.key(); // Load entry into cache. entry.loadedValue(tx, @@ -688,14 +693,11 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap info.value(), atomic ? info.version() : ver, info.version(), - saved, info.ttl(), info.expireTime(), true, topVer, subjId); - - cctx.evicts().touch(entry, topVer); } CacheObject val = info.value(); @@ -730,6 +732,26 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap return map; } + /** + * @param keys Keys. + * @param saved Saved entries. + * @param topVer Topology version. + */ + private void releaseEvictions(Collection<KeyCacheObject> keys, + Map<KeyCacheObject, GridNearCacheEntry> saved, + AffinityTopologyVersion topVer) { + for (KeyCacheObject key : keys) { + GridNearCacheEntry entry = saved.get(key); + + assert entry != null : key; + + entry.releaseEviction(); + + if (tx == null) + cctx.evicts().touch(entry, topVer); + } + } + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @@ -769,7 +791,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap private LinkedHashMap<KeyCacheObject, Boolean> keys; /** Saved entry versions. */ - private Map<KeyCacheObject, GridCacheVersion> savedVers; + private Map<KeyCacheObject, GridNearCacheEntry> savedVers; /** Topology version on which this future was mapped. */ private AffinityTopologyVersion topVer; @@ -786,7 +808,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap MiniFuture( ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, - Map<KeyCacheObject, GridCacheVersion> savedVers, + Map<KeyCacheObject, GridNearCacheEntry> savedVers, AffinityTopologyVersion topVer ) { this.node = node; @@ -827,6 +849,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap onDone(e); } + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + releaseEvictions(keys.keySet(), savedVers, topVer); + + return true; + } + else + return false; + } + /** * @param e Topology exception. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index b5ae659..5f41cdf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -155,11 +155,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return found; } - /** + /* * @param m Failed mapping. * @param e Error. + * @param res Response. */ - void onError(@Nullable GridDistributedTxMapping m, Throwable e) { + void onError(@Nullable GridDistributedTxMapping m, Throwable e, GridNearTxPrepareResponse res) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -170,8 +171,49 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } } - if (e instanceof IgniteTxOptimisticCheckedException && m != null) - tx.removeMapping(m.node().id()); + if (e instanceof IgniteTxOptimisticCheckedException) { + if (res != null) { + assert m != null; + + Map<IgniteTxKey, CacheVersionedValue> ownVals = res.ownedValues(); + + if (ownVals != null) { + + for (Map.Entry<IgniteTxKey, CacheVersionedValue> val : ownVals.entrySet()) { + IgniteTxEntry txEntry = tx.entry(val.getKey()); + + assert txEntry != null : val.getKey(); + + GridCacheContext cctx = txEntry.context(); + + if (cctx.isNear()) { + GridNearCacheEntry entry = (GridNearCacheEntry)txEntry.cached(); + + assert entry != null; + + while (true) { + try { + CacheVersionedValue verVal = val.getValue(); + + entry.loadedValue(m.node().id(), + tx.topologyVersion(), + verVal.value(), + verVal.version()); + + break; + } + catch (GridCacheEntryRemovedException rmvErr) { + txEntry.cached(entry.context().cache().entryEx(entry.key())); + } + } + } + } + } + } + + if (m != null) + tx.removeMapping(m.node().id()); + } err.compareAndSet(null, e); @@ -425,14 +467,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (tx.setRollbackOnly()) { if (tx.timedOut()) onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); + "was rolled back: " + this), null); else onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); + "[state=" + tx.state() + ", tx=" + this + ']'), null); } else onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); + "prepare [state=" + tx.state() + ", tx=" + this + ']'), null); return; } @@ -842,7 +884,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre */ void onResult(Throwable e) { if (rcvRes.compareAndSet(false, true)) { - onError(m, e); + onError(m, e, null); if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -866,7 +908,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); - onError(null, e); + onError(null, e, null); onDone(e); } @@ -882,7 +924,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { // Fail the whole compound future. - onError(m, res.error()); + onError(m, res.error(), res); onDone(res.error()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index c9ea42a..45477a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -201,6 +201,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture< } catch (GridCacheEntryRemovedException ignored) { // Retry. + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 6558f97..6a914ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -197,6 +197,19 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse * Adds owned value. * * @param key Key. + * @param val Versioned value. + */ + public void addOwnedValue(IgniteTxKey key, CacheVersionedValue val) { + if (ownedVals == null) + ownedVals = new HashMap<>(); + + ownedVals.put(key, val); + } + + /** + * Adds owned value. + * + * @param key Key. * @param ver DHT version. * @param val Value. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 5a7e5c8..fd807c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1019,7 +1019,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer); - if (nearCached != null && updRes.success()) + if (nearCached != null && updRes.success()) { + if (dhtVer == null) + dhtVer = explicitVer != null ? explicitVer : writeVersion(); + nearCached.innerSet( null, eventNodeId(), @@ -1038,6 +1041,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer); + } } else if (op == DELETE) { GridCacheUpdateTxResult updRes = cached.innerRemove( @@ -1056,7 +1060,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer); - if (nearCached != null && updRes.success()) + if (nearCached != null && updRes.success()) { + if (dhtVer == null) + dhtVer = explicitVer != null ? explicitVer : writeVersion(); + nearCached.innerRemove( null, eventNodeId(), @@ -1072,6 +1079,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer); + } } else if (op == RELOAD) { cached.innerReload(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java index b2b0e2c..3324779 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxOptimisticCheckedException.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.transactions; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; /** * Exception thrown whenever grid transactions fail optimistically. @@ -26,6 +30,10 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L; + /** */ + @GridToStringExclude + private transient Map<IgniteTxKey, CacheVersionedValue> vals; + /** * Creates new optimistic exception with given error message. * @@ -36,6 +44,25 @@ public class IgniteTxOptimisticCheckedException extends IgniteCheckedException { } /** + * Creates new optimistic exception with given error message. + * + * @param msg Error message. + * @param vals Current values. + */ + public IgniteTxOptimisticCheckedException(String msg, Map<IgniteTxKey, CacheVersionedValue> vals) { + super(msg); + + this.vals = vals; + } + + /** + * @return Current values. + */ + public Map<IgniteTxKey, CacheVersionedValue> values() { + return vals; + } + + /** * Creates new optimistic exception with given error message and optional nested exception. * * @param msg Error message. http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 7ef2b34..0c9bf9e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -2929,24 +2929,32 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testGetRemoveTx() throws Exception { - getRemoveTx(false); + getRemoveTx(false, false); } /** * @throws Exception If failed. */ - public void testGetRemoveTxNearCache() throws Exception { - getRemoveTx(true); + public void testGetRemoveTxNearCache1() throws Exception { + getRemoveTx(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testGetRemoveTxNearCache2() throws Exception { + getRemoveTx(true, true); } /** * @param nearCache If {@code true} near cache is enabled. + * @param store If {@code true} cache store is enabled. * @throws Exception If failed. */ - private void getRemoveTx(boolean nearCache) throws Exception { + private void getRemoveTx(boolean nearCache, boolean store) throws Exception { final Ignite ignite0 = ignite(0); - CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, store, false); final List<Ignite> clients = clients(); @@ -2973,9 +2981,11 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { final CyclicBarrier barrier = new CyclicBarrier(THREADS); - GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + final IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { - int idx = threadIdx.getAndIncrement() % caches.size(); + int thread = threadIdx.getAndIncrement(); + + int idx = thread % caches.size(); IgniteCache<Integer, Integer> cache = caches.get(idx); @@ -2985,53 +2995,58 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { log.info("Started update thread: " + ignite.name()); + Thread.currentThread().setName("update-thread-" + ignite.name() + "-" + thread); + barrier.await(); - for (int i = 0; i < 100; i++) { - try { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0; i < 50; i++) { + while (true) { + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - boolean rmv = rnd.nextInt(3) == 0; + boolean rmv = rnd.nextInt(3) == 0; - Integer val; + Integer val; - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { - val = cache.get(key); + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + val = cache.get(key); - if (rmv) - cache.remove(key); - else - cache.put(key, val == null ? 1 : val + 1); + if (rmv) + cache.remove(key); + else + cache.put(key, val == null ? 1 : val + 1); - tx.commit(); + tx.commit(); - if (rmv) { - if (val != null) { - for (int j = 0; j < val; j++) - cntr.decrementAndGet(); + if (rmv) { + if (val != null) { + for (int j = 0; j < val; j++) + cntr.decrementAndGet(); + } } + else + cntr.incrementAndGet(); } - else - cntr.incrementAndGet(); + + break; + } + catch (TransactionOptimisticException ignore) { + // Retry. } - } - catch (TransactionOptimisticException ignore) { - // Retry. } } return null; } - }, THREADS, "update-thread").get(); + }, THREADS, "update-thread"); + + updateFut.get(); - int val = cntr.get(); + Integer val = cntr.get(); log.info("Iteration [iter=" + i + ", val=" + val + ']'); - if (val == 0) - checkValue(key, null, cacheName); - else - checkValue(key, val, cacheName); + checkValue(key, val == 0 ? null : val, cacheName); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/4da8ac1f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index dbf8928..149603e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest; import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest; import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest; +import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest; import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest; @@ -142,6 +143,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheSerializableTransactionsTest.class); + suite.addTestSuite(CacheNearReaderUpdateTest.class); + // Multi node update. suite.addTestSuite(GridCacheMultinodeUpdateSelfTest.class); suite.addTestSuite(GridCacheMultinodeUpdateNearEnabledSelfTest.class);
