Done.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb

Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <[email protected]>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <[email protected]>
Committed: Fri Mar 17 17:02:15 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 25 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java    | 10 ++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
-                        else
-                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                        else {
+                            exchangeFuture(msg.exchangeId(), null, null, null, null)
+                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                        }
+                    }
+                    else {
+                        // Process event as custom discovery task if needed.
+                        CachePartitionExchangeWorkerTask task =
+                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                        if (task != null)
+                            exchWorker.addCustomTask(task);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special processing.
+     */
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
      * Process custom exchange task.
      *
      * @param task Task.

Reply via email to