IGNITE-7177 Correctly handle custom messages which do not change affinity
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf049e8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf049e8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf049e8 Branch: refs/heads/ignite-zk-ce Commit: 7cf049e86297427054d07298b33cc6874aa3c1c0 Parents: c10aa0c Author: Alexey Goncharuk <[email protected]> Authored: Tue Dec 12 19:11:46 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Dec 12 19:11:46 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 31 +++++++++++++++----- .../GridDhtPartitionsExchangeFuture.java | 14 +++++++-- 2 files changed, 34 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf049e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 8441a5e..6347d03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -684,6 +684,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param fut Exchange future. + * @param crd Coordinator flag. + * @param exchActions Exchange actions. + */ + public void onCustomMessageNoAffinityChange( + GridDhtPartitionsExchangeFuture fut, + boolean crd, + @Nullable final ExchangeActions exchActions + ) { + final ExchangeDiscoveryEvents evts = fut.context().events(); + + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + @Override public void applyx(GridAffinityAssignmentCache aff) { + if (exchActions != null && exchActions.cacheGroupStopping(aff.groupId())) + return; + + aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion()); + } + }); + } + + /** * Called on exchange initiated for cache start/stop request. * * @param fut Exchange future. @@ -703,14 +725,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap caches.updateCachesInfo(exchActions); // Affinity did not change for existing caches. - forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { - @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { - if (exchActions.cacheGroupStopping(aff.groupId())) - return; - - aff.clientEventTopologyChange(evts.lastEvent(), evts.topologyVersion()); - } - }); + onCustomMessageNoAffinityChange(fut, crd, exchActions); for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) { DynamicCacheDescriptor cacheDesc = action.descriptor(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf049e8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d29293e..a8b195d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -588,9 +588,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchange = onCacheChangeRequest(crdNode); } else if (msg instanceof SnapshotDiscoveryMessage) { - exchange = CU.clientNode(firstDiscoEvt.eventNode()) ? - onClientNodeEvent(crdNode) : - onServerNodeEvent(crdNode); + exchange = onCustomMessageNoAffinityChange(crdNode); } else { assert affChangeMsg != null : this; @@ -894,6 +892,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param crd Coordinator flag. + * @return Exchange type. + */ + private ExchangeType onCustomMessageNoAffinityChange(boolean crd) { + cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions); + + return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; + } + + /** + * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return Exchange type. */
