http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4b8db00..9f18c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,6 +44,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** @@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private GridFutureAdapter<Boolean> initFut; /** Topology snapshot. */ - private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = - new AtomicReference<>(); + private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); /** Last committed cache version before next topology version use. */ private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); @@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dynamic cache change requests. */ private Collection<DynamicCacheChangeRequest> reqs; + /** Cache validation results. */ private volatile Map<Integer, Boolean> cacheValidRes; + /** Skip preload flag. */ + private boolean skipPreload; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param cctx Cache context. * @param busyLock Busy lock. * @param exchId Exchange ID. + * @param reqs Cache change requests. */ public GridDhtPartitionsExchangeFuture( GridCacheSharedContext cctx, @@ -221,16 +227,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log = cctx.logger(getClass()); - // Grab all nodes with order of equal or less than last joined node. - oldestNode.set(CU.oldest(cctx, exchId.topologyVersion())); - - assert oldestNode.get() != null; - initFut = new GridFutureAdapter<>(); if (log.isDebugEnabled()) - log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + - ", fut=" + this + ']'); + log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); + } + + /** + * @param reqs Cache change requests. + */ + public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) { + this.reqs = reqs; } /** {@inheritDoc} */ @@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @return Skip preload flag. + */ + public boolean skipPreload() { + return skipPreload; + } + + /** * @return Dummy flag. */ public boolean dummy() { @@ -279,9 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** * @param cacheId Cache ID to check. + * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. */ - public boolean isCacheAdded(int cacheId) { + public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) { if (!F.isEmpty(reqs)) { for (DynamicCacheChangeRequest req : reqs) { if (req.start() && !req.clientStartOnly()) { @@ -291,7 +306,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } - return false; + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); } /** @@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * Rechecks topology. + * @param cacheCtx Cache context. + * @throws IgniteCheckedException If failed. */ private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { if (stopping(cacheCtx.cacheId())) @@ -330,8 +348,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId + ']'); // Fetch affinity assignment from remote node. - GridDhtAssignmentFetchFuture fetchFut = - new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx)); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx, + exchId.topologyVersion(), + CU.affinityNodes(cacheCtx, exchId.topologyVersion())); fetchFut.init(); @@ -341,11 +360,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" + cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']'); + if (affAssignment == null) { + affAssignment = new ArrayList<>(cacheCtx.affinity().partitions()); + + List<ClusterNode> empty = Collections.emptyList(); + + for (int i = 0; i < cacheCtx.affinity().partitions(); i++) + affAssignment.add(empty); + } + cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment); } } /** + * @param cacheCtx Cache context. * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange. */ private boolean canCalculateAffinity(GridCacheContext cacheCtx) { @@ -391,20 +420,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Exchange id. - */ - GridDhtPartitionExchangeId key() { - return exchId; - } - - /** - * @return Oldest node. - */ - ClusterNode oldestNode() { - return oldestNode.get(); - } - - /** * @return Exchange ID. */ public GridDhtPartitionExchangeId exchangeId() { @@ -412,13 +427,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @return Init future. - */ - IgniteInternalFuture<?> initFuture() { - return initFut; - } - - /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -444,7 +452,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteInterruptedCheckedException If interrupted. */ public void init() throws IgniteInterruptedCheckedException { - assert oldestNode.get() != null; + if (isDone()) + return; if (init.compareAndSet(false, true)) { if (isDone()) @@ -455,10 +464,118 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // will return corresponding nodes. U.await(evtLatch); + assert discoEvt != null : this; + assert !dummy && !forcePreload : this; + + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + oldestNode.set(oldest); + startCaches(); + // True if client node joined or failed. + boolean clientNodeEvt; + + if (F.isEmpty(reqs)) { + int type = discoEvt.type(); + + assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt; + + clientNodeEvt = CU.clientNode(discoEvt.eventNode()); + } + else { + assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; + + boolean clientOnlyStart = true; + + for (DynamicCacheChangeRequest req : reqs) { + if (!req.clientStartOnly()) { + clientOnlyStart = false; + + break; + } + } + + clientNodeEvt = clientOnlyStart; + } + + if (clientNodeEvt) { + ClusterNode node = discoEvt.eventNode(); + + // Client need to initialize affinity for local join event or for stated client caches. + if (!node.isLocal()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + GridDhtPartitionTopology top = cacheCtx.topology(); + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + + if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) { + initTopology(cacheCtx); + + top.beforeExchange(this); + } + else + cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); + } + + if (exchId.isLeft()) + cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + + onDone(exchId.topologyVersion()); + + skipPreload = cctx.kernalContext().clientNode(); + + return; + } + } + + if (cctx.kernalContext().clientNode()) { + skipPreload = true; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + GridDhtPartitionTopology top = cacheCtx.topology(); + + top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); + } + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + initTopology(cacheCtx); + } + + if (oldestNode.get() != null) { + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, + exchId.topologyVersion())); + + rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); + + ready.set(true); + + initFut.onDone(true); + + if (log.isDebugEnabled()) + log.debug("Initialized future: " + this); + + sendPartitions(); + } + else + onDone(exchId.topologyVersion()); + + return; + } + + assert oldestNode.get() != null; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (isCacheAdded(cacheCtx.cacheId())) { + if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) { if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty()) U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex()); } @@ -468,8 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<String> cachesWithoutNodes = null; - for (String name : cctx.cache().cacheNames()) { - if (exchId.isLeft()) { + if (exchId.isLeft()) { + for (String name : cctx.cache().cacheNames()) { if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (cachesWithoutNodes != null) { - StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: "); + StringBuilder sb = + new StringBuilder("All server nodes for the following caches have left the cluster: "); for (int i = 0; i < cachesWithoutNodes.size(); i++) { String cache = cachesWithoutNodes.get(i); @@ -537,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } // Grab all alive remote nodes with order of equal or less than last joined node. - rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx, + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, exchId.topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (exchId.isLeft()) cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); + IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); + + while (true) { + try { + locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Failed to wait for locks release future. " + + "Dumping pending objects that might be the cause: " + cctx.localNodeId()); + + U.warn(log, "Locked entries:"); + + Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks = + cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); + + for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) + U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); + } + } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Initialized future: " + this); - if (canSkipExchange()) - onDone(exchId.topologyVersion()); + // If this node is not oldest. + if (!oldestNode.get().id().equals(cctx.localNodeId())) + sendPartitions(); else { - // If this node is not oldest. - if (!oldestNode.get().id().equals(cctx.localNodeId())) - sendPartitions(); - else { - boolean allReceived = allReceived(); + boolean allReceived = allReceived(); - if (allReceived && replied.compareAndSet(false, true)) { - if (spreadPartitions()) - onDone(exchId.topologyVersion()); - } + if (allReceived && replied.compareAndSet(false, true)) { + if (spreadPartitions()) + onDone(exchId.topologyVersion()); } - - scheduleRecheck(); } + + scheduleRecheck(); } else assert false : "Skipped init future: " + this; } /** - * @return {@code True} if no distributed exchange is needed. - */ - private boolean canSkipExchange() { - return false; // TODO ignite-23; - } - - /** * */ private void dumpPendingObjects() { @@ -755,7 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last()); + GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), + cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -780,8 +911,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT id.topologyVersion()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + if (!cacheCtx.isLocal()) { + AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); + + boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; + + if (ready) + m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + } } // It is important that client topologies be added after contexts. @@ -839,14 +976,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** {@inheritDoc} */ @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { - Map<Integer, Boolean> m = new HashMap<>(); + Map<Integer, Boolean> m = null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) + if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) { + if (m == null) + m = new HashMap<>(); + m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); + } } - cacheValidRes = m; + cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap(); cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); @@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (timeoutObj != null) cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT) + if (exchId.isLeft()) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); } @@ -1018,39 +1159,39 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return; } - ClusterNode curOldest = oldestNode.get(); + if (log.isDebugEnabled()) + log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - if (!nodeId.equals(curOldest.id())) { - if (log.isDebugEnabled()) - log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + - ", unexpectedNodeId=" + nodeId + ']'); + assert exchId.topologyVersion().equals(msg.topologyVersion()); - ClusterNode sender = cctx.discovery().node(nodeId); + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> t) { + ClusterNode curOldest = oldestNode.get(); - if (sender == null) { - if (log.isDebugEnabled()) - log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + - ", exchId=" + msg.exchangeId() + ']'); + if (!nodeId.equals(curOldest.id())) { + if (log.isDebugEnabled()) + log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + + ", unexpectedNodeId=" + nodeId + ']'); - return; - } + ClusterNode snd = cctx.discovery().node(nodeId); - // Will process message later if sender node becomes oldest node. - if (sender.order() > curOldest.order()) - fullMsgs.put(nodeId, msg); + if (snd == null) { + if (log.isDebugEnabled()) + log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + + ", exchId=" + msg.exchangeId() + ']'); - return; - } + return; + } - assert msg.exchangeId().equals(exchId); + // Will process message later if sender node becomes oldest node. + if (snd.order() > curOldest.order()) + fullMsgs.put(nodeId, msg); - if (log.isDebugEnabled()) - log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); + return; + } - assert exchId.topologyVersion().equals(msg.topologyVersion()); + assert msg.exchangeId().equals(exchId); - initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> t) { assert msg.lastVersion() != null; cctx.versions().onReceived(nodeId, msg.lastVersion()); @@ -1075,8 +1216,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (cacheCtx != null) cacheCtx.topology().update(exchId, entry.getValue()); - else if (CU.oldest(cctx).isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + else { + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); + + if (oldest != null && oldest.isLocal()) + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + } } } @@ -1135,40 +1280,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean set = false; - ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion()); - - // If local node is now oldest. - if (newOldest.id().equals(cctx.localNodeId())) { - synchronized (mux) { - if (oldestNode.compareAndSet(oldest, newOldest)) { - // If local node is just joining. - if (exchId.nodeId().equals(cctx.localNodeId())) { - try { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange( - GridDhtPartitionsExchangeFuture.this); + ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); + + if (newOldest != null) { + // If local node is now oldest. + if (newOldest.id().equals(cctx.localNodeId())) { + synchronized (mux) { + if (oldestNode.compareAndSet(oldest, newOldest)) { + // If local node is just joining. + if (exchId.nodeId().equals(cctx.localNodeId())) { + try { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) + cacheCtx.topology().beforeExchange( + GridDhtPartitionsExchangeFuture.this); + } } - } - catch (IgniteCheckedException e) { - onDone(e); + catch (IgniteCheckedException e) { + onDone(e); - return; + return; + } } - } - set = true; + set = true; + } } } - } - else { - synchronized (mux) { - set = oldestNode.compareAndSet(oldest, newOldest); - } + else { + synchronized (mux) { + set = oldestNode.compareAndSet(oldest, newOldest); + } - if (set && log.isDebugEnabled()) - log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + - ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + if (set && log.isDebugEnabled()) + log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + + ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); + } } if (set) { @@ -1190,9 +1337,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert rmtNodes != null; - for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) + for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) { if (it.next().id().equals(nodeId)) it.remove(); + } if (allReceived() && ready.get() && replied.compareAndSet(false, true)) if (spreadPartitions()) @@ -1254,30 +1402,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter( cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) { @Override public void onTimeout() { - if (isDone()) - return; - - if (!enterBusy()) - return; + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + if (isDone()) + return; + + if (!enterBusy()) + return; + + try { + U.warn(log, + "Retrying preload partition exchange due to timeout [done=" + isDone() + + ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) + + ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) + + ", init=" + init + ", initFut=" + initFut.isDone() + + ", ready=" + ready + ", replied=" + replied + ", added=" + added + + ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" + + oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() + + ", locNodeOrder=" + cctx.localNode().order() + + ", locNodeId=" + cctx.localNode().id() + ']', + "Retrying preload partition exchange due to timeout."); - try { - U.warn(log, - "Retrying preload partition exchange due to timeout [done=" + isDone() + - ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) + - ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) + - ", init=" + init + ", initFut=" + initFut.isDone() + - ", ready=" + ready + ", replied=" + replied + ", added=" + added + - ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" + - oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() + - ", locNodeOrder=" + cctx.localNode().order() + - ", locNodeId=" + cctx.localNode().id() + ']', - "Retrying preload partition exchange due to timeout."); - - recheck(); - } - finally { - leaveBusy(); - } + recheck(); + } + finally { + leaveBusy(); + } + } + }); } };
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 8256274..73794ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param id Exchange ID. * @param lastVer Last version. + * @param topVer Topology version. */ - public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, + public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, + @Nullable GridCacheVersion lastVer, @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 66140cd..713a80b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions. */ private byte[] partsBytes; + /** */ + private boolean client; + /** * Required by {@link Externalizable}. */ @@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @param exchId Exchange ID. + * @param client Client message flag. * @param lastVer Last version. */ - public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) { + public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, + boolean client, + @Nullable GridCacheVersion lastVer) { super(exchId, lastVer); + + this.client = client; + } + + /** + * @return {@code True} if sent from client node. + */ + public boolean client() { + return client; } /** @@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (writer.state()) { case 5: + if (!writer.writeBoolean("client", client)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (reader.state()) { case 5: + client = reader.readBoolean("client"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index d6373f0..51010ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -46,7 +47,7 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.*; /** * DHT cache preloader. */ -public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { +public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; @@ -57,13 +58,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { private final GridAtomicLong topVer = new GridAtomicLong(); /** Force key futures. */ - private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap(); + private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool<K, V> supplyPool; + private GridDhtPartitionSupplyPool supplyPool; /** Partition demanders. */ - private GridDhtPartitionDemandPool<K, V> demandPool; + private GridDhtPartitionDemandPool demandPool; /** Start future. */ private final GridFutureAdapter<Object> startFut; @@ -92,7 +93,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { assert !loc.id().equals(n.id()); - for (GridDhtForceKeysFuture<K, V> f : forceKeyFuts.values()) + for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) f.onDiscoveryEvent(e); assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + @@ -117,7 +118,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { /** * @param cctx Cache context. */ - public GridDhtPreloader(GridCacheContext<K, V> cctx) { + public GridDhtPreloader(GridCacheContext<?, ?> cctx) { super(cctx); top = cctx.dht().topology(); @@ -158,8 +159,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } }); - supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock); + supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); + demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -227,12 +228,14 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { final long start = U.currentTimeMillis(); - if (cctx.config().getRebalanceDelay() >= 0) { - U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name()); + final CacheConfiguration cfg = cctx.config(); + + if (cfg.getRebalanceDelay() >= 0) { + U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); demandPool.syncFuture().listen(new CI1<Object>() { @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " + + U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); } }); @@ -253,12 +256,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return demandPool.assign(exchFut); } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { demandPool.addAssignments(assignments, forcePreload); } @@ -271,7 +274,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture(); } /** @@ -406,7 +409,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { return; try { - GridDhtForceKeysFuture<K, V> f = forceKeyFuts.get(msg.futureId()); + GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); if (f != null) f.onResult(node.id(), msg); @@ -491,7 +494,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { */ @SuppressWarnings( {"unchecked", "RedundantCast"}) @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); + final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -543,7 +546,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * * @param fut Future to add. */ - void addFuture(GridDhtForceKeysFuture<K, V> fut) { + void addFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.put(fut.futureId(), fut); } @@ -552,7 +555,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> { * * @param fut Future to remove. */ - void remoteFuture(GridDhtForceKeysFuture<K, V> fut) { + void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.remove(fut.futureId(), fut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 369fc68..2f6ef6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -27,8 +27,7 @@ import java.util.concurrent.*; /** * Partition to node assignments. */ -public class GridDhtPreloaderAssignments<K, V> extends - ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { +public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ba3357d..041f83a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -433,6 +433,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + return dht.tryPutIfAbsent(key, val); + } + + /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) throws IgniteCheckedException { return dht.getAndReplace(key, val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 8258b14..351d6cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCachePreloader<K, V> preloader() { + @Override public GridCachePreloader preloader() { return dht().preloader(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index fc178e3..74438bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -274,7 +274,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (affNodes.isEmpty()) { assert !cctx.affinityNode(); - onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all partition " + + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all partition " + "nodes left the grid).")); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 0ffb4e5..3d28018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*; /** * Cache lock future. */ -public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> +public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheMvccFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** Cache registry. */ @GridToStringExclude - private GridCacheContext<K, V> cctx; + private GridCacheContext<?, ?> cctx; /** Lock owner thread. */ @GridToStringInclude @@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param skipStore skipStore */ public GridNearLockFuture( - GridCacheContext<K, V> cctx, + GridCacheContext<?, ?> cctx, Collection<KeyCacheObject> keys, @Nullable GridNearTxLocal tx, boolean read, @@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @return Participating nodes. */ @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); + return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { + if (isMini(f)) + return ((MiniFuture)f).node(); - return cctx.discovery().localNode(); - } - }); + return cctx.discovery().localNode(); + } + }); } /** {@inheritDoc} */ @@ -350,13 +349,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.nearTx().removeLocks(lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -397,7 +397,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @param dist {@code True} if need to distribute lock release. */ private void onFailed(boolean dist) { - undoLocks(dist); + undoLocks(dist, true); complete(false); } @@ -607,7 +607,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -682,7 +682,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys); + map(keys, false); markInitialized(); @@ -690,14 +690,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B } // Must get topology snapshot and map on that version. - mapOnTopology(); + mapOnTopology(false); } /** * Acquires topology future and checks it completeness under the read lock. If it is not complete, * will asynchronously wait for it's completeness and then try again. + * + * @param remap Remap flag. */ - void mapOnTopology() { + void mapOnTopology(final boolean remap) { // We must acquire topology snapshot from the topology version future. cctx.topology().readLock(); @@ -721,19 +723,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B AffinityTopologyVersion topVer = fut.topologyVersion(); - if (tx != null) - tx.topologyVersion(topVer); + if (remap) { + if (tx != null) + tx.onRemap(topVer); - this.topVer.compareAndSet(null, topVer); + this.topVer.set(topVer); + } + else { + if (tx != null) + tx.topologyVersion(topVer); + + this.topVer.compareAndSet(null, topVer); + } - map(keys); + map(keys, remap); markInitialized(); } else { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - mapOnTopology(); + mapOnTopology(remap); } }); } @@ -749,14 +759,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * groups belonging to one primary node and locks for these groups are acquired sequentially. * * @param keys Keys. + * @param remap Remap flag. */ - private void map(Iterable<KeyCacheObject> keys) { + private void map(Iterable<KeyCacheObject> keys, boolean remap) { try { AffinityTopologyVersion topVer = this.topVer.get(); assert topVer != null; - assert topVer.topologyVersion() > 0; + assert topVer.topologyVersion() > 0 : topVer; if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " + @@ -765,8 +776,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - ConcurrentLinkedDeque8<GridNearLockMapping> mappings = - new ConcurrentLinkedDeque8<>(); + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + + ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping map = null; @@ -795,6 +809,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean first = true; + // Create mini futures. for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) { GridNearLockMapping mapping = iter.next(); @@ -872,6 +888,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (!cand.reentry()) { if (req == null) { + boolean clientFirst = false; + + if (first) { + clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + + first = false; + } + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -893,7 +917,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, - skipStore); + skipStore, + clientFirst); mapping.request(req); } @@ -1197,7 +1222,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B /** * @return DHT cache. */ - private GridDhtTransactionalCacheAdapter<K, V> dht() { + private GridDhtTransactionalCacheAdapter<?, ?> dht() { return cctx.nearTx().dht(); } @@ -1356,110 +1381,146 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return; } - int i = 0; + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); - AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); + IgniteInternalFuture<?> affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); - for (KeyCacheObject k : keys) { - while (true) { - GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + remap(); + } + }); + } + else + remap(); + } + else { + int i = 0; - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); - return; - } + for (KeyCacheObject k : keys) { + while (true) { + GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); - IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); + try { + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - CacheObject oldVal = entry.rawGet(); - boolean hasOldVal = false; - CacheObject newVal = res.value(i); + return; + } - boolean readRecordable = false; + IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key()); - if (retval) { - readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + CacheObject oldVal = entry.rawGet(); + boolean hasOldVal = false; + CacheObject newVal = res.value(i); - if (readRecordable) - hasOldVal = entry.hasValue(); - } + boolean readRecordable = false; - GridCacheVersion dhtVer = res.dhtVersion(i); - GridCacheVersion mappedVer = res.mappedVersion(i); + if (retval) { + readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + + if (readRecordable) + hasOldVal = entry.hasValue(); + } - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion mappedVer = res.mappedVersion(i); - oldVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + + oldVal = oldValTup.get2(); + } } - } - // Lock is held at this point, so we can set the - // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); + // Lock is held at this point, so we can set the + // returned value if any. + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); - if (inTx() && implicitTx() && tx.onePhaseCommit()) { - boolean pass = res.filterResult(i); + if (inTx()) { + tx.hasRemoteLocks(true); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); - } + if (implicitTx() && tx.onePhaseCommit()) { + boolean pass = res.filterResult(i); - entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), - res.pending()); - - if (retval) { - if (readRecordable) - cctx.events().addEvent( - entry.partition(), - entry.key(), - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - oldVal, - hasOldVal, - CU.subjectId(tx, cctx.shared()), - null, - inTx() ? tx.resolveTaskName() : null); - - if (cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(false); - } + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); + } + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + entry.readyNearLock(lockVer, + mappedVer, + res.committedVersions(), + res.rolledbackVersions(), + res.pending()); + + if (retval) { + if (readRecordable) + cctx.events().addEvent( + entry.partition(), + entry.key(), + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + oldVal, + hasOldVal, + CU.subjectId(tx, cctx.shared()), + null, + inTx() ? tx.resolveTaskName() : null); + + if (cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } - break; // Inner while loop. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to add candidates because entry was removed (will renew)."); + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + break; // Inner while loop. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to add candidates because entry was removed (will renew)."); + + // Replace old entry with new one. + entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + } } + + i++; } - i++; - } + try { + proceedMapping(mappings); + } + catch (IgniteCheckedException e) { + onDone(e); + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); + onDone(true); } - - onDone(true); } } + /** + * + */ + private void remap() { + undoLocks(false, false); + + mapOnTopology(true); + + onDone(true); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index e71dd65..81184a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** Flag indicating whether cache operation requires a previous value. */ private boolean retVal; + /** {@code True} if first lock request for lock operation sent from client node. */ + private boolean firstClientReq; + /** * Empty constructor required for {@link Externalizable}. */ @@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param implicitTx Flag to indicate that transaction is implicit. * @param implicitSingleTx Implicit-transaction-with-one-key flag. * @param isRead Indicates whether implicit lock is for read or write operation. + * @param retVal Return value flag. * @param isolation Transaction isolation. * @param isInvalidate Invalidation flag. * @param timeout Lock timeout. @@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { * @param taskNameHash Task name hash code. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param firstClientReq {@code True} if first lock request for lock operation sent from client node. */ public GridNearLockRequest( int cacheId, @@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest { @Nullable UUID subjId, int taskNameHash, long accessTtl, - boolean skipStore + boolean skipStore, + boolean firstClientReq ) { super( cacheId, @@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest { this.taskNameHash = taskNameHash; this.accessTtl = accessTtl; this.retVal = retVal; + this.firstClientReq = firstClientReq; dhtVers = new GridCacheVersion[keyCnt]; } /** + * @return {@code True} if first lock request for lock operation sent from client node. + */ + public boolean firstClientRequest() { + return firstClientReq; + } + + /** * @return Topology version. */ @Override public AffinityTopologyVersion topologyVersion() { @@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 24: - if (!writer.writeBoolean("hasTransforms", hasTransforms)) + if (!writer.writeBoolean("firstClientReq", firstClientReq)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) + if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("implicitTx", implicitTx)) + if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; writer.incrementState(); case 27: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeBoolean("implicitTx", implicitTx)) return false; writer.incrementState(); case 28: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 29: - if (!writer.writeBoolean("retVal", retVal)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); case 30: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("retVal", retVal)) return false; writer.incrementState(); case 31: - if (!writer.writeBoolean("syncCommit", syncCommit)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 32: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("syncCommit", syncCommit)) return false; writer.incrementState(); case 33: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 34: if (!writer.writeMessage("topVer", topVer)) return false; @@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 24: - hasTransforms = reader.readBoolean("hasTransforms"); + firstClientReq = reader.readBoolean("firstClientReq"); if (!reader.isLastRead()) return false; @@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 25: - implicitSingleTx = reader.readBoolean("implicitSingleTx"); + hasTransforms = reader.readBoolean("hasTransforms"); if (!reader.isLastRead()) return false; @@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 26: - implicitTx = reader.readBoolean("implicitTx"); + implicitSingleTx = reader.readBoolean("implicitSingleTx"); if (!reader.isLastRead()) return false; @@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 27: - miniId = reader.readIgniteUuid("miniId"); + implicitTx = reader.readBoolean("implicitTx"); if (!reader.isLastRead()) return false; @@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 28: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 29: - retVal = reader.readBoolean("retVal"); + onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) return false; @@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 30: - subjId = reader.readUuid("subjId"); + retVal = reader.readBoolean("retVal"); if (!reader.isLastRead()) return false; @@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 31: - syncCommit = reader.readBoolean("syncCommit"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 32: - taskNameHash = reader.readInt("taskNameHash"); + syncCommit = reader.readBoolean("syncCommit"); if (!reader.isLastRead()) return false; @@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 33: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 34: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 34; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java index 20928de..f324198 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** Filter evaluation results for fast-commit transactions. */ private boolean[] filterRes; + /** {@code True} if client node should remap lock request. */ + private AffinityTopologyVersion clientRemapVer; + /** * Empty constructor (required by {@link Externalizable}). */ @@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { * @param filterRes {@code True} if need to allocate array for filter evaluation results. * @param cnt Count. * @param err Error. + * @param clientRemapVer {@code True} if client node should remap lock request. */ public GridNearLockResponse( int cacheId, @@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse { IgniteUuid miniId, boolean filterRes, int cnt, - Throwable err + Throwable err, + AffinityTopologyVersion clientRemapVer ) { super(cacheId, lockVer, futId, cnt, err); assert miniId != null; this.miniId = miniId; + this.clientRemapVer = clientRemapVer; dhtVers = new GridCacheVersion[cnt]; mappedVers = new GridCacheVersion[cnt]; @@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse { } /** + * @return {@code True} if client node should remap lock request. + */ + @Nullable public AffinityTopologyVersion clientRemapVersion() { + return clientRemapVer; + } + + /** * Gets pending versions that are less than {@link #version()}. * * @return Pending versions. @@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse { switch (writer.state()) { case 11: - if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("clientRemapVer", clientRemapVer)) return false; writer.incrementState(); case 12: - if (!writer.writeBooleanArray("filterRes", filterRes)) + if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 13: - if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) + if (!writer.writeBooleanArray("filterRes", filterRes)) return false; writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 15: + if (!writer.writeIgniteUuid("miniId", miniId)) + return false; + + writer.incrementState(); + + case 16: if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) return false; @@ -238,7 +258,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { switch (reader.state()) { case 11: - dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + clientRemapVer = reader.readMessage("clientRemapVer"); if (!reader.isLastRead()) return false; @@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 12: - filterRes = reader.readBooleanArray("filterRes"); + dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 13: - mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); + filterRes = reader.readBooleanArray("filterRes"); if (!reader.isLastRead()) return false; @@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class); if (!reader.isLastRead()) return false; @@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 15: + miniId = reader.readIgniteUuid("miniId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: pending = reader.readCollection("pending", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 17; } /** {@inheritDoc} */
