http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 6ec02a6..cbaaed4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -35,6 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; +import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -43,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -62,7 +63,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; @@ -112,10 +112,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Demand lock. */ private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); - /** Pending affinity assignment futures. */ - private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = - new ConcurrentHashMap8<>(); - /** */ private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>(); @@ -145,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { - for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) - fut.onNodeLeft(e.eventNode().id()); - } - if (!initRebalanceFut.isDone()) { startFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { @@ -199,19 +190,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } }); - cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, - new MessageHandler<GridDhtAffinityAssignmentRequest>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) { - processAffinityAssignmentRequest(node, msg); - } - }); + if (!cctx.kernalContext().clientNode()) { + cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, + new MessageHandler<GridDhtAffinityAssignmentRequest>() { + @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) { + processAffinityAssignmentRequest(node, msg); + } + }); + } - cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentResponse.class, - new MessageHandler<GridDhtAffinityAssignmentResponse>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentResponse msg) { - processAffinityAssignmentResponse(node, msg); - } - }); + cctx.shared().affinity().onCacheCreated(cctx); supplier = new GridDhtPartitionSupplier(cctx); demander = new GridDhtPartitionDemander(cctx, demandLock); @@ -267,13 +255,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - demander.updateLastExchangeFuture(lastFut); - } + @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { + supplier.onTopologyChanged(lastFut.topologyVersion()); - /** {@inheritDoc} */ - @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { - supplier.onTopologyChanged(topVer); + demander.updateLastExchangeFuture(lastFut); } /** {@inheritDoc} */ @@ -289,7 +274,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assert exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", topVer=" + top.topologyVersion() + ']'; + ", cache=" + cctx.name() + + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); @@ -458,26 +444,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * @param topVer Requested topology version. - * @param fut Future to add. - */ - public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) { - GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut); - - assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer; - } - - /** - * @param topVer Requested topology version. - * @param fut Future to remove. - */ - public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) { - boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut); - - assert rmv : "Failed to remove assignment fetch future: " + topVer; - } - - /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -644,11 +610,23 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + ", node=" + node + ']'); - List<List<ClusterNode>> assignment = cctx.affinity().assignments(topVer); + GridAffinityAssignment assignment = cctx.affinity().assignment(topVer); + + boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0; + + GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(), + topVer, + assignment.assignment(), + newAffMode); + + if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) { + assert assignment.idealAssignment() != null; + + res.idealAffinityAssignment(assignment.idealAssignment()); + } try { - cctx.io().send(node, - new GridDhtAffinityAssignmentResponse(cctx.cacheId(), topVer, assignment), AFFINITY_POOL); + cctx.io().send(node, res, AFFINITY_POOL); } catch (IgniteCheckedException e) { U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e); @@ -658,18 +636,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * @param node Node. - * @param res Response. - */ - private void processAffinityAssignmentResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing affinity assignment response [node=" + node + ", res=" + res + ']'); - - for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) - fut.onResponse(node, res); - } - - /** * Resends partitions on partition evict within configured timeout. * * @param part Evicted partition. @@ -833,13 +799,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { U.warn(log, ">>> " + fut); } - if (!pendingAssignmentFetchFuts.isEmpty()) { - U.warn(log, "Pending assignment fetch futures [cache=" + cctx.name() +"]:"); - - for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) - U.warn(log, ">>> " + fut); - } - supplier.dumpDebugInfo(); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 8483cb1..4b876b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -267,7 +267,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda false); // init() will register future for responses if future has remote mappings. - fut.init(); + fut.init(null); return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 943a91a..d495f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -48,7 +48,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { private static final int NEAR_SIZE_OVERHEAD = 36 + 16; /** Topology version at the moment when value was initialized from primary node. */ - private volatile long topVer = -1L; + private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** DHT version which caused the last update. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") @@ -96,50 +96,34 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { @Override public boolean valid(AffinityTopologyVersion topVer) { assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer; - long topVer0 = this.topVer; + AffinityTopologyVersion topVer0 = this.topVer; - if (topVer0 == topVer.topologyVersion()) + if (topVer0.equals(topVer)) return true; - if (topVer0 == -1L || topVer.topologyVersion() < topVer0) + if (topVer0.equals(AffinityTopologyVersion.NONE) || topVer.compareTo(topVer0) < 0) return false; try { - ClusterNode primary = null; + if (cctx.affinity().primaryChanged(partition(), topVer0, topVer)) { + this.topVer = AffinityTopologyVersion.NONE; - for (long ver = topVer0; ver <= topVer.topologyVersion(); ver++) { - ClusterNode primary0 = cctx.affinity().primary(part, new AffinityTopologyVersion(ver)); - - if (primary0 == null) { - this.topVer = -1L; - - return false; - } - - if (primary == null) - primary = primary0; - else { - if (!primary.equals(primary0)) { - this.topVer = -1L; - - return false; - } - } + return false; } if (cctx.affinity().backup(cctx.localNode(), part, topVer)) { - this.topVer = -1L; + this.topVer = AffinityTopologyVersion.NONE; return false; } - this.topVer = topVer.topologyVersion(); + this.topVer = topVer; return true; } catch (IllegalStateException ignore) { // Do not have affinity history. - this.topVer = -1L; + this.topVer = AffinityTopologyVersion.NONE; return false; } @@ -147,61 +131,52 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** * @param topVer Topology version. - * @return {@code True} if this entry was initialized by this call. * @throws GridCacheEntryRemovedException If this entry is obsolete. */ - public boolean initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { - while (true) { - GridDhtCacheEntry entry = cctx.near().dht().peekExx(key); + public void initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { + GridDhtCacheEntry entry = cctx.near().dht().peekExx(key); - if (entry != null) { - GridCacheEntryInfo e = entry.info(); + if (entry != null) { + GridCacheEntryInfo e = entry.info(); - if (e != null) { - GridCacheVersion enqueueVer = null; + if (e != null) { + GridCacheVersion enqueueVer = null; - try { - synchronized (this) { - checkObsolete(); + try { + synchronized (this) { + checkObsolete(); - if (isNew() || !valid(topVer)) { - // Version does not change for load ops. - update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true); + if (isNew() || !valid(topVer)) { + // Version does not change for load ops. + update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true); - if (cctx.deferredDelete() && !isNew() && !isInternal()) { - boolean deleted = val == null; + if (cctx.deferredDelete() && !isNew() && !isInternal()) { + boolean deleted = val == null; - if (deleted != deletedUnlocked()) { - deletedUnlocked(deleted); + if (deleted != deletedUnlocked()) { + deletedUnlocked(deleted); - if (deleted) - enqueueVer = e.version(); - } + if (deleted) + enqueueVer = e.version(); } + } - ClusterNode primaryNode = cctx.affinity().primary(key, topVer); - - if (primaryNode == null) - this.topVer = -1L; - else - recordNodeId(primaryNode.id(), topVer); - - dhtVer = e.isNew() || e.isDeleted() ? null : e.version(); + ClusterNode primaryNode = cctx.affinity().primary(key, topVer); - return true; - } + if (primaryNode == null) + this.topVer = AffinityTopologyVersion.NONE; + else + recordNodeId(primaryNode.id(), topVer); - return false; + dhtVer = e.isNew() || e.isDeleted() ? null : e.version(); } } - finally { - if (enqueueVer != null) - cctx.onDeferredDelete(this, enqueueVer); - } + } + finally { + if (enqueueVer != null) + cctx.onDeferredDelete(this, enqueueVer); } } - else - return false; } } @@ -318,10 +293,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { @Override protected void recordNodeId(UUID primaryNodeId, AffinityTopologyVersion topVer) { assert Thread.holdsLock(this); + assert topVer.compareTo(cctx.affinity().affinityTopologyVersion()) <= 0 : "Affinity not ready [" + + "topVer=" + topVer + + ", readyVer=" + cctx.affinity().affinityTopologyVersion() + + ", cache=" + cctx.name() + ']'; + primaryNode(primaryNodeId, topVer); } - /* + /** * @param dhtVer DHT version to record. * @return {@code False} if given version is lower then existing version. */ @@ -661,7 +641,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** {@inheritDoc} */ @Override protected void onInvalidate() { - topVer = -1L; + topVer = AffinityTopologyVersion.NONE; dhtVer = null; } @@ -709,13 +689,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } if (primary == null || !nodeId.equals(primary.id())) { - this.topVer = -1L; + this.topVer = AffinityTopologyVersion.NONE; return; } - if (topVer.topologyVersion() > this.topVer) - this.topVer = topVer.topologyVersion(); + if (topVer.compareTo(this.topVer) > 0) + this.topVer = topVer; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 06fc0a5..c3d75c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -141,8 +141,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap /** * Initializes future. + * + * @param topVer Topology version. */ - public void init() { + public void init(@Nullable AffinityTopologyVersion topVer) { AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null); if (lockedTopVer != null) { @@ -151,11 +153,15 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer); } else { - AffinityTopologyVersion topVer = tx == null ? - (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) : - tx.topologyVersion(); + AffinityTopologyVersion mapTopVer = topVer; + + if (mapTopVer == null) { + mapTopVer = tx == null ? + (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); + } - map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); + map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), mapTopVer); } markInitialized(); @@ -982,18 +988,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } // Need to wait for next topology version to remap. - IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); - topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { - @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { - long readyTopVer = fut.get(); + topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException { + AffinityTopologyVersion readyTopVer = fut.get(); // This will append new futures to compound list. map(F.view(keys.keySet(), new P1<KeyCacheObject>() { @Override public boolean apply(KeyCacheObject key) { return invalidParts.contains(cctx.affinity().partition(key)); } - }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer)); + }), F.t(node, keys), readyTopVer); // It is critical to call onDone after adding futures to compound list. onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index d5483cd..6515140 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -127,7 +126,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim break; } catch (GridCacheEntryRemovedException e) { - entry = ctx.cache().entryEx(entry.key()); + entry = ctx.cache().entryEx(entry.key(), tx.topologyVersion()); txEntry.cached(entry); } @@ -601,7 +600,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim break; } catch (GridCacheEntryRemovedException ignore) { - entry.cached(cacheCtx.near().entryEx(entry.key())); + entry.cached(cacheCtx.near().entryEx(entry.key(), topVer)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index f146071..5d3f604 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -109,7 +109,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - f.onResult(e); + f.onNodeLeft(e, true); found = true; } @@ -121,13 +121,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param e Error. + * @param discoThread {@code True} if executed from discovery thread. */ - void onError(Throwable e) { + void onError(Throwable e, boolean discoThread) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); - onComplete(); + onComplete(discoThread); return; } @@ -147,7 +148,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - onComplete(); + onComplete(discoThread); } } @@ -202,7 +203,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa ERR_UPD.compareAndSet(this, null, err); - return onComplete(); + return onComplete(false); } /** @@ -216,9 +217,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * Completeness callback. * + * @param discoThread {@code True} if executed from discovery thread. * @return {@code True} if future was finished by this call. */ - private boolean onComplete() { + private boolean onComplete(boolean discoThread) { Throwable err0 = err; if (err0 == null || tx.needCheckBackup()) @@ -248,14 +250,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (tx.setRollbackOnly()) { if (tx.timedOut()) onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); + "was rolled back: " + this), false); else onError(new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); + "[state=" + tx.state() + ", tx=" + this + ']'), false); } else onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); + "prepare [state=" + tx.state() + ", tx=" + this + ']'), false); return; } @@ -270,7 +272,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa markInitialized(); } catch (TransactionTimeoutException e) { - onError(e); + onError(e, false); } } @@ -450,7 +452,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa tx.userPrepare(); } catch (IgniteCheckedException e) { - onError(e); + onError(e, false); } } @@ -485,7 +487,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - fut.onResult(e); + fut.onNodeLeft(e, false); } catch (IgniteCheckedException e) { fut.onResult(e); @@ -586,7 +588,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa break; } catch (GridCacheEntryRemovedException ignore) { - entry.cached(cacheCtx.near().entryEx(entry.key())); + entry.cached(cacheCtx.near().entryEx(entry.key(), topVer)); } } } @@ -695,7 +697,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param e Node failure. */ - void onResult(ClusterTopologyCheckedException e) { + void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) { if (isDone()) return; @@ -705,7 +707,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa // Fail the whole future (make sure not to remap on different primary node // to prevent multiple lock coordinators). - parent.onError(e); + parent.onError(e, discoThread); } } @@ -721,7 +723,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { if (res.error() != null) { // Fail the whole compound future. - parent.onError(res.error()); + parent.onError(res.error(), false); } else { if (res.clientRemapVersion() != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 7132567..4d77a3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -52,22 +52,16 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT // Obtain the topology version to use. long threadId = Thread.currentThread().getId(); - AffinityTopologyVersion topVer = null; + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); - if (tx.system()) { - topVer = tx.topologyVersionSnapshot(); + // If there is another system transaction in progress, use it's topology version to prevent deadlock. + if (topVer == null && tx.system()) { + topVer = cctx.tm().lockedTopologyVersion(threadId, tx); if (topVer == null) - topVer = cctx.exchange().readyAffinityVersion(); + topVer = tx.topologyVersionSnapshot(); } - if (topVer == null) - topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); - - // If there is another system transaction in progress, use it's topology version to prevent deadlock. - if (topVer == null && tx.system()) - topVer = cctx.tm().lockedTopologyVersion(threadId, tx); - if (topVer != null) { tx.topologyVersion(topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index a3130cd..e954a7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -141,8 +141,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, + readyTopVer, ctx.cacheKeysView(keys), deserializeBinary, skipVals, @@ -179,6 +180,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> * @return Future. */ IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx, + AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean deserializeBinary, @@ -202,7 +204,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> /*keepCacheObjects*/true); // init() will register future for responses if it has remote mappings. - fut.init(); + fut.init(topVer); return fut; } @@ -314,6 +316,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (tx == null) { tx = new GridNearTxRemote( ctx.shared(), + req.topologyVersion(), nodeId, req.nearNodeId(), req.nearXidVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 078e322..5c4aca0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -136,7 +136,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (isMini(fut)) { MinFuture f = (MinFuture)fut; - if (f.onNodeLeft(nodeId)) { + if (f.onNodeLeft(nodeId, true)) { // Remove previous mapping. mappings.remove(nodeId); @@ -211,7 +211,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut; if (f.futureId().equals(res.miniId())) - f.onDhtFinishResponse(nodeId); + f.onDhtFinishResponse(nodeId, false); } } } @@ -486,7 +486,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } catch (ClusterTopologyCheckedException e) { - mini.onNodeLeft(backupId); + mini.onNodeLeft(backupId, false); } catch (IgniteCheckedException e) { mini.onDone(e); @@ -628,7 +628,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu // Remove previous mapping. mappings.remove(m.node().id()); - fut.onNodeLeft(n.id()); + fut.onNodeLeft(n.id(), false); } catch (IgniteCheckedException e) { // Fail the whole thing. @@ -652,9 +652,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu ", loc=" + node.isLocal() + ", done=" + fut.isDone() + ']'; } - else { + else return "FinishFuture[node=null, done=" + fut.isDone() + ']'; - } } else if (f.getClass() == CheckBackupMiniFuture.class) { CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f; @@ -728,10 +727,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu private final IgniteUuid futId = IgniteUuid.randomUuid(); /** - * @param nodeId Node ID. + * @param nodeId Node ID. + * @param discoThread {@code True} if executed from discovery thread. * @return {@code True} if future processed node failure. */ - abstract boolean onNodeLeft(UUID nodeId); + abstract boolean onNodeLeft(UUID nodeId, boolean discoThread); /** * @return Future ID. @@ -774,10 +774,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu return m; } - /** - * @param nodeId Failed node ID. - */ - boolean onNodeLeft(UUID nodeId) { + /** {@inheritDoc} */ + boolean onNodeLeft(UUID nodeId, boolean discoThread) { if (nodeId.equals(m.node().id())) { if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply: " + this); @@ -806,7 +804,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - mini.onDhtFinishResponse(cctx.localNodeId()); + mini.onDhtFinishResponse(cctx.localNodeId(), true); } }); } @@ -815,7 +813,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu cctx.io().send(backup, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException e) { - mini.onNodeLeft(backupId); + mini.onNodeLeft(backupId, discoThread); } catch (IgniteCheckedException e) { mini.onDone(e); @@ -823,7 +821,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } else - mini.onDhtFinishResponse(backupId); + mini.onDhtFinishResponse(backupId, true); } } } @@ -881,7 +879,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** {@inheritDoc} */ - @Override boolean onNodeLeft(UUID nodeId) { + @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) { if (nodeId.equals(backup.id())) { readyNearMappingFromBackup(m); @@ -942,22 +940,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** {@inheritDoc} */ - @Override boolean onNodeLeft(UUID nodeId) { - return onResponse(nodeId); + @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) { + return onResponse(nodeId, discoThread); } /** * @param nodeId Node ID. + * @param discoThread {@code True} if executed from discovery thread. */ - void onDhtFinishResponse(UUID nodeId) { - onResponse(nodeId); + void onDhtFinishResponse(UUID nodeId, boolean discoThread) { + onResponse(nodeId, discoThread); } /** * @param nodeId Node ID. + * @param discoThread {@code True} if executed from discovery thread. * @return {@code True} if processed node response. */ - private boolean onResponse(UUID nodeId) { + private boolean onResponse(UUID nodeId, boolean discoThread) { boolean done; boolean ret; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index a70fb3a..f7c330e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; @@ -344,6 +345,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, + AffinityTopologyVersion topVer, boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, @@ -354,6 +356,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { ) { if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, + topVer, keys, readThrough, /*deserializeBinary*/false, @@ -384,7 +387,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { key, readThrough, /*force primary*/needVer, - topologyVersion(), + topVer, CU.subjectId(this, cctx), resolveTaskName(), /*deserializeBinary*/false, @@ -415,7 +418,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { keys, readThrough, /*force primary*/needVer, - topologyVersion(), + topVer, CU.subjectId(this, cctx), resolveTaskName(), /*deserializeBinary*/false, @@ -445,7 +448,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { else { assert cacheCtx.isLocal(); - return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, keepBinary, needVer, c); + return super.loadMissing(cacheCtx, + topVer, + readThrough, + async, + keys, + skipVals, + keepBinary, + needVer, + c); } } @@ -512,7 +523,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { txEntry.explicitVersion(candVer); - if (candVer.isLess(minVer)) + if (candVer.compareTo(minVer) < 0) minVer = candVer; } } @@ -715,7 +726,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { ", tx=" + this + ']'); // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion())); } } } @@ -1338,6 +1349,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridNearTxLocal.class, this, "mappings", mappings, "super", super.toString()); + return S.toString(GridNearTxLocal.class, this, + "thread", IgniteUtils.threadName(threadId), + "mappings", mappings, + "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index 2acc139..03e1034 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -209,7 +209,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion())); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 9c52c80..6b17d5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -72,12 +73,14 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { /** * This constructor is meant for optimistic transactions. * + * @param topVer Transaction topology version. * @param ldr Class loader. * @param nodeId Node ID. * @param nearNodeId Near node ID. * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. + * @param plc IO policy. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -85,10 +88,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param writeEntries Write entries. * @param ctx Cache registry. * @param txSize Expected transaction size. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. * @throws IgniteCheckedException If unmarshalling failed. */ public GridNearTxRemote( GridCacheSharedContext ctx, + AffinityTopologyVersion topVer, ClassLoader ldr, UUID nodeId, UUID nearNodeId, @@ -137,26 +143,35 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { addEntry(entry); } } + + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + + topologyVersion(topVer); } /** * This constructor is meant for pessimistic transactions. * + * @param topVer Transaction topology version. * @param nodeId Node ID. * @param nearNodeId Near node ID. * @param nearXidVer Near transaction ID. * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. + * @param plc IO policy. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. * @param timeout Timeout. * @param ctx Cache registry. * @param txSize Expected transaction size. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridNearTxRemote( GridCacheSharedContext ctx, + AffinityTopologyVersion topVer, UUID nodeId, UUID nearNodeId, GridCacheVersion nearXidVer, @@ -195,6 +210,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1), U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize)); + + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + + topologyVersion(topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java index 358f90c..f2a4b30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java @@ -56,13 +56,13 @@ public interface GridCacheDrManager extends GridCacheManager { AffinityTopologyVersion topVer)throws IgniteCheckedException; /** - * Process partitions "before exchange" event. + * Process partitions exchange event. * * @param topVer Topology version. * @param left {@code True} if exchange has been caused by node leave. * @throws IgniteCheckedException If failed. */ - public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException; + public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException; /** * @return {@code True} is DR is enabled. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java index 825769f..a82adf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java @@ -78,7 +78,7 @@ public class GridOsCacheDrManager implements GridCacheDrManager { } /** {@inheritDoc} */ - @Override public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException { + @Override public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 6ddd2e5..10fa116 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -176,33 +176,6 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { return owner; } - /** - * - * @param ver Candidate version. - * @return Current owner. - */ - @Nullable public GridCacheMvccCandidate readyLocal(GridCacheVersion ver) { - GridCacheMvccCandidate prev = null; - GridCacheMvccCandidate owner = null; - - synchronized (this) { - GridCacheMvcc mvcc = mvccExtras(); - - if (mvcc != null) { - prev = mvcc.localOwner(); - - owner = mvcc.readyLocal(ver); - - if (mvcc.isEmpty()) - mvccExtras(null); - } - } - - checkOwnerChanged(prev, owner); - - return owner; - } - /** {@inheritDoc} */ @Override public boolean tmLock(IgniteInternalTx tx, long timeout, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 2e41f2a..c69759b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -174,20 +173,6 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> } /** - * @return Lock version. - */ - GridCacheVersion lockVersion() { - return lockVer; - } - - /** - * @return Entries. - */ - List<GridLocalCacheEntry> entries() { - return entries; - } - - /** * @return {@code True} if transaction is not {@code null}. */ private boolean inTx() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 401b61b..8bcf564 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -2485,11 +2485,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Expire time. */ abstract long expireTime(); - - /** - * @return Version. - */ - abstract GridCacheVersion version(); } /** @@ -2528,11 +2523,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override long expireTime() { return GridCacheSwapEntryImpl.expireTime(e.getValue()); } - - /** {@inheritDoc} */ - @Override GridCacheVersion version() { - return GridCacheSwapEntryImpl.version(e.getValue()); - } } /** @@ -2578,11 +2568,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override long expireTime() { return GridCacheOffheapSwapEntry.expireTime(valPtr.get1()); } - - /** {@inheritDoc} */ - @Override GridCacheVersion version() { - return GridCacheOffheapSwapEntry.version(valPtr.get1()); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 55bcf45..77f3765 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -465,21 +465,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @Override public AffinityTopologyVersion topologyVersion() { AffinityTopologyVersion res = topVer; - if (res.equals(AffinityTopologyVersion.NONE)) + if (res.equals(AffinityTopologyVersion.NONE)) { + if (system()) { + AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this); + + if (topVer != null) + return topVer; + } + return cctx.exchange().topologyVersion(); + } return res; } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersionSnapshot() { + @Override public final AffinityTopologyVersion topologyVersionSnapshot() { AffinityTopologyVersion ret = topVer; return AffinityTopologyVersion.NONE.equals(ret) ? null : ret; } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) { + @Override public final AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) { AffinityTopologyVersion topVer0 = this.topVer; if (!AffinityTopologyVersion.NONE.equals(topVer0)) http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 9060fa7..e75ce91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -920,13 +921,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** - * @return Read version for serializable transaction. - */ - @Nullable public GridCacheVersion serializableReadVersion() { - return serReadVer; - } - - /** * Gets stored entry version. Version is stored for all entries in serializable transaction or * when value is read using {@link IgniteCache#getEntry(Object)} method. * http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 547c018..41dc43f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; +import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -349,6 +350,7 @@ public class IgniteTxHandler { tx = new GridDhtTxLocal( ctx, + req.topologyVersion(), nearNode.id(), req.version(), req.futureId(), @@ -464,8 +466,19 @@ public class IgniteTxHandler { Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); - if (!cacheNodes0.equals(cacheNodes1)) + if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) + return true; + + try { + List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer); + List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer); + + if (!aff1.equals(aff2)) + return true; + } + catch (IllegalStateException err) { return true; + } } return false; @@ -1247,6 +1260,7 @@ public class IgniteTxHandler { if (tx == null) { tx = new GridNearTxRemote( ctx, + req.topologyVersion(), ldr, nodeId, req.nearNodeId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 11a35cb..b1e150b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -343,11 +343,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txState.seal(); } - /** {@inheritDoc} */ - @Override public GridCacheReturn implicitSingleResult() { - return implicitRes; - } - /** * @param ret Result. */ @@ -412,6 +407,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, + AffinityTopologyVersion topVer, final boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, @@ -472,7 +468,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig log.debug("Got removed entry, will retry: " + key); if (txEntry != null) - txEntry.cached(cacheCtx.cache().entryEx(key)); + txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion())); } } } @@ -1137,7 +1133,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (log.isDebugEnabled()) log.debug("Got removed entry during transaction commit (will retry): " + txEntry); - txEntry.cached(entryEx(cacheCtx, txEntry.txKey())); + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion())); } } } @@ -1334,9 +1330,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * Checks if there is a cached or swapped value for - * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method. - * * @param cacheCtx Cache context. * @param keys Key to enlist. * @param expiryPlc Explicitly specified expiry policy for entry. @@ -1353,6 +1346,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings({"RedundantTypeArguments"}) private <K, V> Collection<KeyCacheObject> enlistRead( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Collection<KeyCacheObject> keys, @Nullable ExpiryPolicy expiryPlc, Map<K, V> map, @@ -1373,7 +1367,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig Collection<KeyCacheObject> lockKeys = null; - AffinityTopologyVersion topVer = topologyVersion(); + AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion(); boolean needReadVer = (serializable() && optimistic()) || needVer; @@ -1653,10 +1647,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * Loads all missed keys for - * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method. - * * @param cacheCtx Cache context. + * @param topVer Topology version. * @param map Return map. * @param missedMap Missed keys. * @param deserializeBinary Deserialize binary flag. @@ -1667,6 +1659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed( final GridCacheContext cacheCtx, + final AffinityTopologyVersion topVer, final Map<K, V> map, final Map<KeyCacheObject, GridCacheVersion> missedMap, final boolean deserializeBinary, @@ -1695,6 +1688,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig }, loadMissing( cacheCtx, + topVer, !skipStore, false, missedMap.keySet(), @@ -1776,6 +1770,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings("unchecked") @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( final GridCacheContext cacheCtx, + @Nullable final AffinityTopologyVersion entryTopVer, Collection<KeyCacheObject> keys, final boolean deserializeBinary, final boolean skipVals, @@ -1803,6 +1798,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null; final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx, + entryTopVer, keys, expiryPlc, retMap, @@ -1934,13 +1930,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig log.debug("Got removed exception in get postLock (will retry): " + cached); - txEntry.cached(entryEx(cacheCtx, txKey)); + txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion())); } } } if (!missed.isEmpty() && cacheCtx.isLocal()) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + return checkMissed(cacheCtx, + topVer != null ? topVer : topologyVersion(), retMap, missed, deserializeBinary, @@ -2007,7 +2009,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (missed.isEmpty()) return new GridFinishedFuture<>(retMap); + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + return checkMissed(cacheCtx, + topVer != null ? topVer : topologyVersion(), retMap, missed, deserializeBinary, @@ -2031,10 +2039,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings("unchecked") @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Map<? extends K, ? extends V> map, boolean retval ) { return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx, + entryTopVer, map, null, null, @@ -2045,19 +2055,35 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, K key, V val, boolean retval, CacheEntryPredicate filter) { - return putAsync0(cacheCtx, key, val, null, null, retval, filter); + return putAsync0(cacheCtx, + entryTopVer, + key, + val, + null, + null, + retval, + filter); } /** {@inheritDoc} */ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs) { - return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null); + return (IgniteInternalFuture)putAsync0(cacheCtx, + entryTopVer, + key, + null, + entryProcessor, + invokeArgs, + true, + null); } /** {@inheritDoc} */ @@ -2072,6 +2098,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig }); return this.<Object, Object>putAllAsync0(cacheCtx, + null, map, null, null, @@ -2083,10 +2110,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings("unchecked") @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs ) { return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx, + entryTopVer, null, map, invokeArgs, @@ -2099,7 +2128,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheVersion> drMap ) { - return removeAllAsync0(cacheCtx, null, drMap, false, null, false); + return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false); } /** @@ -2136,6 +2165,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ private <K, V> IgniteInternalFuture<Void> enlistWrite( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, KeyCacheObject cacheKey, Object val, @Nullable ExpiryPolicy expiryPlc, @@ -2162,6 +2192,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null; boolean loadMissed = enlistWriteEntry(cacheCtx, + entryTopVer, cacheKey, val, entryProcessor, @@ -2183,7 +2214,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig keepBinary); if (loadMissed) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + return loadMissing(cacheCtx, + topVer != null ? topVer : topologyVersion(), Collections.singleton(cacheKey), filter, ret, @@ -2226,6 +2263,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ private <K, V> IgniteInternalFuture<Void> enlistWrite( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Collection<?> keys, @Nullable ExpiryPolicy expiryPlc, @Nullable Map<?, ?> lookup, @@ -2315,6 +2353,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); boolean loadMissed = enlistWriteEntry(cacheCtx, + entryTopVer, cacheKey, val, entryProcessor, @@ -2344,7 +2383,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } if (missedForLoad != null) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + return loadMissing(cacheCtx, + topVer != null ? topVer : topologyVersion(), missedForLoad, filter, ret, @@ -2377,6 +2422,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ private IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, + final AffinityTopologyVersion topVer, final Set<KeyCacheObject> keys, final CacheEntryPredicate[] filter, final GridCacheReturn ret, @@ -2441,6 +2487,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig return loadMissing( cacheCtx, + topVer, /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, /*async*/true, keys, @@ -2474,6 +2521,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @throws IgniteCheckedException If failed. */ private boolean enlistWriteEntry(GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, final KeyCacheObject cacheKey, @Nullable final Object val, @Nullable final EntryProcessor<?, ?, ?> entryProcessor, @@ -2505,7 +2553,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig // First time access. if (txEntry == null) { while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion()); try { entry.unswap(false); @@ -2936,7 +2984,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (log.isDebugEnabled()) log.debug("Got removed entry in putAllAsync method (will retry): " + cached); - txEntry.cached(entryEx(cached.context(), txEntry.txKey())); + txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion())); } } } @@ -3024,6 +3072,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ private <K, V> IgniteInternalFuture putAsync0( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, K key, @Nullable V val, @Nullable EntryProcessor<K, V, Object> entryProcessor, @@ -3050,6 +3099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final IgniteInternalFuture<Void> loadFut = enlistWrite( cacheCtx, + entryTopVer, cacheKey, val, opCtx != null ? opCtx.expiry() : null, @@ -3156,6 +3206,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings("unchecked") private <K, V> IgniteInternalFuture putAllAsync0( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, @Nullable Map<? extends K, ? extends V> map, @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, @Nullable final Object[] invokeArgs, @@ -3214,6 +3265,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final IgniteInternalFuture<Void> loadFut = enlistWrite( cacheCtx, + entryTopVer, keySet, opCtx != null ? opCtx.expiry() : null, map0, @@ -3386,12 +3438,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Collection<? extends K> keys, boolean retval, CacheEntryPredicate filter, boolean singleRmv ) { - return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv); + return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv); } /** @@ -3406,6 +3459,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings("unchecked") private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0( final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, @Nullable final Collection<? extends K> keys, @Nullable Map<KeyCacheObject, GridCacheVersion> drMap, final boolean retval, @@ -3491,6 +3545,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig final IgniteInternalFuture<Void> loadFut = enlistWrite( cacheCtx, + entryTopVer, keys0, plc, /** lookup map */null, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 5911e89..6741885 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -23,6 +23,7 @@ import javax.cache.Cache; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -72,6 +73,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Collection<KeyCacheObject> keys, boolean deserializeBinary, boolean skipVals, @@ -87,6 +89,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Map<? extends K, ? extends V> map, boolean retval); @@ -100,6 +103,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, K key, V val, boolean retval, @@ -114,6 +118,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, K key, EntryProcessor<K, V, Object> entryProcessor, Object... invokeArgs); @@ -126,6 +131,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, Object... invokeArgs); @@ -139,6 +145,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync( GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, Collection<? extends K> keys, boolean retval, CacheEntryPredicate filter, @@ -163,11 +170,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { Map<KeyCacheObject, GridCacheVersion> drMap); /** - * @return Return value for - */ - public GridCacheReturn implicitSingleResult(); - - /** * Finishes transaction (either commit or rollback). * * @param commit {@code True} if commit, {@code false} if rollback. @@ -188,6 +190,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { */ public IgniteInternalFuture<Void> loadMissing( GridCacheContext cacheCtx, + AffinityTopologyVersion topVer, boolean readThrough, boolean async, Collection<KeyCacheObject> keys,
