ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b4b3c78f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b4b3c78f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b4b3c78f Branch: refs/heads/ignite-5578-debug Commit: b4b3c78f41713597896861212b7f7ee97d64f95c Parents: 952ca3d Author: sboikov <sboi...@gridgain.com> Authored: Wed Aug 2 12:51:51 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Aug 2 12:58:25 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 37 +++++++++++++------- .../processors/cache/GridCacheProcessor.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 8 ++--- .../preloader/GridDhtPartitionsFullMessage.java | 4 +-- .../GridDhtPartitionsSingleMessage.java | 16 ++++++--- 5 files changed, 42 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b3c78f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index f33c355..5214f34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1337,28 +1337,45 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap }); } - public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean crd) + /** + * @param fut Current exchange future. + * @param crd Coordinator flag. + * @throws IgniteCheckedException If failed. + */ + public void onServerJoinWithExchangeMergeProtocol(GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { final ExchangeDiscoveryEvents evts = fut.context().events(); - log.info("mergeExchangesOnServerJoin [topVer=" + evts.discoveryCache().version() + ']'); - + assert fut.context().mergeExchanges(); assert evts.serverJoin() && !evts.serverLeft(); WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd); - setWaitRebalanceInfo(waitRebalanceInfo, evts.waitRebalanceEventVersion(), crd); + this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; + + WaitRebalanceInfo info = this.waitInfo; + + if (crd) { + if (log.isDebugEnabled()) { + log.debug("Computed new affinity after node join [topVer=" + evts.topologyVersion() + + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); + } + } } - public Map<Integer, CacheGroupAffinityMessage> mergeExchangesInitAffinityOnServerLeft( + /** + * @param fut Current exchange future. + * @return Computed difference with ideal affinity. + * @throws IgniteCheckedException If failed. + */ + public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol( GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final ExchangeDiscoveryEvents evts = fut.context().events(); + assert fut.context().mergeExchanges(); assert evts.serverLeft(); - log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.topologyVersion()+ ']'); - forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { AffinityTopologyVersion topVer = evts.topologyVersion(); @@ -1417,17 +1434,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else waitRebalanceInfo = initAffinityOnNodeJoin(fut, crd); - setWaitRebalanceInfo(waitRebalanceInfo, fut.initialVersion(), crd); - } - - private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) { this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; WaitRebalanceInfo info = this.waitInfo; if (crd) { if (log.isDebugEnabled()) { - log.debug("Computed new affinity after node join [topVer=" + topVer + + log.debug("Computed new affinity after node join [topVer=" + fut.initialVersion() + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b3c78f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7490c2c..f616e5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2100,7 +2100,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Callback invoked when first exchange future for dynamic cache is completed. * - * @param topVer Completed topology version. + * @param cacheStartVer Started caches version to create proxy for. * @param exchActions Change requests. * @param err Error. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b3c78f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 575161d..2e0f742 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,7 +54,6 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -79,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; -import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -2163,9 +2161,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchCtx.events().processEvents(this); if (exchCtx.events().serverLeft()) - idealAffDiff = cctx.affinity().mergeExchangesInitAffinityOnServerLeft(this); + idealAffDiff = cctx.affinity().onServerLeftWithExchangeMergeProtocol(this); else - cctx.affinity().mergeExchangesOnServerJoin(this, true); + cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true); for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups()) { if (desc.config().getCacheMode() == CacheMode.LOCAL) @@ -2648,7 +2646,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchCtx.events().serverLeft()) cctx.affinity().mergeExchangesOnServerLeft(this, msg); else - cctx.affinity().mergeExchangesOnServerJoin(this, false); + cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal() || cacheGroupStopping(grp.groupId())) http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b3c78f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 6153b15..d27e302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -202,14 +202,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @return + * @return Difference with ideal affinity. */ @Nullable public Map<Integer, CacheGroupAffinityMessage> idealAffinityDiff() { return idealAffDiff; } /** - * @param idealAffDiff + * @param idealAffDiff Difference with ideal affinity. */ void idealAffinityDiff(Map<Integer, CacheGroupAffinityMessage> idealAffDiff) { this.idealAffDiff = idealAffDiff; http://git-wip-us.apache.org/repos/asf/ignite/blob/b4b3c78f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 4df6d67..ed50634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -120,23 +120,29 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes this.compress = compress; } - public void finishMessage(GridDhtPartitionsFullMessage finishMsg) { + /** + * @param finishMsg Exchange finish message (used to restore exchange state on new coordinator). + */ + void finishMessage(GridDhtPartitionsFullMessage finishMsg) { this.finishMsg = finishMsg; } - public GridDhtPartitionsFullMessage finishMessage() { + /** + * @return Exchange finish message (used to restore exchange state on new coordinator). + */ + GridDhtPartitionsFullMessage finishMessage() { return finishMsg; } /** - * @param grpsAffRequest + * @param grpsAffRequest Cache groups to get affinity for (affinity is requested when node joins cluster). */ - public void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) { + void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) { this.grpsAffRequest = grpsAffRequest; } /** - * @return + * @return Cache groups to get affinity for (affinity is requested when node joins cluster). */ @Nullable public Collection<Integer> cacheGroupsAffinityRequest() { return grpsAffRequest;