http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 63156fc..68bc705 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -41,10 +41,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -58,14 +61,13 @@ import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** * Colocated get future. */ -public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> { +public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryAware { /** */ private static final long serialVersionUID = 0L; @@ -76,10 +78,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda private static IgniteLogger log; /** */ - private ClusterNode mvccCrd; - - /** */ - private MvccCoordinatorVersion mvccVer; + private MvccQueryTracker mvccTracker; /** * @param cctx Context. @@ -128,6 +127,20 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } /** + * @return Mvcc version if mvcc is enabled for cache. + */ + @Nullable private MvccCoordinatorVersion mvccVersion() { + if (!cctx.mvccEnabled()) + return null; + + MvccCoordinatorVersion ver = mvccTracker.mvccVersion(); + + assert ver != null : "[fut=" + this + ", mvccTracker=" + mvccTracker + "]"; + + return ver; + } + + /** * Initializes future. * * @param topVer Topology version. @@ -145,40 +158,43 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); } - // TODO IGNITE-3478 (correct failover and remap). if (cctx.mvccEnabled()) { - mvccCrd = cctx.shared().coordinators().coordinator(topVer); + mvccTracker = new MvccQueryTracker(cctx, canRemap, this); - if (mvccCrd == null) { - onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer)); + trackable = true; - return; - } + cctx.mvcc().addFuture(this, futId); - final AffinityTopologyVersion topVer0 = topVer; + mvccTracker.requestVersion(topVer); - IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd); + return; + } - cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { - @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { - try { - mvccVer = fut.get(); + initialMap(topVer); + } + + /** {@inheritDoc} */ + @Override public void onMvccVersionReceived(AffinityTopologyVersion topVer) { + initialMap(topVer); + } - map(keys, - Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), - topVer0); + /** {@inheritDoc} */ + @Override public void onMvccVersionError(IgniteCheckedException e) { + onDone(e); + } - markInitialized(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - }); + /** {@inheritDoc} */ + @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { + if (mvccTracker != null) + return mvccTracker.onMvccCoordinatorChange(newCrd); - return; - } + return null; + } + /** + * @param topVer Topology version. + */ + private void initialMap(AffinityTopologyVersion topVer) { map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); markInitialized(); @@ -241,11 +257,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (trackable) cctx.mvcc().removeFuture(futId); - if (mvccVer != null) { - assert mvccCrd != null; - - cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer); - } + if (mvccTracker != null) + mvccTracker.onQueryDone(); cache().sendTtlUpdateRequest(expiryPlc); @@ -340,7 +353,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda expiryPlc, skipVals, recovery, - mvccVer); + mvccVersion()); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -397,7 +410,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda skipVals, cctx.deploymentEnabled(), recovery, - mvccVer); + mvccVersion()); add(fut); // Append new future. @@ -504,7 +517,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (readNoEntry) { CacheDataRow row = cctx.mvccEnabled() ? - cctx.offheap().mvccRead(cctx, key, mvccVer) : + cctx.offheap().mvccRead(cctx, key, mvccVersion()) : cctx.offheap().read(cctx, key); if (row != null) { @@ -548,7 +561,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName, expiryPlc, !deserializeBinary, - mvccVer, + mvccVersion(), null); if (getRes != null) { @@ -568,7 +581,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName, expiryPlc, !deserializeBinary, - mvccVer); + mvccVersion()); } cache.context().evicts().touch(entry, topVer); @@ -662,6 +675,17 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda return Collections.emptyMap(); } + /** + * @param curTopVer Current topology version. + * @return Future to wait for before remapping. + */ + private IgniteInternalFuture<AffinityTopologyVersion> waitRemapFuture(AffinityTopologyVersion curTopVer) { + AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(curTopVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + + return cctx.affinity().affinityReadyFuture(updTopVer); + } + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @@ -766,17 +790,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda onDone(Collections.<K, V>emptyMap()); } else { - final AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + IgniteInternalFuture<AffinityTopologyVersion> waitFut = waitRemapFuture(topVer); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + waitFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { - fut.get(); + AffinityTopologyVersion topVer = fut.get(); // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + map(keys.keySet(), F.t(node, keys), topVer); onDone(Collections.<K, V>emptyMap()); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 03b7b6e..830d50b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; @@ -75,6 +76,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -552,7 +557,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - exchCtx = new ExchangeContext(crdNode, this); + MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator(); + + boolean mvccCrdChange = mvccCrd != null && + initialVersion().equals(mvccCrd.topologyVersion()); + + cctx.kernalContext().coordinators().currentCoordinator(mvccCrd); + + exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this); assert state == null : state; @@ -563,6 +575,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchLog.isInfoEnabled()) { exchLog.info("Started exchange init [topVer=" + topVer + + ", mvccCrd=" + mvccCrd + ", crd=" + crdNode + ", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) + ", evtNode=" + firstDiscoEvt.eventNode().id() + @@ -644,7 +657,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - updateTopologies(crdNode); + updateTopologies(crdNode, cctx.coordinators().currentCoordinator()); switch (exchange) { case ALL: { @@ -748,9 +761,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param crd Coordinator flag. + * @param mvccCrd Mvcc coordinator. * @throws IgniteCheckedException If failed. */ - private void updateTopologies(boolean crd) throws IgniteCheckedException { + private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; @@ -776,12 +790,43 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.updateTopologyVersion( this, events().discoveryCache(), + mvccCrd, updSeq, cacheGroupStopping(grp.groupId())); } - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId())); + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { + top.updateTopologyVersion(this, + events().discoveryCache(), + mvccCrd, + -1, + cacheGroupStopping(top.groupId())); + } + + if (exchCtx.newMvccCoordinator()) { + assert mvccCrd != null; + + Map<MvccCounter, Integer> activeQrys = new HashMap<>(); + + for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) { + if (fut instanceof MvccQueryAware) { + MvccCoordinatorVersion ver = ((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd); + + if (ver != null ) { + MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter()); + + Integer cnt = activeQrys.get(cntr); + + if (cnt == null) + activeQrys.put(cntr, 1); + else + activeQrys.put(cntr, cnt + 1); + } + } + } + + exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys); + } } /** @@ -1243,6 +1288,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.partitionHistoryCounters(partHistReserved0); } + Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries(); + + msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null); + if (stateChangeExchange() && changeGlobalStateE != null) msg.setError(changeGlobalStateE); else if (localJoinExchange()) @@ -1418,7 +1467,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (err == null) { - cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache()); + if (exchCtx.newMvccCoordinator() && cctx.localNodeId().equals(cctx.coordinators().currentCoordinatorId())) + cctx.coordinators().initCoordinator(res, exchCtx.events().discoveryCache(), exchCtx.activeQueries()); if (centralizedAff) { assert !exchCtx.mergeExchanges(); @@ -1840,6 +1890,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { if (msg.client()) { + if (msg.activeQueries() != null) + cctx.coordinators().processClientActiveQueries(nodeId, msg.activeQueries()); + waitAndReplyToNode(nodeId, msg); return; @@ -2188,7 +2241,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (exchCtx.mergeExchanges()) { + if (exchCtx.mergeExchanges() && !exchCtx.newMvccCoordinator()) { if (log.isInfoEnabled()) log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); @@ -2260,6 +2313,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { GridDhtPartitionsSingleMessage msg = e.getValue(); + if (exchCtx.newMvccCoordinator()) + exchCtx.addActiveQueries(e.getKey(), msg.activeQueries()); + else + assert msg.activeQueries() == null; + // Apply update counters after all single messages are received. for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 215152d..c461e4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -29,12 +29,14 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -100,6 +102,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes */ private GridDhtPartitionsFullMessage finishMsg; + /** */ + @GridDirectMap(keyType = Message.class, valueType = Integer.class) + private Map<MvccCounter, Integer> activeQrys; + /** * Required by {@link Externalizable}. */ @@ -124,6 +130,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @return Active queries started with previous coordinator. + */ + Map<MvccCounter, Integer> activeQueries() { + return activeQrys; + } + + /** + * @param activeQrys Active queries started with previous coordinator. + */ + void activeQueries(Map<MvccCounter, Integer> activeQrys) { + this.activeQrys = activeQrys; + } + + /** * @param finishMsg Exchange finish message (used to restore exchange state on new coordinator). */ void finishMessage(GridDhtPartitionsFullMessage finishMsg) { @@ -404,48 +424,54 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (writer.state()) { case 5: - if (!writer.writeBoolean("client", client)) + if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 6: - if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + if (!writer.writeBoolean("client", client)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("errBytes", errBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: - if (!writer.writeMessage("finishMsg", finishMsg)) + if (!writer.writeByteArray("errBytes", errBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) + if (!writer.writeMessage("finishMsg", finishMsg)) return false; writer.incrementState(); case 10: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 11: - if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 12: + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + return false; + + writer.incrementState(); + + case 13: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -468,7 +494,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes switch (reader.state()) { case 5: - client = reader.readBoolean("client"); + activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -476,7 +502,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 6: - dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + client = reader.readBoolean("client"); if (!reader.isLastRead()) return false; @@ -484,7 +510,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 7: - errBytes = reader.readByteArray("errBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -492,7 +518,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 8: - finishMsg = reader.readMessage("finishMsg"); + errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; @@ -500,7 +526,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 9: - grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); + finishMsg = reader.readMessage("finishMsg"); if (!reader.isLastRead()) return false; @@ -508,7 +534,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 10: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -516,7 +542,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 11: - partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -524,6 +550,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 12: + partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -543,7 +577,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 8247b46..4a2aeb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -36,9 +36,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; @@ -79,17 +81,19 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA boolean found = false; for (IgniteInternalFuture<?> fut : futures()) { - MiniFuture f = (MiniFuture)fut; + if (fut instanceof MiniFuture) { + MiniFuture f = (MiniFuture)fut; - if (f.primary().id().equals(nodeId)) { - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + - nodeId); + if (f.primary().id().equals(nodeId)) { + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); - e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - f.onNodeLeft(e); + f.onNodeLeft(e); - found = true; + found = true; + } } } @@ -269,17 +273,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA AffinityTopologyVersion topVer = tx.topologyVersion(); - ClusterNode mvccCrd = null; - - if (tx.txState().mvccEnabled(cctx)) { - mvccCrd = cctx.coordinators().coordinator(topVer); - - if (mvccCrd == null) { - onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer)); - - return; - } - } + MvccCoordinator mvccCrd = null; GridDhtTxMapping txMapping = new GridDhtTxMapping(); @@ -303,6 +297,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer); + if (mvccCrd == null && cacheCtx.mvccEnabled()) { + mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer); + + if (mvccCrd == null) { + onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); + + return; + } + } + if (F.isEmpty(nodes)) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys to nodes (partition " + "is not mapped to any node) [key=" + txEntry.key() + @@ -325,6 +329,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA txMapping.addMapping(nodes); } + assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null; + tx.transactionNodes(txMapping.transactionNodes()); if (!hasNearCache) @@ -427,13 +433,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (mvccCrd != null) { assert !tx.onePhaseCommit(); - if (mvccCrd.isLocal()) { + if (mvccCrd.nodeId().equals(cctx.localNodeId())) { MvccCoordinatorVersion mvccVer = cctx.coordinators().requestTxCounterOnCoordinator(tx); - tx.mvccCoordinatorVersion(mvccVer); + onMvccResponse(cctx.localNodeId(), mvccVer); } else { - IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion()); + IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = + cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion()); add((IgniteInternalFuture)cntrFut); } @@ -443,8 +450,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** {@inheritDoc} */ - @Override public void onMvccResponse(MvccCoordinatorVersion res) { - tx.mvccCoordinatorVersion(res); + @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) { + tx.mvccInfo(new TxMvccInfo(crdId, res)); } /** {@inheritDoc} */ @@ -485,12 +492,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA ", loc=" + ((MiniFuture)f).primary().isLocal() + ", done=" + f.isDone() + "]"; } - else if (f instanceof CacheCoordinatorsSharedManager.MvccVersionFuture) { - CacheCoordinatorsSharedManager.MvccVersionFuture crdFut = - (CacheCoordinatorsSharedManager.MvccVersionFuture)f; + else if (f instanceof CacheCoordinatorsProcessor.MvccVersionFuture) { + CacheCoordinatorsProcessor.MvccVersionFuture crdFut = + (CacheCoordinatorsProcessor.MvccVersionFuture)f; - return "[mvccCrdNode=" + crdFut.crd.id() + - ", loc=" + crdFut.crd.isLocal() + + return "[mvccCrdNode=" + crdFut.crdId + + ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) + ", done=" + f.isDone() + "]"; } else @@ -500,6 +507,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA return S.toString(GridNearPessimisticTxPrepareFuture.class, this, "innerFuts", futs, + "txId", tx.nearXidVersion(), "super", super.toString()); } @@ -544,8 +552,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (res.error() != null) onError(res.error()); else { - if (res.mvccCoordinatorVersion() != null) - tx.mvccCoordinatorVersion(res.mvccCoordinatorVersion()); + if (res.mvccInfo() != null) + tx.mvccInfo(res.mvccInfo()); onPrepareResponse(m, res, updateMapping); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java index 7d03d46..c24551b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.S; @@ -53,13 +53,11 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern @Override public void apply(final GridNearTxFinishFuture fut) { GridNearTxLocal tx = fut.tx(); - if (tx.mvccCoordinatorVersion() != null) { - ClusterNode crd = fut.context().coordinators().coordinator(tx.topologyVersion()); - - assert crd != null; + TxMvccInfo mvccInfo = tx.mvccInfo(); + if (mvccInfo != null) { IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit( - crd, tx.mvccCoordinatorVersion()); + mvccInfo.coordinator(), mvccInfo.version()); ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> ackFut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 347a694..a9b60d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -420,12 +421,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit return; } - if (!commit && tx.mvccCoordinatorVersion() != null) { - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + if (!commit && tx.mvccInfo() != null) { + TxMvccInfo mvccInfo = tx.mvccInfo(); - assert crd != null; - - cctx.coordinators().ackTxRollback(crd, tx.mvccCoordinatorVersion()); + cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version()); } try { @@ -433,11 +432,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit GridLongList waitTxs = tx.mvccWaitTransactions(); if (waitTxs != null) { - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + TxMvccInfo mvccInfo = tx.mvccInfo(); - assert crd != null; + assert mvccInfo != null; - IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, waitTxs); + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs); add(fut); } @@ -752,7 +751,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit tx.size(), tx.subjectId(), tx.taskNameHash(), - tx.mvccCoordinatorVersion(), + tx.mvccInfo(), tx.activeCachesDeploymentEnabled() ); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 918724e..d436aed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -24,7 +24,7 @@ import java.util.UUID; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; @@ -44,7 +44,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { private int miniId; /** */ - private MvccCoordinatorVersion mvccVer; + private TxMvccInfo mvccInfo; /** * Empty constructor required for {@link Externalizable}. @@ -91,7 +91,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { int txSize, @Nullable UUID subjId, int taskNameHash, - MvccCoordinatorVersion mvccVer, + TxMvccInfo mvccInfo, boolean addDepInfo) { super( xidVer, @@ -116,14 +116,14 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { explicitLock(explicitLock); storeEnabled(storeEnabled); - this.mvccVer = mvccVer; + this.mvccInfo = mvccInfo; } /** - * @return Counter. + * @return Mvcc info. */ - public MvccCoordinatorVersion mvccCoordinatorVersion() { - return mvccVer; + @Nullable public TxMvccInfo mvccInfo() { + return mvccInfo; } /** @@ -192,7 +192,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 22: - if (!writer.writeMessage("mvccVer", mvccVer)) + if (!writer.writeMessage("mvccInfo", mvccInfo)) return false; writer.incrementState(); @@ -222,7 +222,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 22: - mvccVer = reader.readMessage("mvccVer"); + mvccInfo = reader.readMessage("mvccInfo"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index ddc5826..987a751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -160,7 +160,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends * @param txMapping Transaction mapping. */ final void checkOnePhase(GridDhtTxMapping txMapping) { - if (tx.storeWriteThrough()) + if (tx.storeWriteThrough() || tx.txState().mvccEnabled(cctx)) // TODO IGNITE-3479 (onePhase + mvcc) return; Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 7fe2e53..10883de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -99,7 +99,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse private AffinityTopologyVersion clientRemapVer; /** */ - private MvccCoordinatorVersion mvccVer; + private TxMvccInfo mvccInfo; /** * Empty constructor required by {@link Externalizable}. @@ -150,17 +150,17 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse } /** - * @param mvccVer Mvcc version. + * @param mvccInfo Mvcc info. */ - public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) { - this.mvccVer = mvccVer; + public void mvccInfo(TxMvccInfo mvccInfo) { + this.mvccInfo = mvccInfo; } /** - * @return Mvcc version. + * @return Mvcc info. */ - public MvccCoordinatorVersion mvccCoordinatorVersion() { - return mvccVer; + @Nullable public TxMvccInfo mvccInfo() { + return mvccInfo; } /** @@ -407,7 +407,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse writer.incrementState(); case 15: - if (!writer.writeMessage("mvccVer", mvccVer)) + if (!writer.writeMessage("mvccInfo", mvccInfo)) return false; writer.incrementState(); @@ -499,7 +499,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 15: - mvccVer = reader.readMessage("mvccVer"); + mvccInfo = reader.readMessage("mvccInfo"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java new file mode 100644 index 0000000..39baec9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.io.Serializable; + +/** + * + */ +public class CacheCoordinatorsDiscoveryData implements Serializable { + /** */ + private MvccCoordinator crd; + + /** + * @param crd Coordinator. + */ + public CacheCoordinatorsDiscoveryData(MvccCoordinator crd) { + this.crd = crd; + } + + /** + * @return Current coordinator. + */ + public MvccCoordinator coordinator() { + return crd; + } +}
