pending
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a8b66f3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a8b66f3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a8b66f3 Branch: refs/heads/ignite-10537 Commit: 3a8b66f31a0e7efa66c522fab25d55097c61a87b Parents: 7301ada Author: Igor Seliverstov <[email protected]> Authored: Sun Dec 9 22:30:45 2018 +0300 Committer: Igor Seliverstov <[email protected]> Committed: Sun Dec 9 22:30:45 2018 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 4 +- .../processors/cache/GridCacheAdapter.java | 6 + .../GridDhtPartitionsExchangeFuture.java | 2 +- .../processors/cache/mvcc/MvccCoordinator.java | 54 ++-- .../cache/mvcc/MvccDiscoveryData.java | 52 ---- .../processors/cache/mvcc/MvccProcessor.java | 24 +- .../cache/mvcc/MvccProcessorImpl.java | 282 +++++++++---------- .../cache/mvcc/MvccQueryTrackerImpl.java | 6 +- .../processors/cache/mvcc/MvccUtils.java | 2 +- .../ignite/internal/util/GridLongList.java | 1 + 10 files changed, 191 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 5abe63c..04a4c5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -785,7 +785,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoWrk.discoCache = discoCache; if (!isLocDaemon && !ctx.clientDisconnected()) { - ctx.cache().context().coordinators().onLocalJoin(discoEvt); + ctx.cache().context().coordinators().onLocalJoin(discoEvt, discoCache); ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache); @@ -845,6 +845,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); + ctx.cache().context().coordinators().onLocalJoin(localJoinEvent(), discoCache); + ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache); ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 1cd94d2..4ac9774 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4316,6 +4316,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V AffinityTopologyVersion awaitVer = new AffinityTopologyVersion( topVer.topologyVersion() + 1, 0); + U.warn(log, X.getFullStackTrace(e)); + + U.warn(log,"txTopVer=" + topVer.topologyVersion() + ", waitVer=" + awaitVer); + + U.warn(log, "curCrd=" + ctx.shared().coordinators().currentCoordinator()); + ctx.shared().exchange().affinityReadyFuture(awaitVer).get(); continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ffc55a9..7849f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2070,7 +2070,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte tryToPerformLocalSnapshotOperation(); if (err == null) - cctx.coordinators().onExchangeDone(exchCtx.events().discoveryCache()); + cctx.coordinators().onExchangeDone(initialVersion(), res, exchCtx.events().discoveryCache()); // Create and destory caches and cache proxies. cctx.cache().onExchangeDone(initialVersion(), exchActions, err); http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java index 045177a..a9dd18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java @@ -31,38 +31,46 @@ public class MvccCoordinator implements Serializable { private static final long serialVersionUID = 0L; /** */ + @GridToStringInclude + private final AffinityTopologyVersion topVer; + + /** */ private final UUID nodeId; /** * Unique coordinator version, increases when new coordinator is assigned, * can differ from topVer if we decide to assign coordinator manually. */ - private final long crdVer; + private final long ver; /** */ - @GridToStringInclude - private final AffinityTopologyVersion topVer; + private final boolean local; - /** - * @param nodeId Coordinator node ID. - * @param crdVer Coordinator version. + /** * @param topVer Topology version when coordinator was assigned. */ - public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) { - assert nodeId != null; - assert crdVer > 0 : crdVer; - assert topVer != null; + public MvccCoordinator(AffinityTopologyVersion topVer) { + this(topVer, null, 0, false); + } + /** + * @param topVer Topology version when coordinator was assigned. + * @param nodeId Coordinator node ID. + * @param ver Coordinator version. + */ + public MvccCoordinator(AffinityTopologyVersion topVer, UUID nodeId, long ver, + boolean local) { this.nodeId = nodeId; - this.crdVer = crdVer; + this.ver = ver; this.topVer = topVer; + this.local = local; } /** - * @return Unique coordinator version. + * @return Topology version when coordinator was assigned. */ - public long coordinatorVersion() { - return crdVer; + public AffinityTopologyVersion topologyVersion() { + return topVer; } /** @@ -73,10 +81,18 @@ public class MvccCoordinator implements Serializable { } /** - * @return Topology version when coordinator was assigned. + * @return Unique coordinator version. */ - public AffinityTopologyVersion topologyVersion() { - return topVer; + public long version() { + return ver; + } + + /** + * + * @return {@code True} if the coordinator is local. + */ + public boolean local() { + return local; } /** {@inheritDoc} */ @@ -89,12 +105,12 @@ public class MvccCoordinator implements Serializable { MvccCoordinator that = (MvccCoordinator)o; - return crdVer == that.crdVer; + return ver == that.ver; } /** {@inheritDoc} */ @Override public int hashCode() { - return (int)(crdVer ^ (crdVer >>> 32)); + return (int)(ver ^ (ver >>> 32)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java deleted file mode 100644 index d2e936f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccDiscoveryData.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.mvcc; - -import org.apache.ignite.internal.util.typedef.internal.S; - -import java.io.Serializable; - -/** - * MVCC discovery data to be shared between nodes on join. - */ -public class MvccDiscoveryData implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Current coordinator. */ - private MvccCoordinator crd; - - /** - * @param crd Coordinator. - */ - public MvccDiscoveryData(MvccCoordinator crd) { - this.crd = crd; - } - - /** - * @return Current coordinator. - */ - public MvccCoordinator coordinator() { - return crd; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(MvccDiscoveryData.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java index 161342f..5a648b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.CacheConfiguration; @@ -27,9 +26,10 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.GridProcessor; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.util.GridLongList; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -41,30 +41,22 @@ public interface MvccProcessor extends GridProcessor { * * @param evt Discovery event. */ - void onLocalJoin(DiscoveryEvent evt); + void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache); /** * Exchange done callback. * + * @param initialVersion Initial exchange version. + * @param resultVersion Result exchange version, defers from initial if several exchange were merged. * @param discoCache Disco cache. */ - void onExchangeDone(DiscoCache discoCache); - - /** - * @param nodeId Node ID - * @param activeQueries Active queries. - */ - void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries); + void onExchangeDone(AffinityTopologyVersion initialVersion, + AffinityTopologyVersion resultVersion, DiscoCache discoCache); /** * @return Coordinator. */ - @Nullable MvccCoordinator currentCoordinator(); - - /** - * @return Current coordinator node ID. - */ - UUID currentCoordinatorId(); + @NotNull MvccCoordinator currentCoordinator(); /** * @param crdVer Mvcc coordinator version. http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index d026607..b29c782 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -40,7 +41,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; @@ -51,7 +51,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.CustomEventListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -106,6 +105,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -156,11 +156,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce MvccProcessorImpl.crdC = crdC; } - /** Topology version when local node was assigned as coordinator. */ - private volatile long crdVer; - /** */ - private volatile MvccCoordinator curCrd; + private volatile MvccCoordinator curCrd = new MvccCoordinator(AffinityTopologyVersion.NONE); /** */ private TxLog txLog; @@ -218,9 +215,6 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** Flag whether all nodes in cluster support MVCC. */ private volatile boolean mvccSupported = true; - /** Flag whether coordinator was changed by the last discovery event. */ - private volatile boolean crdChanged; - /** * Maps failed node id to votes accumulator for that node. */ @@ -237,12 +231,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - onDiscovery((DiscoveryEvent)evt); - } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); + ctx.event().addDiscoveryEventListener(this::onDiscovery, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_CLIENT_NODE_DISCONNECTED, EVT_NODE_JOINED); ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener()); @@ -370,81 +359,78 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } /** {@inheritDoc} */ - @Override public void onExchangeDone(DiscoCache discoCache) { + @Override public void onExchangeDone(AffinityTopologyVersion initialVersion, + AffinityTopologyVersion resultVersion, DiscoCache discoCache) { MvccCoordinator curCrd0 = curCrd; - if (crdChanged) { + if (coordinatorChanged(curCrd0, initialVersion, resultVersion)) { // Rollback all transactions with old snapshots. ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange(); // Complete init future if local node is a new coordinator. All previous txs are already completed here. - if (crdVer != 0 && !initFut.isDone()) { - assert curCrd0 != null && curCrd0.nodeId().equals(ctx.localNodeId()); - + if (curCrd0.local()) initFut.onDone(); - } - - crdChanged = false; } else { - if (curCrd0 != null && ctx.localNodeId().equals(curCrd0.nodeId()) && discoCache != null) + if (curCrd0.local() && discoCache != null) cleanupOrphanedServerTransactions(discoCache.serverNodes()); } } /** {@inheritDoc} */ - @Override public void onLocalJoin(DiscoveryEvent evt) { - assert evt.type() == EVT_NODE_JOINED && ctx.localNodeId().equals(evt.eventNode().id()); + @Override public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) { + assert evt.type() == EVT_NODE_JOINED && evt.eventNode().isLocal(); + + AffinityTopologyVersion topVer = discoCache.version(); + List<ClusterNode> nodes = discoCache.allNodes(); - onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), false); + checkMvccSupported(nodes); + + onCoordinatorChanged(topVer, nodes, false); } /** - * Discovery listener. Note: initial join event is handled by {@link MvccProcessorImpl#onLocalJoin(DiscoveryEvent)} + * Discovery listener. Note: initial join event is handled by {@link MvccProcessorImpl#onLocalJoin} * method. * * @param evt Discovery event. */ - private void onDiscovery(DiscoveryEvent evt) { - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_JOINED; + private void onDiscovery(DiscoveryEvent evt, DiscoCache discoCache) { + assert evt.type() == EVT_NODE_FAILED + || evt.type() == EVT_NODE_LEFT + || evt.type() == EVT_NODE_JOINED + || evt.type() == EVT_CLIENT_NODE_DISCONNECTED; UUID nodeId = evt.eventNode().id(); + AffinityTopologyVersion topVer = discoCache.version(); + List<ClusterNode> nodes = discoCache.allNodes(); + + checkMvccSupported(nodes); MvccCoordinator curCrd0 = curCrd; if (evt.type() == EVT_NODE_JOINED) { - if (curCrd0 == null) // Handle join event only if coordinator has not been elected yet. - onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), false); - - return; + if (curCrd0.nodeId() == null) // Handle join event only if coordinator has not been elected yet. + onCoordinatorChanged(topVer, nodes, false); } + else if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + if (curCrd0.nodeId() != null) { + // 1. Notify all listeners waiting for a snapshot. + onCoordinatorFailed(curCrd0.nodeId()); - // Process mvcc coordinator left event on the rest nodes. - if (nodeId.equals(curCrd0.nodeId())) { - // 1. Notify all listeners waiting for a snapshot. - Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.remove(nodeId); - - if (map != null) { - ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("Failed to request mvcc " + - "version, coordinator failed: " + nodeId); - - MvccSnapshotResponseListener lsnr; - - for (Long id : map.keySet()) { - if ((lsnr = map.remove(id)) != null) - lsnr.onError(ex); - } + // 2. Process coordinator change. + onCoordinatorChanged(topVer, Collections.emptyList(), false); } + } + else if (nodeId.equals(curCrd0.nodeId())) { + // 1. Notify all listeners waiting for a snapshot. + onCoordinatorFailed(nodeId); - // 2. Notify acknowledge futures. - for (WaitAckFuture fut : ackFuts.values()) - fut.onNodeLeft(nodeId); - - // 3. Process coordinator change. - onCoordinatorChanged(evt.topologyNodes(), evt.topologyVersion(), true); + // 2. Process coordinator change. + onCoordinatorChanged(topVer, discoCache.allNodes(), true); } // Process node left event on the current mvcc coordinator. - else if (curCrd0.nodeId().equals(ctx.localNodeId())) { + else if (curCrd0.local()) { // 1. Notify active queries. activeQueries.onNodeFailed(nodeId); @@ -471,65 +457,83 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } } + /** */ + private void onCoordinatorFailed(UUID nodeId) { + // 1. Notify all listeners waiting for a snapshot. + Map<Long, MvccSnapshotResponseListener> map = snapLsnrs.remove(nodeId); + + if (map != null) { + ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("Failed to request mvcc " + + "version, coordinator left: " + nodeId); + + MvccSnapshotResponseListener lsnr; + + for (Long id : map.keySet()) { + if ((lsnr = map.remove(id)) != null) + lsnr.onError(ex); + } + } + + // 2. Notify acknowledge futures. + for (WaitAckFuture fut : ackFuts.values()) + fut.onNodeLeft(nodeId); + } + /** * Coordinator change callback. Performs all needed actions for handling new coordinator assignment. * - * @param nodes Cluster topology snapshot. - * @param topVer Topology version. * @param sndQrys {@code True} if it is need to send an active queries list to the new coordinator. */ - private void onCoordinatorChanged(Collection<ClusterNode> nodes, long topVer, boolean sndQrys) { - MvccCoordinator newCrd = pickMvccCoordinator(nodes, topVer); + private void onCoordinatorChanged(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean sndQrys) { + MvccCoordinator crd0 = pickMvccCoordinator(nodes, topVer); - if (newCrd == null) - return; + if (sndQrys && crd0.nodeId() == null) + sndQrys = false; // Nowhere to send - // Update current coordinator, collect active queries and send it to the new coordinator if needed. - GridLongList activeQryTrackers = null; + assert crd0.topologyVersion().compareTo(curCrd.topologyVersion()) >= 0; - synchronized (activeTrackers) { - assert curCrd == null || newCrd.topologyVersion().compareTo(curCrd.topologyVersion()) > 0; + curCrd = crd0; - if (sndQrys) { - activeQryTrackers = new GridLongList(); + GridLongList qrys = null; - for (MvccQueryTracker tracker : activeTrackers.values()) { - long trackerId = tracker.onMvccCoordinatorChange(newCrd); + if (sndQrys) { + qrys = new GridLongList(); - if (trackerId != MVCC_TRACKER_ID_NA) - activeQryTrackers.add(trackerId); - } - } + for (MvccQueryTracker tracker : activeTrackers.values()) { + long trackerId = tracker.onMvccCoordinatorChange(crd0); - curCrd = newCrd; + if (trackerId != MVCC_TRACKER_ID_NA) + qrys.add(trackerId); + } } - // Send local active queries to remote coordinator, if needed. - if (!newCrd.nodeId().equals(ctx.localNodeId())) { + if (crd0.local()) + prevCrdQueries.init(qrys, F.view(nodes, this::supportsMvcc), ctx.discovery()); + // Send local active queries to remote coordinator, if needed. + else if (qrys != null && !qrys.isEmpty()) { try { - if (sndQrys) - sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(activeQryTrackers)); + sendMessage(crd0.nodeId(), new MvccActiveQueriesMessage(qrys)); } catch (IgniteCheckedException e) { U.error(log, "Failed to send active queries to mvcc coordinator: " + e); } } - // If a current node was elected as a new mvcc coordinator, we need to pre-initialize it. - else { - assert crdVer == 0 : crdVer; - - crdVer = newCrd.coordinatorVersion(); + } - if (log.isInfoEnabled()) - log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() + - ", crdVer=" + crdVer + ']'); + /** + * @param currCrd Current Mvcc coordinator. + * @param from Start topology version. + * @param to End topology version + * @return {@code True} if coordinator was changed between two passed topology versions. + */ + private boolean coordinatorChanged(MvccCoordinator currCrd, AffinityTopologyVersion from, + AffinityTopologyVersion to) { + if (currCrd == null) + return false; - prevCrdQueries.init(activeQryTrackers, F.view(nodes, this::supportsMvcc), ctx.discovery()); + AffinityTopologyVersion crdVersion = currCrd.topologyVersion(); - // Do not complete init future here, because we should wait until all old transactions become terminated. - } - - crdChanged = true; + return from.compareTo(crdVersion) <= 0 && to .compareTo(crdVersion) >= 0; } /** @@ -561,23 +565,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } /** {@inheritDoc} */ - @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) { - prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries); - } - - /** {@inheritDoc} */ - @Override @Nullable public MvccCoordinator currentCoordinator() { + @Override @NotNull public MvccCoordinator currentCoordinator() { return curCrd; } /** {@inheritDoc} */ - @Override public UUID currentCoordinatorId() { - MvccCoordinator curCrd = this.curCrd; - - return curCrd != null ? curCrd.nodeId() : null; - } - - /** {@inheritDoc} */ @Override public byte state(MvccVersion ver) throws IgniteCheckedException { return state(ver.coordinatorVersion(), ver.counter()); } @@ -661,7 +653,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce @Override public MvccSnapshot tryRequestSnapshotLocal(@Nullable IgniteInternalTx tx) throws ClusterTopologyCheckedException { MvccCoordinator crd = currentCoordinator(); - if (crd == null) + if (crd.nodeId() == null) throw noCoordinatorError(); if (tx != null) { @@ -672,7 +664,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce "for the locked topology version. [crd=" + crd + ", tx=" + tx + ']'); } - if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone()) + if (!crd.local() || !initFut.isDone()) return null; else if (tx != null) return assignTxSnapshot(0L, ctx.localNodeId(), false); @@ -698,7 +690,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce @Override public void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr) { MvccCoordinator crd = currentCoordinator(); - if (crd == null) { + if (crd.nodeId() == null) { lsnr.onError(noCoordinatorError()); return; @@ -759,10 +751,10 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce MvccCoordinator crd = curCrd; - if (updateVer.coordinatorVersion() == crd.coordinatorVersion()) - return sendTxCommit(crd, new MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter())); + if (crd.version() != updateVer.coordinatorVersion()) + return new GridFinishedFuture<>(); - return new GridFinishedFuture<>(); + return sendTxCommit(crd, new MvccAckRequestTx(futIdCntr.incrementAndGet(), updateVer.counter())); } /** {@inheritDoc} */ @@ -771,7 +763,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce MvccCoordinator crd = curCrd; - if (crd.coordinatorVersion() != updateVer.coordinatorVersion()) + if (crd.version() != updateVer.coordinatorVersion()) return; MvccAckRequestTx msg = new MvccAckRequestTx((long)-1, updateVer.counter()); @@ -794,17 +786,18 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce @Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) { MvccCoordinator crd = currentCoordinator(); - if (crd == null || snapshot != null - && crd.coordinatorVersion() == snapshot.coordinatorVersion() - && sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) + if (crd.nodeId() == null || snapshot == null) return; - Message msg = new MvccAckRequestQueryId(qryId); + if (crd.version() != snapshot.coordinatorVersion() + || !sendQueryDone(crd, new MvccAckRequestQueryCntr(queryTrackCounter(snapshot)))) { + Message msg = new MvccAckRequestQueryId(qryId); - do { - crd = currentCoordinator(); + do { + crd = currentCoordinator(); + } + while (!sendQueryDone(crd, msg)); } - while (!sendQueryDone(crd, msg)); } /** {@inheritDoc} */ @@ -871,13 +864,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** * Picks mvcc coordinator from the given list of nodes. * - * @param nodes List of nodes. - * @param topVer Topology version. * @return Chosen mvcc coordinator. */ - private MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, long topVer) { - checkMvccSupported(nodes); - + private @NotNull MvccCoordinator pickMvccCoordinator(Collection<ClusterNode> nodes, AffinityTopologyVersion topVer) { ClusterNode crdNode = null; if (crdC != null) { @@ -897,15 +886,16 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } } - MvccCoordinator crd = crdNode != null ? new MvccCoordinator(crdNode.id(), coordinatorVersion(crdNode), - new AffinityTopologyVersion(topVer, 0)) : null; + if (crdNode != null) + return new MvccCoordinator(topVer, crdNode.id(), coordinatorVersion(crdNode), crdNode.isLocal()); - if (log.isInfoEnabled() && crd != null) - log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']'); - else if (crd == null) - U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); +// TODO +// if (log.isInfoEnabled() && crd != null) +// log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']'); +// else if (crd == null) +// U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']'); - return crd; + return new MvccCoordinator(topVer); } @@ -964,9 +954,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** */ private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) { - assert initFut.isDone(); - assert crdVer != 0; - assert ctx.localNodeId().equals(currentCoordinatorId()); + assert initFut.isDone() && curCrd != null && curCrd.local(); MvccSnapshotResponse res = new MvccSnapshotResponse(); @@ -998,7 +986,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce cleanup = prevCrdQueries.previousQueriesDone() ? cleanup - 1 : MVCC_COUNTER_NA; - res.init(futId, crdVer, ver, MVCC_START_OP_CNTR, cleanup, tracking); + res.init(futId, curCrd.version(), ver, MVCC_START_OP_CNTR, cleanup, tracking); return res; } @@ -1121,11 +1109,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce IgniteInternalFuture<VacuumMetrics> runVacuum() { assert !ctx.clientNode(); - MvccCoordinator crd0 = currentCoordinator(); - - if (Thread.currentThread().isInterrupted() || - crd0 == null || - crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId())) + if (Thread.currentThread().isInterrupted()) return new GridFinishedFuture<>(new VacuumMetrics()); final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>(); @@ -1236,8 +1220,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce if (U.assertionsEnabled()) { MvccCoordinator crd = currentCoordinator(); - assert crd != null - && crd.coordinatorVersion() >= snapshot.coordinatorVersion(); + assert crd.version() >= snapshot.coordinatorVersion(); for (TxKey key : waitMap.keySet()) { if (!( key.major() == snapshot.coordinatorVersion() @@ -1318,10 +1301,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** * @param crd Mvcc coordinator. * @param msg Message. - * @return {@code True} if no need to resend the message to a new coordinator. + * @return {@code True} if the message was sent successfully. */ + @SuppressWarnings("BooleanMethodIsAlwaysInverted") private boolean sendQueryDone(MvccCoordinator crd, Message msg) { - if (crd == null) + if (crd.nodeId() == null) return true; // no need to send ack; try { @@ -1336,7 +1320,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce MvccCoordinator crd0 = currentCoordinator(); // Coordinator is unassigned or still the same. - return crd0 == null || crd.coordinatorVersion() == crd0.coordinatorVersion(); + return crd0.nodeId() == null || crd.version() == crd0.version(); } catch (IgniteCheckedException e) { U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e); @@ -1502,7 +1486,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce * @param nodeId Node ID. * @param msg Message. */ - private void processCoordinatorActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) { + private void processActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) { prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries()); } @@ -1558,7 +1542,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce if (minQry == null) minQry = tracking; - res.init(futId, crdVer, ver, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, tracking); + res.init(futId, curCrd.version(), ver, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, tracking); return res; } @@ -1663,7 +1647,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce if (msg0.waitForCoordinatorInit() && !initFut.isDone()) { initFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> future) { - assert crdVer != 0L; + assert curCrd.local(); processMessage(nodeId, msg); } @@ -1695,7 +1679,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce else if (msg instanceof MvccAckRequestQueryId) processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg); else if (msg instanceof MvccActiveQueriesMessage) - processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg); + processActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg); else if (msg instanceof MvccRecoveryFinishedMessage) processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg)); else http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java index d93a2e9..4db1c2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java @@ -147,8 +147,8 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { if (snapshot != null) { assert crdVer != 0 : this; - if (crdVer != newCrd.coordinatorVersion()) { - crdVer = newCrd.coordinatorVersion(); + if (crdVer != newCrd.version()) { + crdVer = newCrd.version(); return id; } @@ -223,7 +223,7 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { if (done) return false; - crdVer = crd.coordinatorVersion(); + crdVer = crd.version(); } return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index 43f87e3..b717895 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -188,7 +188,7 @@ public class MvccUtils { if ((state == TxState.NA || state == TxState.PREPARED) && (proc.currentCoordinator() == null // Recovery from WAL. - || mvccCrd < proc.currentCoordinator().coordinatorVersion())) + || mvccCrd < proc.currentCoordinator().version())) state = TxState.ABORTED; return state; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a8b66f3/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java index 1c022b0..38ca146 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java @@ -26,6 +26,7 @@ import java.io.ObjectOutput; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.NoSuchElementException; +import java.util.stream.Collector; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S;
