ignite-3479 Coordinators reassign on failure
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/761e43d3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/761e43d3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/761e43d3 Branch: refs/heads/ignite-3478 Commit: 761e43d3039cf8c58c9c7b0ec2dde68238d71647 Parents: 7f4defd Author: sboikov <[email protected]> Authored: Fri Sep 29 14:29:03 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 29 14:29:03 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 5 +- .../ignite/internal/GridKernalContext.java | 6 + .../ignite/internal/GridKernalContextImpl.java | 14 +- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../managers/communication/GridIoManager.java | 3 + .../communication/GridIoMessageFactory.java | 18 + .../internal/managers/discovery/DiscoCache.java | 13 + .../discovery/GridDiscoveryManager.java | 4 + .../processors/affinity/AffinityAssignment.java | 9 +- .../affinity/GridAffinityAssignment.java | 15 +- .../affinity/GridAffinityAssignmentCache.java | 21 +- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/affinity/GridAffinityUtils.java | 2 +- .../affinity/HistoryAffinityAssignment.java | 12 +- .../cache/CacheAffinitySharedManager.java | 29 +- .../processors/cache/ExchangeContext.java | 43 +- .../cache/GridCacheAffinityManager.java | 5 + .../GridCachePartitionExchangeManager.java | 74 +- .../processors/cache/GridCacheProcessor.java | 3 - .../cache/GridCacheSharedContext.java | 20 +- .../GridDistributedTxRemoteAdapter.java | 8 +- .../dht/GridClientPartitionTopology.java | 7 + .../distributed/dht/GridDhtCacheAdapter.java | 2 + .../dht/GridDhtPartitionTopology.java | 4 + .../dht/GridDhtPartitionTopologyImpl.java | 11 + .../distributed/dht/GridDhtTxFinishFuture.java | 15 +- .../distributed/dht/GridDhtTxFinishRequest.java | 20 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 2 - .../distributed/dht/GridDhtTxPrepareFuture.java | 45 +- .../dht/GridDhtTxPrepareRequest.java | 18 +- .../dht/GridPartitionedGetFuture.java | 118 +- .../GridDhtPartitionsExchangeFuture.java | 72 +- .../GridDhtPartitionsSingleMessage.java | 64 +- .../GridNearPessimisticTxPrepareFuture.java | 70 +- .../near/GridNearTxFinishAndAckFuture.java | 10 +- .../near/GridNearTxFinishFuture.java | 17 +- .../near/GridNearTxFinishRequest.java | 18 +- .../near/GridNearTxPrepareFutureAdapter.java | 2 +- .../near/GridNearTxPrepareResponse.java | 20 +- .../mvcc/CacheCoordinatorsDiscoveryData.java | 42 + .../cache/mvcc/CacheCoordinatorsProcessor.java | 1304 ++++++++++++++++++ .../mvcc/CacheCoordinatorsSharedManager.java | 999 -------------- .../mvcc/CoordinatorAssignmentHistory.java | 71 - .../processors/cache/mvcc/MvccCoordinator.java | 101 ++ .../processors/cache/mvcc/MvccCounter.java | 163 +++ .../processors/cache/mvcc/MvccQueryAware.java | 43 + .../processors/cache/mvcc/MvccQueryTracker.java | 232 ++++ .../cache/mvcc/MvccResponseListener.java | 10 +- .../mvcc/NewCoordinatorQueryAckRequest.java | 156 +++ .../cache/mvcc/PreviousCoordinatorQueries.java | 190 +++ .../processors/cache/mvcc/TxMvccInfo.java | 141 ++ .../wal/reader/IgniteWalIteratorFactory.java | 2 +- .../wal/reader/StandaloneGridKernalContext.java | 6 + .../query/GridCacheDistributedQueryManager.java | 5 +- .../cache/query/GridCacheQueryManager.java | 11 +- .../cache/transactions/IgniteInternalTx.java | 6 +- .../cache/transactions/IgniteTxAdapter.java | 17 +- .../cache/transactions/IgniteTxHandler.java | 8 +- .../transactions/IgniteTxLocalAdapter.java | 12 +- .../cache/tree/AbstractDataInnerIO.java | 6 +- .../cache/tree/AbstractDataLeafIO.java | 6 +- .../processors/cache/tree/CacheDataTree.java | 4 +- .../cache/tree/CacheIdAwareDataInnerIO.java | 4 +- .../cache/tree/CacheIdAwareDataLeafIO.java | 4 +- .../processors/cache/tree/DataInnerIO.java | 4 +- .../processors/cache/tree/DataLeafIO.java | 4 +- .../processors/cache/tree/MvccDataRow.java | 4 +- .../processors/cache/tree/SearchRow.java | 4 +- .../util/future/GridCompoundFuture.java | 4 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 764 +++++++++- .../pagemem/BPlusTreePageMemoryImplTest.java | 1 - .../BPlusTreeReuseListPageMemoryImplTest.java | 1 - .../MetadataStoragePageMemoryImplTest.java | 1 - .../pagemem/PageMemoryImplNoLoadTest.java | 1 - .../persistence/pagemem/PageMemoryImplTest.java | 1 - .../loadtests/hashmap/GridCacheTestContext.java | 2 - .../testframework/junits/GridAbstractTest.java | 8 + 77 files changed, 3765 insertions(+), 1402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 93ffe95..c3a8127 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -61,7 +61,10 @@ public interface GridComponent { BINARY_PROC, /** Query processor. */ - QUERY_PROC + QUERY_PROC, + + /** */ + CACHE_CRD_PROC } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 99c7cce..88251aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -643,4 +644,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Platform processor. */ public PlatformProcessor platform(); + + /** + * @return Cache mvcc coordinator processor. + */ + public CacheCoordinatorsProcessor coordinators(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 07e5970..86c0adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -282,6 +283,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private DataStructuresProcessor dataStructuresProc; + /** Cache mvcc coordinators. */ + @GridToStringExclude + private CacheCoordinatorsProcessor coordProc; + /** */ @GridToStringExclude private List<GridComponent> comps = new LinkedList<>(); @@ -344,7 +349,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - Map<String, ? extends ExecutorService> customExecSvcs; + private Map<String, ? extends ExecutorService> customExecSvcs; /** */ @GridToStringExclude @@ -579,6 +584,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable poolProc = (PoolProcessor) comp; else if (comp instanceof GridMarshallerMappingProcessor) mappingProc = (GridMarshallerMappingProcessor)comp; + else if (comp instanceof CacheCoordinatorsProcessor) + coordProc = (CacheCoordinatorsProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -834,6 +841,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public CacheCoordinatorsProcessor coordinators() { + return coordProc; + } + + /** {@inheritDoc} */ @Override public IgniteLogger log(String ctgr) { return config().getGridLogger().getLogger(ctgr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b015666..2dbbb7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -114,6 +114,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -937,8 +938,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. try { + startProcessor(new CacheCoordinatorsProcessor(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); - startProcessor(new GridAffinityProcessor(ctx)); + startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); startProcessor(new GridClusterStateProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 8f03911..adce492 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1659,6 +1659,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (e.getCause() instanceof ClusterTopologyCheckedException) throw (ClusterTopologyCheckedException)e.getCause(); + if (!ctx.discovery().alive(node)) + throw new ClusterTopologyCheckedException("Failed to send message, node left: " + node.id()); + throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + "TCP connection cannot be established due to firewall issues) " + "[node=" + node + ", topic=" + topic + http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 9bd04fa..99bc8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -110,6 +110,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureRespons import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest; import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; +import org.apache.ignite.internal.processors.cache.mvcc.NewCoordinatorQueryAckRequest; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -923,6 +926,21 @@ public class GridIoMessageFactory implements MessageFactory { return msg; + case 139: + msg = new TxMvccInfo(); + + return msg; + + case 140: + msg = new NewCoordinatorQueryAckRequest(); + + return msg; + + case 141: + msg = new MvccCounter(); + + return msg; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 95e855a..b6cae3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -81,6 +82,9 @@ public class DiscoCache { /** */ private final AffinityTopologyVersion topVer; + /** */ + private final MvccCoordinator mvccCrd; + /** * @param topVer Topology version. * @param state Current cluster state. @@ -99,6 +103,7 @@ public class DiscoCache { AffinityTopologyVersion topVer, DiscoveryDataClusterState state, ClusterNode loc, + MvccCoordinator mvccCrd, List<ClusterNode> rmtNodes, List<ClusterNode> allNodes, List<ClusterNode> srvNodes, @@ -111,6 +116,7 @@ public class DiscoCache { this.topVer = topVer; this.state = state; this.loc = loc; + this.mvccCrd = mvccCrd; this.rmtNodes = rmtNodes; this.allNodes = allNodes; this.srvNodes = srvNodes; @@ -136,6 +142,13 @@ public class DiscoCache { } /** + * @return Mvcc coordinator node. + */ + @Nullable public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** * @return Topology version. */ public AffinityTopologyVersion version() { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 527399d..584df82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; @@ -616,6 +617,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoCache discoCache = null; + ctx.coordinators().onDiscoveryEvent(type, topSnapshot, topVer); + boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id()); IgniteInternalFuture<Boolean> transitionWaitFut = null; @@ -2261,6 +2264,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topVer, state, loc, + ctx.coordinators().discoveryData().coordinator(), Collections.unmodifiableList(rmtNodes), Collections.unmodifiableList(allNodes), Collections.unmodifiableList(srvNodes), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java index 06207d3..28dec1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java @@ -17,12 +17,12 @@ package org.apache.ignite.internal.processors.affinity; -import org.apache.ignite.cluster.ClusterNode; - import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; /** * Cached affinity calculations. @@ -85,4 +85,9 @@ public interface AffinityAssignment { * @return Backup partitions for specified node ID. */ public Set<Integer> backupPartitions(UUID nodeId); + + /** + * @return Mvcc coordinator. + */ + public MvccCoordinator mvccCoordinator(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 35130a3..a7549cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -39,6 +40,9 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable /** Topology version. */ private final AffinityTopologyVersion topVer; + /** */ + private final MvccCoordinator mvccCrd; + /** Collection of calculated affinity nodes. */ private List<List<ClusterNode>> assignment; @@ -69,6 +73,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable this.topVer = topVer; primary = new HashMap<>(); backup = new HashMap<>(); + mvccCrd = null; clientEvtChange = false; } @@ -79,7 +84,8 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable */ GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment, - List<List<ClusterNode>> idealAssignment) { + List<List<ClusterNode>> idealAssignment, + MvccCoordinator mvccCrd) { assert topVer != null; assert assignment != null; assert idealAssignment != null; @@ -87,6 +93,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable this.topVer = topVer; this.assignment = assignment; this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment; + this.mvccCrd = mvccCrd; primary = new HashMap<>(); backup = new HashMap<>(); @@ -106,6 +113,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable idealAssignment = aff.idealAssignment; primary = aff.primary; backup = aff.backup; + mvccCrd = aff.mvccCrd; clientEvtChange = true; } @@ -264,6 +272,11 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable } /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ @Override public int hashCode() { return topVer.hashCode(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index f921251..fb4092a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -184,10 +185,24 @@ public class GridAffinityAssignmentCache { * @param affAssignment Affinity assignment for topology version. */ public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { + MvccCoordinator mvccCrd = ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer); + + initialize(topVer, affAssignment, mvccCrd); + } + + /** + * Initializes affinity with given topology version and assignment. + * + * @param topVer Topology version. + * @param affAssignment Affinity assignment for topology version. + * @param mvccCrd Mvcc coordinator. + */ + public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) { assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']'; assert idealAssignment != null; + assert mvccCrd == null || topVer.compareTo(mvccCrd.topologyVersion()) >= 0 : "[mvccCrd=" + mvccCrd + ", topVer=" + topVer + ']'; - GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment); + GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment, mvccCrd); affCache.put(topVer, new HistoryAffinityAssignment(assignment)); head.set(assignment); @@ -570,7 +585,9 @@ public class GridAffinityAssignmentCache { idealAssignment(aff.idealAssignment()); - initialize(aff.lastVersion(), aff.assignments(aff.lastVersion())); + AffinityAssignment assign = aff.cachedAffinity(aff.lastVersion()); + + initialize(aff.lastVersion(), assign.assignment(), assign.mvccCoordinator()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 9c9fb8f..3a142c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -384,7 +384,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { try { GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? (GridAffinityAssignment)assign0 : - new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator()); AffinityInfo info = new AffinityInfo( cctx.config().getAffinity(), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java index abd5292..15d7e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java @@ -184,7 +184,7 @@ class GridAffinityUtils { GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ? (GridAffinityAssignment)assign0 : - new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment()); + new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment(), assign0.mvccCoordinator()); return F.t( affinityMessage(ctx, cctx.config().getAffinity()), http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java index e502dd5..d9c03e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.affinity; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -43,17 +44,26 @@ public class HistoryAffinityAssignment implements AffinityAssignment { /** */ private final boolean clientEvtChange; + /** */ + private final MvccCoordinator mvccCrd; + /** * @param assign Assignment. */ - public HistoryAffinityAssignment(GridAffinityAssignment assign) { + HistoryAffinityAssignment(GridAffinityAssignment assign) { this.topVer = assign.topologyVersion(); this.assignment = assign.assignment(); this.idealAssignment = assign.idealAssignment(); + this.mvccCrd = assign.mvccCoordinator(); this.clientEvtChange = assign.clientEventChange(); } /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ @Override public boolean clientEventChange() { return clientEvtChange; } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 741e204..1f9890c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridLongList; @@ -448,7 +449,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grpHolder.client()) { ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer); - grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); + grp.topology().updateTopologyVersion(topFut, + discoCache, + cctx.coordinators().currentCoordinator(), + -1, + false); grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); @@ -495,6 +500,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert grp != null; GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer, + cctx.coordinators().currentCoordinator(), null, discoCache, grp.affinity(), @@ -517,7 +523,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap new ClusterTopologyServerNotFoundException("All server nodes left grid.")); } - grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); + grp.topology().updateTopologyVersion(topFut, + discoCache, + cctx.coordinators().currentCoordinator(), + -1, + false); grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null); @@ -1180,6 +1190,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap fetchFut.init(false); fetchAffinity(evts.topologyVersion(), + cctx.coordinators().currentCoordinator(), evts.lastEvent(), evts.discoveryCache(), aff, fetchFut); @@ -1528,6 +1539,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap int grpId = fetchFut.groupId(); fetchAffinity(topVer, + cctx.coordinators().currentCoordinator(), fut.events().lastEvent(), fut.events().discoveryCache(), cctx.cache().cacheGroup(grpId).affinity(), @@ -1537,6 +1549,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param topVer Topology version. + * @param mvccCrd Mvcc coordinator to set in affinity. * @param discoveryEvt Discovery event. * @param discoCache Discovery data cache. * @param affCache Affinity. @@ -1544,7 +1557,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Affinity assignment response. */ - private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer, + private GridDhtAffinityAssignmentResponse fetchAffinity( + AffinityTopologyVersion topVer, + MvccCoordinator mvccCrd, @Nullable DiscoveryEvent discoveryEvt, DiscoCache discoCache, GridAffinityAssignmentCache affCache, @@ -1557,7 +1572,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (res == null) { List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache); - affCache.initialize(topVer, aff); + affCache.initialize(topVer, aff, mvccCrd); } else { List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache); @@ -1574,7 +1589,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff != null : res; - affCache.initialize(topVer, aff); + affCache.initialize(topVer, aff, mvccCrd); } return res; @@ -1624,7 +1639,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. * @return Future completed when caches initialization is done. */ - public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut, + public IgniteInternalFuture<?> initCoordinatorCaches( + final GridDhtPartitionsExchangeFuture fut, final boolean newAff) throws IgniteCheckedException { final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); @@ -1692,6 +1708,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) throws IgniteCheckedException { fetchAffinity(prev.topologyVersion(), + null, // Pass null mvcc coordinator, this affinity version should be used for queries. prev.events().lastEvent(), prev.events().discoveryCache(), aff, http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 4046c98..55ffdaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -17,11 +17,15 @@ package org.apache.ignite.internal.processors.cache; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -51,11 +55,20 @@ public class ExchangeContext { /** */ private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false); + /** */ + private final boolean newMvccCrd; + + /** Currently running mvcc queries, initialized when mvcc coordinator is changed. */ + private Map<UUID, Map<MvccCounter, Integer>> activeQueries; + /** * @param crd Coordinator flag. + * @param newMvccCrd {@code True} if new coordinator assigned during this exchange. * @param fut Exchange future. */ - public ExchangeContext(boolean crd, GridDhtPartitionsExchangeFuture fut) { + public ExchangeContext(boolean crd, boolean newMvccCrd, GridDhtPartitionsExchangeFuture fut) { + this.newMvccCrd = newMvccCrd; + int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion()); if (compatibilityNode || (crd && fut.localJoinExchange())) { @@ -124,6 +137,34 @@ public class ExchangeContext { return merge; } + /** + * @return {@code True} if new node assigned as mvcc coordinator node during this exchange. + */ + public boolean newMvccCoordinator() { + return newMvccCrd; + } + + /** + * @return Active queries. + */ + public Map<UUID, Map<MvccCounter, Integer>> activeQueries() { + return activeQueries; + } + + /** + * @param nodeId Node ID. + * @param nodeQueries Node queries. + */ + public void addActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) { + if (nodeQueries == null) + return; + + if (activeQueries == null) + activeQueries = new HashMap<>(); + + activeQueries.put(nodeId, nodeQueries); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ExchangeContext.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 702b848..91e4505 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.Nullable; @@ -238,6 +239,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { return aff0.cachedAffinity(topVer); } + public MvccCoordinator mvccCoordinator(AffinityTopologyVersion topVer) { + return assignment(topVer).mvccCoordinator(); + } + /** * @param key Key to check. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fe9ed29..097d90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -805,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ - @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) { + @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion ver) { GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) { @@ -1719,9 +1719,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana IgniteTxManager tm = cctx.tm(); if (tm != null) { - U.warn(diagnosticLog, "Pending transactions:"); + boolean first = true; for (IgniteInternalTx tx : tm.activeTransactions()) { + if (first) { + U.warn(diagnosticLog, "Pending transactions:"); + + first = false; + } + if (exchTopVer != null) { U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) + @@ -1735,31 +1741,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheMvccManager mvcc = cctx.mvcc(); if (mvcc != null) { - U.warn(diagnosticLog, "Pending explicit locks:"); + boolean first = true; + + for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) { + if (first) { + U.warn(diagnosticLog, "Pending explicit locks:"); + + first = false; + } - for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) U.warn(diagnosticLog, ">>> " + lockSpan); + } - U.warn(diagnosticLog, "Pending cache futures:"); + first = true; + + for (GridCacheFuture<?> fut : mvcc.activeFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending cache futures:"); + + first = false; + } - for (GridCacheFuture<?> fut : mvcc.activeFutures()) dumpDiagnosticInfo(fut, diagCtx); + } + + first = true; + + for (GridCacheFuture<?> fut : mvcc.atomicFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending atomic cache futures:"); - U.warn(diagnosticLog, "Pending atomic cache futures:"); + first = false; + } - for (GridCacheFuture<?> fut : mvcc.atomicFutures()) dumpDiagnosticInfo(fut, diagCtx); + } + + first = true; - U.warn(diagnosticLog, "Pending data streamer futures:"); + for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending data streamer futures:"); + + first = false; + } - for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) dumpDiagnosticInfo(fut, diagCtx); + } if (tm != null) { - U.warn(diagnosticLog, "Pending transaction deadlock detection futures:"); + first = true; + + for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending transaction deadlock detection futures:"); + + first = false; + } - for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) dumpDiagnosticInfo(fut, diagCtx); + } } } @@ -1781,6 +1822,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affDumpCnt++; } } + + cctx.kernalContext().coordinators().dumpDebugInfo(diagnosticLog, diagCtx); } /** @@ -1949,6 +1992,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ClusterNode node = evt.eventNode(); + if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && + node.equals(cctx.coordinators().currentCoordinator())) { + if (log.isInfoEnabled()) + log.info("Stop merge, need exchange for mvcc coordinator failure: " + node); + + break; + } if (!curFut.context().supportsMergeExchanges(node)) { if (log.isInfoEnabled()) log.info("Stop merge, node does not support merge: " + node); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index dc24586..2af7fd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; @@ -2176,7 +2175,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException { - CacheCoordinatorsSharedManager coord = new CacheCoordinatorsSharedManager(); IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -2215,7 +2213,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridCacheSharedContext( kernalCtx, - coord, tm, verMgr, mvccMgr, http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index bf5b999..f4e4d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -36,17 +36,15 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; -import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; @@ -123,9 +121,6 @@ public class GridCacheSharedContext<K, V> { /** Ttl cleanup manager. */ private GridCacheSharedTtlCleanupManager ttlMgr; - /** Cache mvcc coordinator. */ - private CacheCoordinatorsSharedManager crd; - /** Cache contexts map. */ private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap; @@ -170,7 +165,6 @@ public class GridCacheSharedContext<K, V> { /** * @param kernalCtx Context. - * @param crd Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. @@ -184,7 +178,6 @@ public class GridCacheSharedContext<K, V> { */ public GridCacheSharedContext( GridKernalContext kernalCtx, - CacheCoordinatorsSharedManager crd, IgniteTxManager txMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, @@ -203,7 +196,6 @@ public class GridCacheSharedContext<K, V> { this.kernalCtx = kernalCtx; setManagers(mgrs, - crd, txMgr, jtaMgr, verMgr, @@ -376,7 +368,6 @@ public class GridCacheSharedContext<K, V> { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); setManagers(mgrs, - crd, txMgr, jtaMgr, verMgr, @@ -416,7 +407,6 @@ public class GridCacheSharedContext<K, V> { /** * @param mgrs Managers list. - * @param coord Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param jtaMgr JTA manager. * @param verMgr Version manager. @@ -428,7 +418,6 @@ public class GridCacheSharedContext<K, V> { * @param ttlMgr Ttl cleanup manager. */ private void setManagers(List<GridCacheSharedManager<K, V>> mgrs, - CacheCoordinatorsSharedManager coord, IgniteTxManager txMgr, CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, @@ -442,7 +431,6 @@ public class GridCacheSharedContext<K, V> { CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr) { - this.crd = add(mgrs, coord); this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -785,8 +773,8 @@ public class GridCacheSharedContext<K, V> { /** * @return Cache mvcc coordinator manager. */ - public CacheCoordinatorsSharedManager coordinators() { - return crd; + public CacheCoordinatorsProcessor coordinators() { + return kernalCtx.coordinators(); } /** @@ -844,7 +832,7 @@ public class GridCacheSharedContext<K, V> { /** * Captures all ongoing operations that we need to wait before we can proceed to the next topology version. * This method must be called only after - * {@link GridDhtPartitionTopology#updateTopologyVersion(GridDhtTopologyFuture, DiscoCache, long, boolean)} + * {@link GridDhtPartitionTopology#updateTopologyVersion} * method is called so that all new updates will wait to switch to the new version. * This method will capture: * <ul> http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index a31f91b..77039cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cctx.database().checkpointReadLock(); try { - assert !txState.mvccEnabled(cctx) || mvccVer != null; + assert !txState.mvccEnabled(cctx) || mvccInfo != null; Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries(); @@ -597,7 +597,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); else { assert val != null : txEntry; @@ -622,7 +622,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); // Keep near entry up to date. if (nearCached != null) { @@ -655,7 +655,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter resolveTaskName(), dhtVer, txEntry.updateCounter(), - mvccVer); + mvccInfo != null ? mvccInfo.version() : null); // Keep near entry up to date. if (nearCached != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index e994113..e328c25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; @@ -195,9 +196,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public void updateTopologyVersion( GridDhtTopologyFuture exchFut, DiscoCache discoCache, + MvccCoordinator mvccCrd, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index ee42a14..5dbb3a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1229,6 +1229,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (expVer.equals(curVer)) return false; + // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs. + Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 4ae68ef..cf6554a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.jetbrains.annotations.Nullable; @@ -69,6 +70,7 @@ public interface GridDhtPartitionTopology { public void updateTopologyVersion( GridDhtTopologyFuture exchFut, DiscoCache discoCache, + MvccCoordinator mvccCrd, long updateSeq, boolean stopping ) throws IgniteInterruptedCheckedException; @@ -379,4 +381,6 @@ public interface GridDhtPartitionTopology { * @param updateRebalanceVer {@code True} if need check rebalance state. */ public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer); + + public MvccCoordinator mvccCoordinator(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 380066a..1f3d00d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridPartitionStateMap; @@ -137,6 +138,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; + /** */ + private volatile MvccCoordinator mvccCrd; + /** * @param ctx Cache shared context. * @param grp Cache group. @@ -229,9 +233,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public MvccCoordinator mvccCoordinator() { + return mvccCrd; + } + + /** {@inheritDoc} */ @Override public void updateTopologyVersion( GridDhtTopologyFuture exchFut, DiscoCache discoCache, + MvccCoordinator mvccCrd, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { @@ -255,6 +265,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lastTopChangeVer = exchTopVer; this.discoCache = discoCache; + this.mvccCrd = mvccCrd; } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index dd00ad1..d624e2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -295,11 +296,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity GridLongList waitTxs = tx.mvccWaitTransactions(); if (waitTxs != null) { - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + TxMvccInfo mvccInfo = tx.mvccInfo(); - assert crd != null; + assert mvccInfo != null; - IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, waitTxs); + IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs); add(fut); @@ -411,7 +412,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (tx.onePhaseCommit()) return false; - assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccCoordinatorVersion() != null; + assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccInfo() != null; boolean sync = tx.syncMode() == FULL_SYNC; @@ -469,7 +470,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity updCntrs, false, false, - tx.mvccCoordinatorVersion()); + tx.mvccInfo()); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); @@ -539,7 +540,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity tx.activeCachesDeploymentEnabled(), false, false, - tx.mvccCoordinatorVersion()); + tx.mvccInfo()); req.writeVersion(tx.writeVersion()); @@ -582,6 +583,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity @SuppressWarnings("unchecked") @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) { if (!isDone()) { + // TODO IGNITE-3478 (mvcc wait txs fut) for (IgniteInternalFuture fut : futures()) { if (!fut.isDone()) { MiniFuture f = (MiniFuture)fut; @@ -608,6 +610,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity /** {@inheritDoc} */ @Override public String toString() { + // TODO IGNITE-3478 (mvcc wait txs fut) Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @SuppressWarnings("unchecked") @Override public String apply(IgniteInternalFuture<?> f) { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index df8c951..0df0b66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -25,7 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -68,7 +68,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { private GridCacheVersion writeVer; /** */ - private MvccCoordinatorVersion mvccVer; + private TxMvccInfo mvccInfo; /** * Empty constructor required for {@link Externalizable}. @@ -126,7 +126,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean addDepInfo, boolean retVal, boolean waitRemoteTxs, - MvccCoordinatorVersion mvccVer + TxMvccInfo mvccInfo ) { super( xidVer, @@ -155,7 +155,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { this.nearNodeId = nearNodeId; this.isolation = isolation; this.miniId = miniId; - this.mvccVer = mvccVer; + this.mvccInfo = mvccInfo; needReturnValue(retVal); waitRemoteTransactions(waitRemoteTxs); @@ -213,7 +213,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { Collection<Long> updateIdxs, boolean retVal, boolean waitRemoteTxs, - MvccCoordinatorVersion mvccVer + TxMvccInfo mvccInfo ) { this(nearNodeId, futId, @@ -239,7 +239,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { addDepInfo, retVal, waitRemoteTxs, - mvccVer); + mvccInfo); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); @@ -252,8 +252,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** * @return Counter. */ - public MvccCoordinatorVersion mvccCoordinatorVersion() { - return mvccVer; + public TxMvccInfo mvccInfo() { + return mvccInfo; } /** @@ -382,7 +382,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 23: - if (!writer.writeMessage("mvccVer", mvccVer)) + if (!writer.writeMessage("mvccInfo", mvccInfo)) return false; writer.incrementState(); @@ -448,7 +448,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 23: - mvccVer = reader.readMessage("mvccVer"); + mvccInfo = reader.readMessage("mvccInfo"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index cee5d9b..e4a7141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -37,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 99e1a7a..3143c4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -61,8 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -258,6 +260,11 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } /** {@inheritDoc} */ + @Nullable @Override public IgniteLogger logger() { + return log; + } + + /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futId; } @@ -872,7 +879,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.onePhaseCommit(), tx.activeCachesDeploymentEnabled()); - res.mvccCoordinatorVersion(tx.mvccCoordinatorVersion()); + res.mvccInfo(tx.mvccInfo()); if (prepErr == null) { if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor()) @@ -1229,19 +1236,23 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } - IgniteInternalFuture<Long> waitCrdCntrFut = null; + IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null; if (req.requestMvccCounter()) { + assert last; + assert tx.txState().mvccEnabled(cctx); - ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + MvccCoordinator crd = cctx.coordinators().currentCoordinator(); assert crd != null : tx.topologyVersion(); - if (crd.isLocal()) - tx.mvccCoordinatorVersion(cctx.coordinators().requestTxCounterOnCoordinator(tx)); + if (crd.nodeId().equals(cctx.localNodeId())) + onMvccResponse(cctx.localNodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx)); else { - IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion()); + IgniteInternalFuture<MvccCoordinatorVersion> crdCntrFut = cctx.coordinators().requestTxCounter(crd, + this, + tx.nearXidVersion()); if (tx.onePhaseCommit()) waitCrdCntrFut = crdCntrFut; @@ -1271,23 +1282,23 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (waitCrdCntrFut != null) { skipInit = true; - waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> fut) { + waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { + @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { try { fut.get(); sendPrepareRequests(); + + markInitialized(); } catch (Throwable e) { - U.error(log, "Failed to get coordinator counter: " + e, e); + U.error(log, "Failed to get mvcc version for tx [txId=" + tx.nearXidVersion() + + ", err=" + e + ']', e); GridNearTxPrepareResponse res = createPrepareResponse(e); onDone(res, res.error()); } - finally { - markInitialized(); - } } }); } @@ -1302,8 +1313,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } /** {@inheritDoc} */ - @Override public void onMvccResponse(MvccCoordinatorVersion res) { - tx.mvccCoordinatorVersion(res); + @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) { + tx.mvccInfo(new TxMvccInfo(crdId, res)); } /** {@inheritDoc} */ @@ -1325,7 +1336,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } - assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccCoordinatorVersion() != null; + assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccInfo() != null; int miniId = 0; @@ -1376,7 +1387,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.activeCachesDeploymentEnabled(), tx.storeWriteThrough(), retVal, - tx.mvccCoordinatorVersion()); + tx.mvccInfo()); int idx = 0; @@ -1490,7 +1501,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.activeCachesDeploymentEnabled(), tx.storeWriteThrough(), retVal, - tx.mvccCoordinatorVersion()); + tx.mvccInfo()); for (IgniteTxEntry entry : nearMapping.entries()) { if (CU.writes().apply(entry)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index da7f831..e099a32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -105,7 +105,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { private List<IgniteTxKey> nearWritesCacheMissed; /** */ - private MvccCoordinatorVersion mvccVer; + private TxMvccInfo mvccInfo; /** * Empty constructor required for {@link Externalizable}. @@ -146,7 +146,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean addDepInfo, boolean storeWriteThrough, boolean retVal, - MvccCoordinatorVersion mvccVer) { + TxMvccInfo mvccInfo) { super(tx, timeout, null, @@ -175,14 +175,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { nearNodeId = tx.nearNodeId(); - this.mvccVer = mvccVer; + this.mvccInfo = mvccInfo; } /** - * @return Counter. + * @return Mvcc info. */ - public MvccCoordinatorVersion mvccCoordinatorVersion() { - return mvccVer; + public TxMvccInfo mvccInfo() { + return mvccInfo; } /** @@ -421,7 +421,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); case 23: - if (!writer.writeMessage("mvccVer", mvccVer)) + if (!writer.writeMessage("mvccInfo", mvccInfo)) return false; writer.incrementState(); @@ -521,7 +521,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 23: - mvccVer = reader.readMessage("mvccVer"); + mvccInfo = reader.readMessage("mvccInfo"); if (!reader.isLastRead()) return false;
