Repository: ignite Updated Branches: refs/heads/ignite-1093-2 3d1a95fd6 -> 0566a77b5
1093 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0566a77b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0566a77b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0566a77b Branch: refs/heads/ignite-1093-2 Commit: 0566a77b5772043347e52d9376567948855fc3fb Parents: 3d1a95f Author: Anton Vinogradov <[email protected]> Authored: Sat Oct 17 10:18:24 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Sat Oct 17 10:18:24 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 28 +++++++++--- .../processors/cache/GridCachePreloader.java | 5 ++- .../cache/GridCachePreloaderAdapter.java | 5 ++- .../dht/preloader/GridDhtPartitionDemander.java | 45 ++++++++------------ .../dht/preloader/GridDhtPreloader.java | 5 ++- 5 files changed, 48 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4d95894..ababac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -29,6 +29,7 @@ import java.util.Queue; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -141,7 +142,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private GridFutureAdapter<?> reconnectExchangeFut; /** */ - private final Queue<Runnable> rebalancingQueue = new ConcurrentLinkedDeque8<>(); + private final Queue<Callable<Boolean>> rebalancingQueue = new ConcurrentLinkedDeque8<>(); /** * Partition map futures. @@ -1295,7 +1296,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana orderMap.get(order).add(cacheId); } - Runnable marsR = null; + Callable<Boolean> marsR = null; //Ordered rebalance scheduling. for (Integer order : orderMap.keySet()) { @@ -1310,7 +1311,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - Runnable r = cacheCtx.preloader().addAssignments( + Callable<Boolean> r = cacheCtx.preloader().addAssignments( assignsMap.get(cacheId), forcePreload, waitList, cnt); if (r != null) { @@ -1333,7 +1334,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]"); if (marsR != null) - marsR.run();//Marshaller cache rebalancing launches in sync way. + try { + marsR.call();//Marshaller cache rebalancing launches in sync way. + } + catch (Exception ex) { + U.error(log, "Failed to send partition demand message to node", ex); + + continue; + } final GridFutureAdapter fut = new GridFutureAdapter(); @@ -1343,14 +1351,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override public Boolean call() { try { while (true) { - Runnable rn = rebalancingQueue.poll(); + Callable<Boolean> r = rebalancingQueue.poll(); - if (rn == null) + if (r == null) return false; - rn.run(); + if (!r.call()) + return false; } } + catch (Exception ex) { + U.error(log, "Failed to send partition demand message to node", ex); + + return false; + } finally { fut.onDone(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 9555bf4..79861a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -97,8 +98,8 @@ public interface GridCachePreloader { * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. */ - public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt) throws IgniteCheckedException; + public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt); /** * @param p Preload predicate. http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 4aba537..b784383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.AffinityFunction; @@ -163,8 +164,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt) throws IgniteCheckedException { + @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection<String> caches, int cnt) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index b8aa2b0..85649c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -267,8 +268,8 @@ public class GridDhtPartitionDemander { * @param cnt Counter. * @throws IgniteCheckedException Exception */ - Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches, - int cnt) throws IgniteCheckedException { + Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, + final Collection<String> caches, int cnt) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -298,17 +299,18 @@ public class GridDhtPartitionDemander { return null; } - return new Runnable() { - @Override - public void run() { + return new Callable<Boolean>() { + @Override public Boolean call() throws Exception{ for (String c : caches) { waitForCacheRebalancing(c, fut); if (fut.isDone()) - return; + return false; } requestPartitions(fut, assigns); + + return true; } }; } @@ -343,7 +345,7 @@ public class GridDhtPartitionDemander { /** * @param fut Future. */ - private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) { + private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) throws IgniteCheckedException { for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { if (topologyChanged(fut)) { fut.cancel(); @@ -394,20 +396,13 @@ public class GridDhtPartitionDemander { initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); initD.updateSequence(fut.updateSeq); - try { - cctx.io().sendOrderedMessage(node, - GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); - - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - cnt + ", partitions count=" + sParts.get(cnt).size() + - " (" + partitionsList(sParts.get(cnt)) + ")]"); - } - catch (IgniteCheckedException ex) { - fut.cancel(); + cctx.io().sendOrderedMessage(node, + GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); - U.error(log, "Failed to send partition demand message to node", ex); - } + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + cnt + ", partitions count=" + sParts.get(cnt).size() + + " (" + partitionsList(sParts.get(cnt)) + ")]"); } } } @@ -1365,7 +1360,7 @@ public class GridDhtPartitionDemander { * @param node Node. * @param d D. */ - public void run(ClusterNode node, GridDhtPartitionDemandMessage d) { + public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException{ demandLock.readLock().lock(); try { @@ -1399,14 +1394,10 @@ public class GridDhtPartitionDemander { fut.cancel(); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to receive partitions from node (rebalancing will not " + - "fully finish) [node=" + node.id() + ", msg=" + d + ']', e); - - fut.cancel(node.id()); - } catch (InterruptedException e) { fut.cancel(); + + throw new IgniteCheckedException(e); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0080406..3441f94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; @@ -408,8 +409,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException { + @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, Collection<String> caches, int cnt) { return demander.addAssignments(assignments, forcePreload, caches, cnt); }
