Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb

Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <[email protected]>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <[email protected]>
Committed: Fri Mar 17 17:02:15 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java | 25 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java | 10 ++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         exchFut = exchangeFuture(exchId, evt, cache,null, null);
                     }
                     else {
-                        DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                        DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                        if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                            DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                        if (customMsg instanceof DynamicCacheChangeBatch) {
+                            DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                             Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                             }
                         }
-                        else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                            CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                        else if (customMsg instanceof CacheAffinityChangeMessage) {
+                            CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                             if (msg.exchangeId() == null) {
                                 if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                                 }
                             }
-                            else
-                                exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                            else {
+                                exchangeFuture(msg.exchangeId(), null, null, null, null)
+                                    .onAffinityChangeMessage(evt.eventNode(), msg);
+                            }
+                        }
+                        else {
+                            // Process event as custom discovery task if needed.
+                            CachePartitionExchangeWorkerTask task =
+                                cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                            if (task != null)
+                                exchWorker.addCustomTask(task);
                         }
                     }
 
http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special processing.
+     */
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
      * Process custom exchange task.
      *
      * @param task Task.
