Repository: ignite Updated Branches: refs/heads/ignite-3479 537a3ecb2 -> 4e7f19ede
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e7f19ed Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e7f19ed Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e7f19ed Branch: refs/heads/ignite-3479 Commit: 4e7f19ede40f35f1d9657f5dcb1b2ea8aeeb42d3 Parents: 537a3ec Author: sboikov <[email protected]> Authored: Wed Sep 27 16:31:54 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 27 17:50:37 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 5 +- .../ignite/internal/GridKernalContext.java | 6 + .../ignite/internal/GridKernalContextImpl.java | 14 +- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 2 - .../cache/GridCacheSharedContext.java | 16 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 +- .../dht/GridPartitionedGetFuture.java | 162 ++++------------ .../GridDhtPartitionsExchangeFuture.java | 14 +- .../GridNearPessimisticTxPrepareFuture.java | 17 +- .../mvcc/CacheCoordinatorsSharedManager.java | 97 ++++++---- .../processors/cache/mvcc/MvccCoordinator.java | 28 ++- .../processors/cache/mvcc/MvccQueryAware.java | 43 +++++ .../processors/cache/mvcc/MvccQueryFuture.java | 27 --- .../processors/cache/mvcc/MvccQueryTracker.java | 192 +++++++++++++++++++ .../wal/reader/IgniteWalIteratorFactory.java | 2 +- .../wal/reader/StandaloneGridKernalContext.java | 6 + .../query/GridCacheDistributedQueryManager.java | 2 +- .../cache/query/GridCacheQueryManager.java | 2 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 113 ++++++++++- .../pagemem/BPlusTreePageMemoryImplTest.java | 1 - .../BPlusTreeReuseListPageMemoryImplTest.java | 1 - .../MetadataStoragePageMemoryImplTest.java | 1 - .../pagemem/PageMemoryImplNoLoadTest.java | 1 - .../persistence/pagemem/PageMemoryImplTest.java | 1 - .../loadtests/hashmap/GridCacheTestContext.java | 2 - 27 files changed, 510 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 93ffe95..c3a8127 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -61,7 +61,10 @@ public interface GridComponent { BINARY_PROC, /** Query processor. */ - QUERY_PROC + QUERY_PROC, + + /** */ + CACHE_CRD_PROC } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 99c7cce..971be7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -643,4 +644,9 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Platform processor. */ public PlatformProcessor platform(); + + /** + * @return Cache mvcc coordinator processor. + */ + public CacheCoordinatorsSharedManager coordinators(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 07e5970..1715887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -282,6 +283,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private DataStructuresProcessor dataStructuresProc; + /** Cache mvcc coordinators. */ + @GridToStringExclude + private CacheCoordinatorsSharedManager coordProc; + /** */ @GridToStringExclude private List<GridComponent> comps = new LinkedList<>(); @@ -344,7 +349,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - Map<String, ? extends ExecutorService> customExecSvcs; + private Map<String, ? extends ExecutorService> customExecSvcs; /** */ @GridToStringExclude @@ -579,6 +584,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable poolProc = (PoolProcessor) comp; else if (comp instanceof GridMarshallerMappingProcessor) mappingProc = (GridMarshallerMappingProcessor)comp; + else if (comp instanceof CacheCoordinatorsSharedManager) + coordProc = (CacheCoordinatorsSharedManager)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -834,6 +841,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public CacheCoordinatorsSharedManager coordinators() { + return coordProc; + } + + /** {@inheritDoc} */ @Override public IgniteLogger log(String ctgr) { return config().getGridLogger().getLogger(ctgr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index fabbeed..7b833bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -114,6 +114,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -937,8 +938,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. try { + startProcessor(new CacheCoordinatorsSharedManager(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); - startProcessor(new GridAffinityProcessor(ctx)); + startProcessor(new GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); startProcessor(new GridClusterStateProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index b576789..f850ad3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -805,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ - @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) { + @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion ver) { GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index dc24586..e52c56c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2176,7 +2176,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException { - CacheCoordinatorsSharedManager coord = new CacheCoordinatorsSharedManager(); IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -2215,7 +2214,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridCacheSharedContext( kernalCtx, - coord, tm, verMgr, mvccMgr, http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index bf5b999..1cdee39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -36,14 +36,12 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; -import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; @@ -123,9 +121,6 @@ public class GridCacheSharedContext<K, V> { /** Ttl cleanup manager. */ private GridCacheSharedTtlCleanupManager ttlMgr; - /** Cache mvcc coordinator. */ - private CacheCoordinatorsSharedManager crd; - /** Cache contexts map. */ private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap; @@ -170,7 +165,6 @@ public class GridCacheSharedContext<K, V> { /** * @param kernalCtx Context. - * @param crd Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. @@ -184,7 +178,6 @@ public class GridCacheSharedContext<K, V> { */ public GridCacheSharedContext( GridKernalContext kernalCtx, - CacheCoordinatorsSharedManager crd, IgniteTxManager txMgr, GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, @@ -203,7 +196,6 @@ public class GridCacheSharedContext<K, V> { this.kernalCtx = kernalCtx; setManagers(mgrs, - crd, txMgr, jtaMgr, verMgr, @@ -376,7 +368,6 @@ public class GridCacheSharedContext<K, V> { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); setManagers(mgrs, - crd, txMgr, jtaMgr, verMgr, @@ -416,7 +407,6 @@ public class GridCacheSharedContext<K, V> { /** * @param mgrs Managers list. - * @param coord Cache mvcc coordinator manager. * @param txMgr Transaction manager. * @param jtaMgr JTA manager. * @param verMgr Version manager. @@ -428,7 +418,6 @@ public class GridCacheSharedContext<K, V> { * @param ttlMgr Ttl cleanup manager. */ private void setManagers(List<GridCacheSharedManager<K, V>> mgrs, - CacheCoordinatorsSharedManager coord, IgniteTxManager txMgr, CacheJtaManagerAdapter jtaMgr, GridCacheVersionManager verMgr, @@ -442,7 +431,6 @@ public class GridCacheSharedContext<K, V> { CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr, GridCacheSharedTtlCleanupManager ttlMgr) { - this.crd = add(mgrs, coord); this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); this.txMgr = add(mgrs, txMgr); @@ -786,7 +774,7 @@ public class GridCacheSharedContext<K, V> { * @return Cache mvcc coordinator manager. */ public CacheCoordinatorsSharedManager coordinators() { - return crd; + return kernalCtx.coordinators(); } /** @@ -844,7 +832,7 @@ public class GridCacheSharedContext<K, V> { /** * Captures all ongoing operations that we need to wait before we can proceed to the next topology version. * This method must be called only after - * {@link GridDhtPartitionTopology#updateTopologyVersion(GridDhtTopologyFuture, DiscoCache, long, boolean)} + * {@link GridDhtPartitionTopology#updateTopologyVersion} * method is called so that all new updates will wait to switch to the new version. * This method will capture: * <ul> http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 2af2d51..4eca4e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1236,7 +1236,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } - IgniteInternalFuture<Long> waitCrdCntrFut = null; + IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null; if (req.requestMvccCounter()) { assert tx.txState().mvccEnabled(cctx); @@ -1245,10 +1245,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite assert crd != null : tx.topologyVersion(); - if (crd.node().isLocal()) + if (crd.nodeId().equals(cctx.localNodeId())) onMvccResponse(cctx.localNodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx)); else { - IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd.node(), + IgniteInternalFuture<MvccCoordinatorVersion> crdCntrFut = cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion()); @@ -1280,8 +1280,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (waitCrdCntrFut != null) { skipInit = true; - waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> fut) { + waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { + @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { try { fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 37e9feb6..4bfd0fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -41,12 +41,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -60,14 +61,13 @@ import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** * Colocated get future. */ -public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryFuture { +public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryAware { /** */ private static final long serialVersionUID = 0L; @@ -78,10 +78,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda private static IgniteLogger log; /** */ - private MvccCoordinator mvccCrd; - - /** */ - private MvccCoordinatorVersion mvccVer; + private MvccQueryTracker mvccTracker; /** * @param cctx Context. @@ -130,6 +127,18 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } /** + * @return Mvcc version if mvcc is enabled for cache. + */ + @Nullable private MvccCoordinatorVersion mvccVersion() { + if (!cctx.mvccEnabled()) + return null; + + assert mvccTracker != null; + + return mvccTracker.mvccVersion(); + } + + /** * Initializes future. * * @param topVer Topology version. @@ -148,11 +157,13 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } if (cctx.mvccEnabled()) { + mvccTracker = new MvccQueryTracker(cctx, canRemap, this); + trackable = true; cctx.mvcc().addFuture(this, futId); - requestMvccVersionAndMap(topVer); + mvccTracker.requestVersion(topVer); return; } @@ -160,106 +171,31 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda initialMap(topVer); } - /** - * @param topVer Topology version. - */ - private void initialMap(AffinityTopologyVersion topVer) { - map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); + /** {@inheritDoc} */ + @Override public void onMvccVersionReceived(AffinityTopologyVersion topVer) { + initialMap(topVer); + } - markInitialized(); + /** {@inheritDoc} */ + @Override public void onMvccVersionError(IgniteCheckedException e) { + onDone(e); } /** {@inheritDoc} */ @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { - if (!cctx.mvccEnabled()) - return null; + if (mvccTracker != null) + return mvccTracker.onMvccCoordinatorChange(newCrd); - synchronized (this) { - if (mvccVer != null) { - mvccCrd = newCrd; - - return mvccVer; - } - else if (mvccCrd != null) - mvccCrd = null; - - return null; - } + return null; } /** * @param topVer Topology version. */ - private void requestMvccVersionAndMap(final AffinityTopologyVersion topVer) { - MvccCoordinator mvccCrd = cctx.affinity().mvccCoordinator(topVer); - - if (mvccCrd == null) { - onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); - - return; - } - - synchronized (this) { - this.mvccCrd = mvccCrd; - } - - MvccCoordinator curCrd = cctx.topology().mvccCoordinator(); - - if (!mvccCrd.equals(curCrd)) { - assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0; - - // TODO IGNITE-3479. - } - - IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node()); - - cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { - @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { - boolean needRemap = false; - - try { - MvccCoordinatorVersion rcvdVer = fut.get(); - - synchronized (GridPartitionedGetFuture.class) { - if (GridPartitionedGetFuture.this.mvccCrd != null) { - mvccVer = rcvdVer; - } - else - needRemap = true; - } - - if (!needRemap) - initialMap(topVer); - } - catch (ClusterTopologyCheckedException e) { - needRemap = true; - } - catch (IgniteCheckedException e) { - GridPartitionedGetFuture.this.onDone(e); - } + private void initialMap(AffinityTopologyVersion topVer) { + map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); - if (needRemap) { - if (canRemap) { - IgniteInternalFuture<AffinityTopologyVersion> waitFut = waitRemapFuture(topVer); - - waitFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - try { - requestMvccVersionAndMap(fut.get()); - } - catch (IgniteCheckedException e) { - GridPartitionedGetFuture.this.onDone(e); - } - } - }); - } - else { - GridPartitionedGetFuture.this.onDone(new ClusterTopologyCheckedException("Failed to " + - "request mvcc version, coordinator failed")); - } - } - } - }); + markInitialized(); } /** {@inheritDoc} */ @@ -319,24 +255,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (trackable) cctx.mvcc().removeFuture(futId); - if (cctx.mvccEnabled()) { - MvccCoordinator mvccCrd0 = null; - MvccCoordinatorVersion mvccVer0 = null; - - synchronized (this) { - if (mvccVer != null) { - assert mvccCrd != null; - - mvccCrd0 = mvccCrd; - mvccVer0 = mvccVer; - - mvccVer = null; - } - } - - if (mvccVer0 != null) - cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); - } + if (mvccTracker != null) + mvccTracker.onQueryDone(); cache().sendTtlUpdateRequest(expiryPlc); @@ -431,7 +351,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda expiryPlc, skipVals, recovery, - mvccVer); + mvccVersion()); final Collection<Integer> invalidParts = fut.invalidPartitions(); @@ -488,7 +408,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda skipVals, cctx.deploymentEnabled(), recovery, - mvccVer); + mvccVersion()); add(fut); // Append new future. @@ -595,7 +515,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda if (readNoEntry) { CacheDataRow row = cctx.mvccEnabled() ? - cctx.offheap().mvccRead(cctx, key, mvccVer) : + cctx.offheap().mvccRead(cctx, key, mvccVersion()) : cctx.offheap().read(cctx, key); if (row != null) { @@ -639,7 +559,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName, expiryPlc, !deserializeBinary, - mvccVer, + mvccVersion(), null); if (getRes != null) { @@ -659,7 +579,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda taskName, expiryPlc, !deserializeBinary, - mvccVer); + mvccVersion()); } cache.context().evicts().touch(entry, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 51da7a0..d93b359 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -26,7 +26,6 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -80,7 +79,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -571,11 +570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (exchId.isLeft()){ MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator(); - if (mvccCrd != null && mvccCrd.node().equals(exchId.eventNode())) { - assert !CU.clientNode(mvccCrd.node()) : mvccCrd; - + if (mvccCrd != null && mvccCrd.nodeId().equals(exchId.eventNode().id())) newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null; - } } exchCtx = new ExchangeContext(crdNode, newMvccCrd, this); @@ -822,8 +818,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<MvccCounter, Integer> activeQrys = null; for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) { - if (fut instanceof MvccQueryFuture) { - MvccCoordinatorVersion ver = ((MvccQueryFuture)fut).onMvccCoordinatorChange(mvccCrd); + if (fut instanceof MvccQueryAware) { + MvccCoordinatorVersion ver = ((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd); if (ver != null ) { MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter()); @@ -1483,7 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (err == null) { - if (exchCtx.newMvccCoordinator() && cctx.localNode().equals(cctx.coordinators().currentCoordinatorNode())) + if (exchCtx.newMvccCoordinator() && cctx.localNodeId().equals(cctx.coordinators().currentCoordinatorId())) cctx.coordinators().initCoordinator(res, exchCtx.events().discoveryCache(), exchCtx.activeQueries()); if (centralizedAff) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 80fd326..c6192d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -271,7 +271,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA AffinityTopologyVersion topVer = tx.topologyVersion(); - ClusterNode mvccCrd = null; + MvccCoordinator mvccCrd = null; GridDhtTxMapping txMapping = new GridDhtTxMapping(); @@ -296,15 +296,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer); if (mvccCrd == null && cacheCtx.mvccEnabled()) { - MvccCoordinator mvccCrd0 = cacheCtx.affinity().mvccCoordinator(topVer); + mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer); - if (mvccCrd0 == null) { + if (mvccCrd == null) { onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); return; } - else - mvccCrd = mvccCrd0.node(); } if (F.isEmpty(nodes)) { @@ -433,13 +431,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (mvccCrd != null) { assert !tx.onePhaseCommit(); - if (mvccCrd.isLocal()) { + if (mvccCrd.equals(cctx.localNodeId())) { MvccCoordinatorVersion mvccVer = cctx.coordinators().requestTxCounterOnCoordinator(tx); onMvccResponse(cctx.localNodeId(), mvccVer); } else { - IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion()); + IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = + cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion()); add((IgniteInternalFuture)cntrFut); } @@ -495,8 +494,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA CacheCoordinatorsSharedManager.MvccVersionFuture crdFut = (CacheCoordinatorsSharedManager.MvccVersionFuture)f; - return "[mvccCrdNode=" + crdFut.crd.id() + - ", loc=" + crdFut.crd.isLocal() + + return "[mvccCrdNode=" + crdFut.crdId + + ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) + ", done=" + f.isDone() + "]"; } else http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index f144437..73febc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -30,6 +30,8 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -37,6 +39,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -45,6 +48,7 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -59,7 +63,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS /** * */ -public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { +public class CacheCoordinatorsSharedManager extends GridProcessorAdapter { /** */ public static final long COUNTER_NA = 0L; @@ -128,10 +132,17 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager return Long.compare(ver1.counter(), ver2.counter()); } + public CacheCoordinatorsSharedManager(GridKernalContext ctx) { + super(ctx); + } + /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - super.start0(); + @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.CACHE_CRD_PROC; + } + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { statCntrs = new StatCounter[7]; statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs"); @@ -142,12 +153,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest"); statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime"); - cctx.gridEvents().addLocalEventListener(new CacheCoordinatorNodeFailListener(), + ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); - cctx.gridIO().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); + ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener()); } - + /** * @param log Logger. */ @@ -165,7 +176,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @return Counter. */ public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) { - assert cctx.localNode().equals(currentCoordinatorNode()); + assert ctx.localNodeId().equals(currentCoordinatorId()); return assignTxCounter(tx.nearXidVersion(), 0L); } @@ -176,19 +187,19 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param txVer Transaction version. * @return Counter request future. */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, + public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd, MvccResponseListener lsnr, GridCacheVersion txVer) { - assert !crd.isLocal() : crd; + assert !ctx.localNodeId().equals(crd.nodeId()); MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), - crd, + crd.nodeId(), lsnr); verFuts.put(fut.id, fut); try { - cctx.gridIO().sendToGridTopic(crd, + ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, new CoordinatorTxCounterRequest(fut.id, txVer), MSG_POLICY); @@ -224,7 +235,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr); try { - cctx.gridIO().sendToGridTopic(crd.node(), + ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, msg, MSG_POLICY); @@ -242,16 +253,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param crd Coordinator. * @return Counter request future. */ - public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(ClusterNode crd) { + public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) { assert crd != null; // TODO IGNITE-3478: special case for local? - MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null); + MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null); verFuts.put(fut.id, fut); try { - cctx.gridIO().sendToGridTopic(crd, + ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, new CoordinatorQueryVersionRequest(fut.id), MSG_POLICY); @@ -280,7 +291,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager ackFuts.put(fut.id, fut); try { - cctx.gridIO().sendToGridTopic(crdId, + ctx.io().sendToGridTopic(crdId, MSG_TOPIC, new CoordinatorWaitTxsRequest(fut.id, txs), MSG_POLICY); @@ -311,7 +322,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager ackFuts.put(fut.id, fut); try { - cctx.gridIO().sendToGridTopic(crd, + ctx.io().sendToGridTopic(crd, MSG_TOPIC, new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), MSG_POLICY); @@ -338,7 +349,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager msg.skipResponse(true); try { - cctx.gridIO().sendToGridTopic(crdId, + ctx.io().sendToGridTopic(crdId, MSG_TOPIC, msg, MSG_POLICY); @@ -357,7 +368,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param msg Message. */ private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { if (log.isDebugEnabled()) @@ -372,7 +383,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager statCntrs[0].update(res.size()); try { - cctx.gridIO().sendToGridTopic(node, + ctx.io().sendToGridTopic(node, MSG_TOPIC, res, MSG_POLICY); @@ -392,7 +403,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param msg Message. */ private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { if (log.isDebugEnabled()) @@ -404,7 +415,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId()); try { - cctx.gridIO().sendToGridTopic(node, + ctx.io().sendToGridTopic(node, MSG_TOPIC, res, MSG_POLICY); @@ -436,7 +447,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager fut.onResponse(msg); } else { - if (cctx.discovery().alive(nodeId)) + if (ctx.discovery().alive(nodeId)) U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); else if (log.isDebugEnabled()) log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']'); @@ -470,7 +481,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager if (!msg.skipResponse()) { try { - cctx.gridIO().sendToGridTopic(nodeId, + ctx.io().sendToGridTopic(nodeId, MSG_TOPIC, new CoordinatorFutureResponse(msg.futureId()), MSG_POLICY); @@ -502,7 +513,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager fut.onResponse(); } else { - if (cctx.discovery().alive(nodeId)) + if (ctx.discovery().alive(nodeId)) U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); else if (log.isDebugEnabled()) log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']'); @@ -715,7 +726,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager */ private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) { try { - cctx.gridIO().sendToGridTopic(nodeId, + ctx.io().sendToGridTopic(nodeId, MSG_TOPIC, new CoordinatorFutureResponse(msg.futureId()), MSG_POLICY); @@ -733,10 +744,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager return curCrd; } - public ClusterNode currentCoordinatorNode() { + public UUID currentCoordinatorId() { MvccCoordinator curCrd = this.curCrd; - return curCrd != null ? curCrd.node() : null; + return curCrd != null ? curCrd.nodeId() : null; } /** @@ -758,10 +769,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @return New coordinator. */ public MvccCoordinator reassignCoordinator(DiscoCache discoCache) { - assert curCrd == null || !discoCache.allNodes().contains(curCrd.node()) : curCrd; + assert curCrd == null || !F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd; if (!discoCache.serverNodes().isEmpty()) { - curCrd = new MvccCoordinator(discoCache.serverNodes().get(0), + ClusterNode node = discoCache.serverNodes().get(0); + + curCrd = new MvccCoordinator(node.id(), discoCache.version().topologyVersion(), discoCache.version()); @@ -777,8 +790,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** - * @param nodeId Node ID. - * @param activeQueries Active queries. + * @param nodeId Node ID + * @param activeQueries */ public void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) { @@ -793,14 +806,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager DiscoCache discoCache, Map<UUID, Map<MvccCounter, Integer>> activeQueries) { - assert cctx.localNode().equals(curCrd.node()); + assert ctx.localNodeId().equals(curCrd.nodeId()); - log.info("Initialize local node as mvcc coordinator [node=" + cctx.localNodeId() + + log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + ", topVer=" + topVer + ']'); crdVer = topVer.topologyVersion(); - prevCrdQueries.init(activeQueries, discoCache, cctx.discovery()); + prevCrdQueries.init(activeQueries, discoCache, ctx.discovery()); crdLatch.countDown(); } @@ -816,18 +829,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private MvccResponseListener lsnr; /** */ - public final ClusterNode crd; + public final UUID crdId; /** */ long startTime; /** * @param id Future ID. - * @param crd Coordinator. + * @param crdId Coordinator node ID. */ - MvccVersionFuture(Long id, ClusterNode crd, @Nullable MvccResponseListener lsnr) { + MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) { this.id = id; - this.crd = crd; + this.crdId = crdId; this.lsnr = lsnr; if (STAT_CNTRS) @@ -841,7 +854,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager assert res.counter() != COUNTER_NA; if (lsnr != null) - lsnr.onMvccResponse(crd.id(), res); + lsnr.onMvccResponse(crdId, res); onDone(res); } @@ -859,7 +872,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param nodeId Failed node ID. */ void onNodeLeft(UUID nodeId) { - if (crd.id().equals(nodeId)) { + if (crdId.equals(nodeId)) { ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + "coordinator failed: " + nodeId); @@ -869,7 +882,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** {@inheritDoc} */ @Override public String toString() { - return "MvccVersionFuture [crd=" + crd + ", id=" + id + ']'; + return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java index 24ff354..0b449d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java @@ -17,15 +17,19 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import org.apache.ignite.cluster.ClusterNode; +import java.io.Serializable; +import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** * */ -public class MvccCoordinator { +public class MvccCoordinator implements Serializable { /** */ - private final ClusterNode crd; + private static final long serialVersionUID = 0L; + + /** */ + private final UUID nodeId; /** * Unique coordinator version, increases when new coordinator is assigned, @@ -37,12 +41,16 @@ public class MvccCoordinator { private final AffinityTopologyVersion topVer; /** - * @param crd Coordinator nde. + * @param nodeId Coordinator node ID. * @param crdVer Coordinator version. * @param topVer Topology version when coordinator was assigned. */ - public MvccCoordinator(ClusterNode crd, long crdVer, AffinityTopologyVersion topVer) { - this.crd = crd; + public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) { + assert nodeId != null; + assert crdVer > 0 : crdVer; + assert topVer != null; + + this.nodeId = nodeId; this.crdVer = crdVer; this.topVer = topVer; } @@ -55,10 +63,10 @@ public class MvccCoordinator { } /** - * @return Coordinator node. + * @return Coordinator node ID. */ - public ClusterNode node() { - return crd; + public UUID nodeId() { + return nodeId; } /** @@ -88,6 +96,6 @@ public class MvccCoordinator { /** {@inheritDoc} */ @Override public String toString() { - return "MvccCoordinator [node=" + crd.id() + ", ver=" + crdVer + ", topVer=" + topVer + ']'; + return "MvccCoordinator [node=" + nodeId + ", ver=" + crdVer + ", topVer=" + topVer + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java new file mode 100644 index 0000000..d5172c6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface MvccQueryAware { + /** + * @param newCrd New coordinator. + * @return Version used by this query. + */ + @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd); + + /** + * @param topVer Topology version when version was requested. + */ + public void onMvccVersionReceived(AffinityTopologyVersion topVer); + + /** + * @param e Error. + */ + public void onMvccVersionError(IgniteCheckedException e); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java deleted file mode 100644 index 4d66437..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public interface MvccQueryFuture { - @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java new file mode 100644 index 0000000..095f630 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccQueryTracker { + /** */ + private MvccCoordinator mvccCrd; + + /** */ + private MvccCoordinatorVersion mvccVer; + + /** */ + private final GridCacheContext cctx; + + /** */ + private final boolean canRemap; + + /** */ + private final MvccQueryAware lsnr; + + /** + * @param cctx + * @param canRemap + * @param lsnr + */ + public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) { + assert cctx.mvccEnabled() : cctx.name(); + + this.cctx = cctx; + this.canRemap = canRemap; + this.lsnr = lsnr; + } + + @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { + synchronized (this) { + if (mvccVer != null) { + mvccCrd = newCrd; + + return mvccVer; + } + else if (mvccCrd != null) + mvccCrd = null; + + return null; + } + } + + public MvccCoordinatorVersion mvccVersion() { + return mvccVer; + } + + public void onQueryDone() { + MvccCoordinator mvccCrd0 = null; + MvccCoordinatorVersion mvccVer0 = null; + + synchronized (this) { + if (mvccVer != null) { + assert mvccCrd != null; + + mvccCrd0 = mvccCrd; + mvccVer0 = mvccVer; + + mvccVer = null; + } + } + + if (mvccVer0 != null) + cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); + } + + public void requestVersion(final AffinityTopologyVersion topVer) { + MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer); + + if (mvccCrd0 == null) { + lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer)); + + return; + } + + synchronized (this) { + this.mvccCrd = mvccCrd0; + } + + MvccCoordinator curCrd = cctx.topology().mvccCoordinator(); + + if (!mvccCrd0.equals(curCrd)) { + assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0; + + if (!canRemap) { + lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed.")); + + return; + } + else + waitNextTopology(topVer); + } + + IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = + cctx.shared().coordinators().requestQueryCounter(mvccCrd); + + cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() { + @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) { + try { + MvccCoordinatorVersion rcvdVer = fut.get(); + + boolean needRemap = false; + + synchronized (MvccQueryTracker.this) { + if (mvccCrd != null) + mvccVer = rcvdVer; + else + needRemap = true; + } + + if (!needRemap) { + lsnr.onMvccVersionReceived(topVer); + + return; + } + } + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = cctx.logger(MvccQueryTracker.class); + + if (log.isDebugEnabled()) + log.debug("Mvcc coordinator failed: " + e); + } + catch (IgniteCheckedException e) { + lsnr.onMvccVersionError(e); + + return; + } + + // Coordinator failed on reassigned, need remap. + if (canRemap) + waitNextTopology(topVer); + else { + lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " + + "request mvcc version, coordinator failed.")); + } + } + }); + } + + private void waitNextTopology(AffinityTopologyVersion topVer) { + assert canRemap; + + IgniteInternalFuture<AffinityTopologyVersion> waitFut = + cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion()); + + if (waitFut == null) + requestVersion(cctx.shared().exchange().readyAffinityVersion()); + else { + waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + try { + requestVersion(fut.get()); + } + catch (IgniteCheckedException e) { + lsnr.onMvccVersionError(e); + } + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index ed9848c..3a34e28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -198,7 +198,7 @@ public class IgniteWalIteratorFactory { dbMgr.setPageSize(pageSize); return new GridCacheSharedContext<>( - kernalCtx, null, null, null, null, + kernalCtx, null, null, null, null, null, dbMgr, null, null, null, null, null, null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 07be8b4..0b507a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; @@ -440,6 +441,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public CacheCoordinatorsSharedManager coordinators() { + return null; + } + + /** {@inheritDoc} */ @Override public void markSegmented() { } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 83e846f..3a269db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -542,7 +542,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage if (cctx.mvccEnabled()) { mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); - IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node()); + IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); mvccVer = fut0.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index dda1e69..3ddee2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1468,7 +1468,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (cctx.mvccEnabled()) { mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()); - IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node()); + IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd); qry.mvccVersion(fut0.get()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index 5c11a4b..2f1a0d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1860,6 +1860,97 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testGetVersionRequestFailover() throws Exception { + final int NODES = 5; + + testSpi = true; + + startGridsMultiThreaded(NODES - 1); + + client = true; + + Ignite client = startGrid(NODES - 1); + + final List<String> cacheNames = new ArrayList<>(); + + final Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < 100; i++) + vals.put(i, i); + + for (CacheConfiguration ccfg : cacheConfigurations()) { + ccfg.setName("cache-" + cacheNames.size()); + + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + cacheNames.add(ccfg.getName()); + + IgniteCache cache = client.createCache(ccfg); + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(vals); + + tx.commit(); + } + } + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(nodeIdx.getAndIncrement()); + + int cnt = 0; + + while (!done.get()) { + for (String cacheName : cacheNames) { + IgniteCache cache = node.cache(cacheName); + + Map<Integer, Integer> res = cache.getAll(vals.keySet()); + + assertEquals(vals, res); + } + + cnt++; + } + + log.info("Finished [node=" + node.name() + ", cnt=" + cnt + ']'); + + return null; + } + }, NODES - 1, "get-thread"); + + doSleep(1000); + + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0)); + + crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg instanceof MvccCoordinatorVersionResponse; + } + }); + + crdSpi.waitForBlocked(); + + stopGrid(0); + + doSleep(1000); + + done.set(true); + + getFut.get(); + } + finally { + done.set(true); + } + } + + /** * @param N Number of object to update in single transaction. * @param srvs Number of server nodes. * @param clients Number of client nodes. @@ -2218,29 +2309,35 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { assertTrue("Active queries not empty", GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries"); + Map queries = GridTestUtils.getFieldValue(crd, "activeQueries"); + + if (!queries.isEmpty()) + log.info("Active queries: " + queries); - return activeQrys.isEmpty(); + return queries.isEmpty(); } - }, 5_000) + }, 8_000) ); assertTrue("Previous coordinator queries not empty", GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { - Map prevCrdQueries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries"); + Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries"); + + if (!queries.isEmpty()) + log.info("Previous coordinator queries: " + queries); - return prevCrdQueries.isEmpty(); + return queries.isEmpty(); } - }, 5_000) + }, 8_000) ); - if (crd.currentCoordinatorNode().equals(node.cluster().localNode())) { + if (node.cluster().localNode().id().equals(crd.currentCoordinatorId())) { assertTrue("prevQueriesDone flag is not set", GridTestUtils.waitForCondition( new GridAbsPredicate() { @Override public boolean apply() { return GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone"); } - }, 5_000) + }, 8_000) ); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java index 64070d1..56d09f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java @@ -51,7 +51,6 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java index 5bbf575..39183b2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java @@ -52,7 +52,6 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java index d16e525..a427c63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java @@ -67,7 +67,6 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{ null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java index bd849b1..467ede4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java @@ -56,7 +56,6 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 37422fb..c5997fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -79,7 +79,6 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { null, null, null, - null, new NoOpPageStoreManager(), new NoOpWALManager(), new IgniteCacheDatabaseSharedManager(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index ee43309..6a1d4f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; @@ -65,7 +64,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { ctx, new GridCacheSharedContext<>( ctx, - new CacheCoordinatorsSharedManager(), new IgniteTxManager(), new GridCacheVersionManager(), new GridCacheMvccManager(),
