IGNITE-4834: Added ability to execute custom tasks from exchange thread.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61c845d6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61c845d6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61c845d6

Branch: refs/heads/ignite-4565-ddl
Commit: 61c845d66b82463bfad71c28093f2cbf54d99eb0
Parents: 9020d12
Author: devozerov <voze...@gridgain.com>
Authored: Mon Mar 20 10:16:36 2017 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Mon Mar 20 10:16:36 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorkerTask.java |  29 ++++
 .../GridCachePartitionExchangeManager.java      | 171 +++++++++++++------
 .../processors/cache/GridCacheProcessor.java    |  19 +++
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 4 files changed, 176 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Cache partition exchange worker task marker interface.
+ */
+public interface CachePartitionExchangeWorkerTask {
+    /**
+     * @return {@code True} if task denotes standard exchange task, {@code 
false} if this is a custom task which
+     * must be executed from within exchange thread.
+     */
+    boolean isExchange();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f7edb08 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -55,6 +54,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -222,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = 
((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof 
DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = 
(DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = 
(DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new 
ArrayList<>(batch.requests().size());
 
@@ -257,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, 
valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof 
CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = 
(CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = 
(CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -267,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, 
null, msg);
                             }
                         }
-                        else
-                            exchangeFuture(msg.exchangeId(), null, null, null, 
null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                        else {
+                            exchangeFuture(msg.exchangeId(), null, null, null, 
null)
+                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                        }
+                    }
+                    else {
+                        // Process event as custom discovery task if needed.
+                        CachePartitionExchangeWorkerTask task =
+                            
cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                        if (task != null)
+                            exchWorker.addCustomTask(task);
                     }
                 }
 
@@ -369,7 +379,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.futQ.addFirst(fut);
+        exchWorker.addFirstExchangeFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -684,7 +694,18 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.futQ.isEmpty();
+        return exchWorker.hasPendingExchange();
+    }
+
+    /**
+     * Add custom task.
+     *
+     * @param task Task.
+     */
+    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+        assert !task.isExchange();
+
+        exchWorker.addCustomTask(task);
     }
 
     /**
@@ -704,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      */
     public void forceDummyExchange(boolean reassign,
         GridDhtPartitionsExchangeFuture exchFut) {
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, reassign, 
exchFut.discoveryEvent(), exchFut.exchangeId()));
     }
 
@@ -716,7 +737,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     public IgniteInternalFuture<Boolean> 
forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
         GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
 
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, 
exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
 
         return fut;
@@ -1192,7 +1213,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      */
     private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
         if (fut.onAdded()) {
-            exchWorker.addFuture(fut);
+            exchWorker.addExchangeFuture(fut);
 
             return true;
         }
@@ -1345,10 +1366,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
-            U.warn(log, ">>> " + fut);
+        exchWorker.dumpExchangeDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1565,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker 
w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to 
hang.  This check
-        // will always make sure that interrupted flag gets reset before going 
into wait conditions.
-        // The true fix should actually make sure that interrupted flag does 
not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, 
this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param node Target node.
      * @return {@code True} if can use compression for partition map messages.
      */
@@ -1592,32 +1588,94 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      */
     private class ExchangeWorker extends GridWorker {
         /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> 
futQ =
+        private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> 
futQ =
             new LinkedBlockingDeque<>();
 
         /** Busy flag used as performance optimization to stop current 
preloading. */
         private volatile boolean busy;
 
         /**
-         *
+         * Constructor.
          */
         private ExchangeWorker() {
             super(cctx.igniteInstanceName(), "partition-exchanger", 
GridCachePartitionExchangeManager.this.log);
         }
 
         /**
+         * Add first exchange future.
+         *
+         * @param exchFut Exchange future.
+         */
+        void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            futQ.addFirst(exchFut);
+        }
+
+        /**
          * @param exchFut Exchange future.
          */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
-            if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
+            if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
                 futQ.offer(exchFut);
 
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + 
exchFut);
         }
 
+        /**
+         * Add custom exchange task.
+         *
+         * @param task Task.
+         */
+        void addCustomTask(CachePartitionExchangeWorkerTask task) {
+            assert task != null;
+
+            assert !task.isExchange();
+
+            futQ.offer(task);
+        }
+
+        /**
+         * Process custom exchange task.
+         *
+         * @param task Task.
+         */
+        void processCustomTask(CachePartitionExchangeWorkerTask task) {
+            try {
+                cctx.cache().processCustomExchangeTask(task);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to process custom exchange task: " + task, 
e);
+            }
+        }
+
+        /**
+         * @return Whether pending exchange future exists.
+         */
+        boolean hasPendingExchange() {
+            if (!futQ.isEmpty()) {
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task.isExchange())
+                        return true;
+                }
+            }
+
+            return false;
+        }
+
+        /**
+         * Dump debug info.
+         */
+        void dumpExchangeDebugInfo() {
+            U.warn(log, "Pending exchange futures:");
+
+            for (CachePartitionExchangeWorkerTask task: futQ) {
+                if (task.isExchange())
+                    U.warn(log, ">>> " + task);
+            }
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
             long timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1625,7 +1683,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             int cnt = 0;
 
             while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
+                CachePartitionExchangeWorkerTask task = null;
 
                 cnt++;
 
@@ -1640,7 +1698,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     }
 
                     // If not first preloading and no more topology events 
present.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() 
&& preloadFinished)
+                    if (!cctx.kernalContext().clientNode() && 
!hasPendingExchange() && preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
 
                     // After workers line up and before preloading starts we 
initialize all futures.
@@ -1656,11 +1714,24 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     }
 
                     // Take next exchange future.
-                    exchFut = poll(futQ, timeout, this);
+                    if (isCancelled())
+                        Thread.currentThread().interrupt();
 
-                    if (exchFut == null)
+                    task = futQ.poll(timeout, MILLISECONDS);
+
+                    if (task == null)
                         continue; // Main while loop.
 
+                    if (!task.isExchange()) {
+                        processCustomTask(task);
+
+                        continue;
+                    }
+
+                    assert task instanceof GridDhtPartitionsExchangeFuture;
+
+                    GridDhtPartitionsExchangeFuture exchFut = 
(GridDhtPartitionsExchangeFuture)task;
+
                     busy = true;
 
                     Map<Integer, GridDhtPreloaderAssignments> assignsMap = 
null;
@@ -1727,7 +1798,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                 changed |= 
cacheCtx.topology().afterExchange(exchFut);
                             }
 
-                            if (!cctx.kernalContext().clientNode() && changed 
&& futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed 
&& !hasPendingExchange())
                                 refreshPartitions();
                         }
                         else {
@@ -1824,7 +1895,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                             U.log(log, "Rebalancing scheduled [order=" + 
rebList + "]");
 
-                            if (futQ.isEmpty()) {
+                            if (!hasPendingExchange()) {
                                 U.log(log, "Rebalancing started " +
                                     "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
@@ -1850,7 +1921,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to wait for completion of partition 
map exchange " +
-                        "(preloading will not start): " + exchFut, e);
+                        "(preloading will not start): " + task, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..a7d38a7 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,25 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special 
processing.
+     */
+    public CachePartitionExchangeWorkerTask 
exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
+     * Process custom exchange task.
+     *
+     * @param task Task.
+     */
+    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask 
task) {
+        // No-op.
+    }
+
+    /**
      * @param c Ignite configuration.
      * @param cc Configuration to validate.
      * @param cacheType Cache type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  * Future for exchanging partition maps.
  */
 public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
     /** */
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD,
 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isExchange() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);
     }

Reply via email to