Repository: ignite Updated Branches: refs/heads/ignite-5578 d4030225a -> 51a95a14c
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51a95a14 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51a95a14 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51a95a14 Branch: refs/heads/ignite-5578 Commit: 51a95a14c1a4377407684d7de94e562c2f8dfcb3 Parents: d403022 Author: sboikov <[email protected]> Authored: Thu Aug 3 16:33:34 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Aug 3 17:26:22 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 2 + .../cache/CachePartitionExchangeWorkerTask.java | 3 ++ .../processors/cache/ClusterCachesInfo.java | 22 +++++++++-- .../processors/cache/ExchangeContext.java | 2 +- .../GridCachePartitionExchangeManager.java | 20 ++++++---- .../processors/cache/GridCacheProcessor.java | 8 +++- .../dht/GridDhtPartitionTopology.java | 9 +++-- .../dht/GridDhtPartitionTopologyImpl.java | 7 +--- .../preloader/CacheGroupAffinityMessage.java | 13 ++++++- .../GridDhtPartitionsExchangeFuture.java | 39 ++++++-------------- .../preloader/GridDhtPartitionsFullMessage.java | 22 +++++------ .../GridDhtPartitionsSingleMessage.java | 5 ++- .../GridDhtPartitionsSingleRequest.java | 9 ++--- .../IgniteDhtPartitionCountersMap.java | 3 ++ .../near/GridNearTxPrepareRequest.java | 5 +++ .../datastreamer/PlatformDataStreamer.java | 2 +- .../ignite/internal/util/GridListSet.java | 8 ---- 17 files changed, 100 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index dc905a0..d02df5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1867,6 +1867,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param evts Discovery events processed during exchange. + * @param addedOnExchnage {@code True} if cache group was added during this exchange. * @param aff Affinity. * @param rebalanceInfo Rebalance information. * @param latePrimary If {@code true} delays primary assignment if it is not owner. http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java index cfbfa6e..f4c1392 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java @@ -21,5 +21,8 @@ package org.apache.ignite.internal.processors.cache; * Cache partition exchange worker task marker interface. */ public interface CachePartitionExchangeWorkerTask { + /** + * @return {@code False} if exchange merge should stop if this task is found in exchange worker queue. + */ boolean skipForExchangeMerge(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index bb51a3b..5e2c8db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -811,9 +811,26 @@ class ClusterCachesInfo { /** * @param joinedNodeId Joined node ID. + * @return {@code True} if there are new caches received from joined node. + */ + boolean hasCachesReceivedFromJoin(UUID joinedNodeId) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + if (desc.staticallyConfigured()) { + assert desc.receivedFrom() != null : desc; + + if (joinedNodeId.equals(desc.receivedFrom())) + return true; + } + } + + return false; + } + + /** + * @param joinedNodeId Joined node ID. * @return New caches received from joined node. */ - @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) { + List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) { assert joinedNodeId != null; List<DynamicCacheDescriptor> started = null; @@ -1732,8 +1749,7 @@ class ClusterCachesInfo { * DIRECT comparator for cache descriptors (first system caches). */ static Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() { - @Override - public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) { + @Override public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) { if (!o1.cacheType().userCache()) return -1; if (!o2.cacheType().userCache()) http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 6442752..4046c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -65,7 +65,7 @@ public class ExchangeContext { } else { boolean startCaches = fut.exchangeId().isJoined() && - fut.sharedContext().cache().receivedCachesFromNodeJoin(fut.exchangeId().eventNode()); + fut.sharedContext().cache().hasCachesReceivedFromJoin(fut.exchangeId().eventNode()); fetchAffOnJoin = protocolVer == 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 75ddacd..efb04a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -205,6 +205,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final GridFutureAdapter<?> crdInitFut = new GridFutureAdapter(); + /** For tests only. */ + private volatile AffinityTopologyVersion exchMergeTestWaitVer; + /** Discovery listener. */ private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { @@ -371,7 +374,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - public void coordinatorInitialized() { + /** + * + */ + public void onCoordinatorInitialized() { crdInitFut.onDone(); } @@ -1264,7 +1270,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param topVer Topology version. + * @param topVer Exchange result topology version. + * @param initTopVer Exchange initial version. * @param err Error. */ public void onExchangeDone(AffinityTopologyVersion topVer, AffinityTopologyVersion initTopVer, @Nullable Throwable err) { @@ -1757,9 +1764,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx); } - /** */ - private volatile AffinityTopologyVersion exchMergeTestWaitVer; - /** * For testing only. * @@ -1810,7 +1814,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut); if (pendingMsg != null) - curFut.waitAndReplyToNode(evt.eventNode(), pendingMsg); + curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg); } exchWorker.futQ.remove(fut); @@ -1903,7 +1907,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } - if (evt.type() == EVT_NODE_JOINED && cctx.cache().receivedCachesFromNodeJoin(node)) { + if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) { log.info("Stop merge, received caches from node: " + node); break; @@ -2037,7 +2041,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana */ private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { - if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) { + if (resVer.compareTo(exchFut.initialVersion()) != 0) { waitForExchangeFuture(resVer); for (CachePartitionExchangeWorkerTask task : futQ) { http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f616e5b..e7bfb9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1760,8 +1760,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - public boolean receivedCachesFromNodeJoin(ClusterNode node) { - return !cachesInfo.cachesReceivedFromJoin(node.id()).isEmpty(); + /** + * @param node Joined node. + * @return {@code True} if there are new caches received from joined node. + */ + boolean hasCachesReceivedFromJoin(ClusterNode node) { + return cachesInfo.hasCachesReceivedFromJoin(node.id()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 1887473..3365e52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -69,9 +69,7 @@ public interface GridDhtPartitionTopology { ) throws IgniteInterruptedCheckedException; /** - * Topology version. - * - * @return Topology version. + * @return Result topology version of last finished exchange. */ public AffinityTopologyVersion readyTopologyVersion(); @@ -103,6 +101,7 @@ public interface GridDhtPartitionTopology { * * @param exchFut Exchange future. * @param affReady Affinity ready flag. + * @param updateMoving * @throws IgniteCheckedException If failed. */ public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, @@ -111,6 +110,7 @@ public interface GridDhtPartitionTopology { throws IgniteCheckedException; /** + * @param affVer Affinity version. * @param exchFut Exchange future. * @throws IgniteInterruptedCheckedException If interrupted. */ @@ -272,6 +272,7 @@ public interface GridDhtPartitionTopology { /** * @param exchId Exchange ID. * @param parts Partitions. + * @param force {@code True} to skip stale update check. * @return {@code True} if local state was changed. */ public boolean update(@Nullable GridDhtPartitionExchangeId exchId, @@ -297,6 +298,8 @@ public interface GridDhtPartitionTopology { /** * Resets the state of all LOST partitions to OWNING. + * + * @param resTopVer Exchange result version. */ public void resetLostPartitions(AffinityTopologyVersion resTopVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 5e58502..1770497 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -110,12 +110,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; /** */ - //private AffinityTopologyVersion lastExchangeVer; - - /** */ - // private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; - - /** */ private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; /** */ @@ -401,6 +395,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * @param affVer Affinity version. * @param aff Affinity assignments. * @param updateSeq Update sequence. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index f6d870f..179c76f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -39,7 +39,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** - * + * Information about affinity assignment. */ public class CacheGroupAffinityMessage implements Message { /** */ @@ -62,6 +62,7 @@ public class CacheGroupAffinityMessage implements Message { /** * @param assign0 Assignment. + * @param assignDiff0 Difference with ideal affinity assignment. */ private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0, Map<Integer, List<Long>> assignDiff0) { if (assign0 != null) { @@ -95,7 +96,12 @@ public class CacheGroupAffinityMessage implements Message { } } - public static Map<Integer, CacheGroupAffinityMessage> createAffinityDiffMessages(Map<Integer, Map<Integer, List<Long>>> affDiff) { + /** + * @param affDiff + * @return + */ + public static Map<Integer, CacheGroupAffinityMessage> createAffinityDiffMessages( + Map<Integer, Map<Integer, List<Long>>> affDiff) { if (F.isEmpty(affDiff)) return null; @@ -181,6 +187,9 @@ public class CacheGroupAffinityMessage implements Message { return assignments0; } + /** + * @return Difference with ideal affinity assignment. + */ public Map<Integer, GridLongList> assignmentsDiff() { return assignsDiff; } http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b43fe92..528a85b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -266,7 +266,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private GridDhtPartitionsExchangeFuture mergedWith; - /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -296,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchLog = cctx.logger(EXCHANGE_LOG); initFut = new GridFutureAdapter<Boolean>() { - @Nullable @Override public IgniteLogger logger() { + @Override public IgniteLogger logger() { return log; } }; @@ -512,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (fut != null) fut.get(); - cctx.exchange().coordinatorInitialized(); + cctx.exchange().onCoordinatorInitialized(); } } @@ -1028,7 +1027,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { long start = U.currentTimeMillis(); - IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(events().lastEvent()); + IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt); if (fut != null) { fut.get(); @@ -1272,7 +1271,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param msg Message to send. * @param nodes Nodes. + * @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges. * @param joinedNodeAff Affinity if was requested by some nodes. */ private void sendAllPartitions( @@ -1555,7 +1556,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (CU.clientNode(node)) { if (msg != null) - waitAndReplyToClient(nodeId, msg); + waitAndReplyToNode(nodeId, msg); } else { if (mergedJoinExchMsgs == null) @@ -1640,7 +1641,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void processMergedMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { if (msg.client()) { - waitAndReplyToClient(node.id(), msg); + waitAndReplyToNode(node.id(), msg); return; } @@ -1760,29 +1761,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte }); } - public void waitAndReplyToNode(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { - listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - FinishState finishState0; - - synchronized (mux) { - finishState0 = finishState; - } - - assert finishState0 != null; - - sendAllPartitionsToNode(finishState0, msg, node.id()); - } - }); - } - /** * @param nodeId Node ID. * @param msg Client's message. */ - private void waitAndReplyToClient(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { - assert msg.client(); - + public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { if (cctx.kernalContext().isStopping()) @@ -1814,9 +1797,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param nodeId Sender node. * @param msg Partition single message. */ - void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { if (msg.client()) { - waitAndReplyToClient(nodeId, msg); + waitAndReplyToNode(nodeId, msg); return; } @@ -3078,7 +3061,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) { boolean allRcvd = false; - cctx.exchange().coordinatorInitialized(); + cctx.exchange().onCoordinatorInitialized(); if (newCrdFut.restoreState()) { GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage(); http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index b060704..a164e85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -164,6 +164,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** + * @return Message copy. + */ + GridDhtPartitionsFullMessage copy() { + GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); + + copyStateTo(cp); + + return cp; + } + + /** * @param resTopVer Result topology version. */ public void resultTopologyVersion(AffinityTopologyVersion resTopVer) { @@ -178,17 +189,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @return Message copy. - */ - GridDhtPartitionsFullMessage copy() { - GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); - - copyStateTo(cp); - - return cp; - } - - /** * @return Caches affinity for joining nodes. */ @Nullable public Map<Integer, CacheGroupAffinityMessage> joinedNodeAffinity() { http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index ed50634..bc7d314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -94,7 +94,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectCollection(Integer.class) private Collection<Integer> grpsAffRequest; - /** */ + /** + * Exchange finish message, sent to new coordinator when it tries to + * restore state after previous coordinator failed during exchange. + */ private GridDhtPartitionsFullMessage finishMsg; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index 82feb12..6317fbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -60,19 +60,18 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes msg.restoreState(true); - msg.restoreExchangeId(restoreExchId); + msg.restoreExchId = restoreExchId; return msg; } + /** + * @return ID of current exchange on new coordinator. + */ GridDhtPartitionExchangeId restoreExchangeId() { return restoreExchId; } - void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) { - this.restoreExchId = restoreExchId; - } - /** {@inheritDoc} */ @Override public int handlerId() { return 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java index 8266052..dc2fbf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java @@ -34,6 +34,9 @@ public class IgniteDhtPartitionCountersMap implements Serializable { /** */ private Map<Integer, Map<Integer, T2<Long, Long>>> map; + /** + * @return {@code True} if map is empty. + */ public synchronized boolean empty() { return map == null || map.isEmpty(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 9744c48..7deceb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -100,6 +100,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param subjId Subject ID. * @param taskNameHash Task name hash. * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node. + * @param {@code True} if it is safe for first client request to wait for topology future. * @param addDepInfo Deployment info flag. */ public GridNearTxPrepareRequest( @@ -147,6 +148,10 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { setFlag(allowWaitTopFut, ALLOW_WAIT_TOP_FUT_FLAG_MASK); } + /** + * @return {@code True} if it is safe for first client request to wait for topology future + * completion. + */ public boolean allowWaitTopologyFuture() { return isFlag(ALLOW_WAIT_TOP_FUT_FLAG_MASK); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index 7c893b5..fba0a4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -210,7 +210,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery(); AffinityTopologyVersion topVer = - platformCtx.kernalContext().cache().context().exchange().readyAffinityVersion(); + platformCtx.kernalContext().cache().context().exchange().lastTopologyFuture().get(); int topSize = discoMgr.cacheNodes(cacheName, topVer).size(); http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java index 1a632b0..6226bd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java @@ -373,14 +373,6 @@ public class GridListSet<V> extends GridSerializableSet<V> implements Cloneable return vals.iterator(); } - /** - * @param idx Start index. - * @return List iterator. - */ - public ListIterator<V> listIterator(int idx) { - return vals.listIterator(idx); - } - /** {@inheritDoc} */ @Override public int size() { return vals.size();
