ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d89f1b0a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d89f1b0a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d89f1b0a Branch: refs/heads/ignite-1093 Commit: d89f1b0af5ed23bc0f6e68fa9f0d377e6be41fcc Parents: a7ddb62 Author: Anton Vinogradov <[email protected]> Authored: Fri Aug 28 15:37:14 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Aug 28 15:37:14 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 1 + .../ignite/internal/IgniteNodeAttributes.java | 3 + .../GridCachePartitionExchangeManager.java | 3 +- .../dht/preloader/GridDhtPartitionDemander.java | 709 ++++++++++++++++--- .../dht/preloader/GridDhtPartitionSupplier.java | 290 +++++++- .../dht/preloader/GridDhtPreloader.java | 10 +- ...ridCacheMassiveRebalancingAsyncSelfTest.java | 91 --- ...GridCacheMassiveRebalancingSyncSelfTest.java | 392 ---------- .../GridCacheRebalancingAsyncSelfTest.java | 85 +++ .../GridCacheRebalancingSyncSelfTest.java | 269 +++++++ 10 files changed, 1267 insertions(+), 586 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 1db73bf..03110c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1170,6 +1170,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName()); add(ATTR_USER_NAME, System.getProperty("user.name")); add(ATTR_GRID_NAME, gridName); + add(REBALANCING_VERSION, 1); add(ATTR_PEER_CLASSLOADING, cfg.isPeerClassLoadingEnabled()); add(ATTR_DEPLOYMENT_MODE, cfg.getDeploymentMode()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 10b8df0..c04c69b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -135,6 +135,9 @@ public final class IgniteNodeAttributes { /** Node consistent id. */ public static final String ATTR_NODE_CONSISTENT_ID = ATTR_PREFIX + ".consistent.id"; + /** Rebalancing version id. */ + public static final String REBALANCING_VERSION = ATTR_PREFIX + ".rebalancing.version"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 003e8db..bf77d1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -282,7 +282,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.io().addOrderedHandler(demanderTopic(cnt), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() { @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) { - enterBusy(); + if (!enterBusy()) + return; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 0474bf9..0aa30b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -36,12 +36,17 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; import org.jsr166.*; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; +import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.GridTopic.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.dr.GridDrType.*; @@ -69,13 +74,17 @@ public class GridDhtPartitionDemander { /** Last exchange future. */ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; + /** Demand lock. */ + private final ReadWriteLock demandLock; + /** * @param cctx Cache context. */ - public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) { + public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) { assert cctx != null; this.cctx = cctx; + this.demandLock = demandLock; log = cctx.logger(getClass()); @@ -199,7 +208,13 @@ public class GridDhtPartitionDemander { else fut.init(assigns); - if (assigns.isEmpty() || topologyChanged(topVer)) { + if (assigns.isEmpty()) { + fut.onDone(); + + return; + } + + if (topologyChanged(topVer)) { fut.onCancel(); return; @@ -225,13 +240,17 @@ public class GridDhtPartitionDemander { } } catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " + "[cacheName=" + cctx.name() + ']'); + cSF.onCancel(); - return; + return; + } } catch (IgniteCheckedException e) { + cSF.onCancel(); + throw new Error("Ordered preload future should never fail: " + e.getMessage(), e); } } @@ -257,18 +276,22 @@ public class GridDhtPartitionDemander { } } catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Failed to wait for ordered rebalance future (grid is stopping): " + "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']'); + cSF.onCancel(); - return; + return; + } } catch (IgniteCheckedException e) { + cSF.onCancel(); + throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e); } } - requestPartitions(assigns, topVer, cSF); + requestPartitions(cSF); } }).start(); @@ -300,12 +323,13 @@ public class GridDhtPartitionDemander { } /** - * @param assigns Assigns. + * @param fut Future. */ - private void requestPartitions( - final GridDhtPreloaderAssignments assigns, - AffinityTopologyVersion topVer, - SyncFuture fut) { + private void requestPartitions(SyncFuture fut) { + final GridDhtPreloaderAssignments assigns = fut.assigns; + + AffinityTopologyVersion topVer = fut.topologyVersion(); + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { if (topologyChanged(topVer)) { fut.onCancel(); @@ -313,73 +337,77 @@ public class GridDhtPartitionDemander { return; } + final ClusterNode node = e.getKey(); + GridDhtPartitionDemandMessage d = e.getValue(); d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. - final ClusterNode node = e.getKey(); + final CacheConfiguration cfg = cctx.config(); final long start = U.currentTimeMillis(); - final CacheConfiguration cfg = cctx.config(); + fut.logStart(node.id(), start); - final AffinityTopologyVersion top = d.topologyVersion(); + U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]"); - if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { - U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]"); + //Check remote node rebalancing API version. + if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) { + GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>(); - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> t) { - Boolean completed = ((SyncFuture)t).isCompleted(); - U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode=" - + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top + - ", time=" + (U.currentTimeMillis() - start) + " ms]"); - } - }); - } + remainings.addAll(d.partitions()); - GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>(); + fut.append(node.id(), remainings); - remainings.addAll(d.partitions()); + int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); - fut.append(node.id(), remainings); + List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); - int lsnrCnt = Math.max(1, cctx.gridConfig().getRebalanceThreadPoolSize() / 2); + for (int cnt = 0; cnt < lsnrCnt; cnt++) + sParts.add(new HashSet<Integer>()); - List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); + Iterator<Integer> it = d.partitions().iterator(); - for (int cnt = 0; cnt < lsnrCnt; cnt++) - sParts.add(new HashSet<Integer>()); + int cnt = 0; - Iterator<Integer> it = d.partitions().iterator(); + while (it.hasNext()) + sParts.get(cnt++ % lsnrCnt).add(it.next()); - int cnt = 0; + for (cnt = 0; cnt < lsnrCnt; cnt++) { - while (it.hasNext()) - sParts.get(cnt++ % lsnrCnt).add(it.next()); + if (!sParts.get(cnt).isEmpty()) { - for (cnt = 0; cnt < lsnrCnt; cnt++) { + // Create copy. + GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); - if (!sParts.get(cnt).isEmpty()) { + initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt)); - // Create copy. - GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + try { + if (!topologyChanged(topVer)) + cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); + else + fut.onCancel(); + } + catch (IgniteCheckedException ex) { + fut.onCancel(); - initD.topic(GridCachePartitionExchangeManager.demanderTopic(cnt)); + U.error(log, "Failed to send partition demand message to node", ex); + } - try { - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.supplierTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to send partition demand message to local node", ex); + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); } - - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); } } + else { + DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + + fut.append(node.id(), d.partitions()); + + dw.run(node, d); + } } } @@ -445,7 +473,7 @@ public class GridDhtPartitionDemander { final SyncFuture fut = syncFut; if (topologyChanged(topVer)) { - fut.onCancel(id, topVer); + fut.onCancel(); return; } @@ -462,7 +490,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Class got undeployed during preloading: " + supply.classError()); - fut.onCancel(id, topVer); + fut.onCancel(id); return; } @@ -515,7 +543,7 @@ public class GridDhtPartitionDemander { if (last) { top.own(part); - fut.onPartitionDone(id, p, topVer); + fut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Finished rebalancing partition: " + part); @@ -527,14 +555,14 @@ public class GridDhtPartitionDemander { } } else { - fut.onPartitionDone(id, p, topVer); + fut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - fut.onPartitionDone(id, p, topVer); + fut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); @@ -544,10 +572,10 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) if (cctx.affinity().localNode(miss, topVer)) - fut.onMissedPartition(id, miss, topVer); + fut.onMissedPartition(id, miss); for (Integer miss : supply.missed()) - fut.onPartitionDone(id, miss, topVer); + fut.onPartitionDone(id, miss); if (!fut.isDone()) { @@ -569,15 +597,15 @@ public class GridDhtPartitionDemander { } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) - log.debug("Node left during rebalancing (will retry) [node=" + node.id() + + log.debug("Node left during rebalancing [node=" + node.id() + ", msg=" + e.getMessage() + ']'); - fut.onCancel(id, topVer); + fut.onCancel(); } catch (IgniteCheckedException ex) { U.error(log, "Failed to receive partitions from node (rebalancing will not " + "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex); - fut.onCancel(id, topVer); + fut.onCancel(node.id()); } } @@ -687,6 +715,10 @@ public class GridDhtPartitionDemander { private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); + private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>(); + + private Lock lock = new ReentrantLock(); + private volatile GridLocalEventListener lsnr; /** Assignments. */ @@ -694,14 +726,23 @@ public class GridDhtPartitionDemander { private volatile boolean completed = true; + /** + * @param assigns Assigns. + */ SyncFuture(GridDhtPreloaderAssignments assigns) { this.assigns = assigns; } + /** + * + */ public AffinityTopologyVersion topologyVersion() { return assigns != null ? assigns.topologyVersion() : null; } + /** + * @param assigns Assigns. + */ void init(GridDhtPreloaderAssignments assigns) { final SyncFuture fut = this; @@ -716,80 +757,157 @@ public class GridDhtPartitionDemander { this.assigns = assigns; } + /** + * + */ boolean isInited() { return assigns != null; } + /** + * @param nodeId Node id. + * @param parts Parts. + */ void append(UUID nodeId, Collection<Integer> parts) { remaining.put(nodeId, parts); missed.put(nodeId, new GridConcurrentHashSet<Integer>()); } + /** + * @param nodeId Node id. + * @param time Time. + */ + void logStart(UUID nodeId, long time) { + started.put(nodeId, time); + } + + /** + * @param topVer Topology version. + * @param node Node. + */ GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) { - if (!topVer.equals(assigns.topologyVersion())) + if (isDone() || !topVer.equals(assigns.topologyVersion())) return null; return assigns.get(node); } + /** + * + */ void onCancel() { - remaining.clear(); + lock.lock(); + try { + if (isDone()) + return; + + remaining.clear(); - completed = false; + completed = false; - checkIsDone(); + U.log(log, (!completed ? "Cancelled" : "Completed") + " rebalancing from all nodes [cache=" + cctx.name() + + ", topology=" + topologyVersion() + + ", time=" + + (started.isEmpty() ? 0 : (U.currentTimeMillis() - Collections.min(started.values()))) + " ms]"); + + checkIsDone(); + } + finally { + lock.unlock(); + } } - void onCancel(UUID nodeId, AffinityTopologyVersion topVer) { - if (isDone() || !topVer.equals(assigns.topologyVersion())) - return; + /** + * @param nodeId Node id. + */ + void onCancel(UUID nodeId) { + lock.lock(); + try { + if (isDone()) + return; - remaining.remove(nodeId); + remaining.remove(nodeId); - completed = false; + completed = false; + + U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() + + ", from node=" + nodeId + ", topology=" + topologyVersion() + + ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]")); + + checkIsDone(); + } + finally { + lock.unlock(); + } - checkIsDone(); } + /** + * @return Is completed. + */ boolean isCompleted() { return completed; } - void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) { - if (isDone() || !topVer.equals(assigns.topologyVersion())) - return; + /** + * @param nodeId Node id. + * @param p P. + */ + void onMissedPartition(UUID nodeId, int p) { + lock.lock(); + try { + if (isDone()) + return; - if (missed.get(nodeId) == null) - missed.put(nodeId, new GridConcurrentHashSet<Integer>()); + if (missed.get(nodeId) == null) + missed.put(nodeId, new GridConcurrentHashSet<Integer>()); - missed.get(nodeId).add(p); + missed.get(nodeId).add(p); + } + finally { + lock.unlock(); + } } - void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) { - if (isDone() || !topVer.equals(assigns.topologyVersion())) - return; + /** + * @param nodeId Node id. + * @param p P. + */ + void onPartitionDone(UUID nodeId, int p) { + lock.lock(); + try { + if (isDone()) + return; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - assigns.exchangeFuture().discoveryEvent()); + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, + assigns.exchangeFuture().discoveryEvent()); - Collection<Integer> parts = remaining.get(nodeId); + Collection<Integer> parts = remaining.get(nodeId); - if (parts != null) { - parts.remove(p); + if (parts != null) { + parts.remove(p); - if (parts.isEmpty()) { - remaining.remove(nodeId); + if (parts.isEmpty()) { + remaining.remove(nodeId); - if (log.isDebugEnabled()) - log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']'); + U.log(log, ("Completed rebalancing [cache=" + cctx.name() + + ", from node=" + nodeId + ", topology=" + topologyVersion() + + ", time=" + (U.currentTimeMillis() - started.get(nodeId)) + " ms]")); + } } - } - checkIsDone(); + checkIsDone(); + } + finally { + lock.unlock(); + } } + /** + * + */ private void checkIsDone() { if (remaining.isEmpty()) { if (log.isDebugEnabled()) @@ -809,8 +927,6 @@ public class GridDhtPartitionDemander { cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture()); } - missed.clear(); - cctx.shared().exchange().scheduleResendPartitions(); if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) @@ -820,7 +936,412 @@ public class GridDhtPartitionDemander { cctx.events().removeListener(lsnr); onDone(completed); + + missed.clear(); + remaining.clear(); + started.clear(); + assigns.clear(); + } + } + } + + /** + * Supply message wrapper. + */ + @Deprecated//Backward compatibility. To be removed in future. + private static class SupplyMessage { + /** Sender ID. */ + private UUID sndId; + + /** Supply message. */ + private GridDhtPartitionSupplyMessage supply; + + /** + * Dummy constructor. + */ + private SupplyMessage() { + // No-op. + } + + /** + * @param sndId Sender ID. + * @param supply Supply message. + */ + SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { + this.sndId = sndId; + this.supply = supply; + } + + /** + * @return Sender ID. + */ + UUID senderId() { + return sndId; + } + + /** + * @return Message. + */ + GridDhtPartitionSupplyMessage supply() { + return supply; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SupplyMessage.class, this); + } + } + + /** DemandWorker index. */ + @Deprecated//Backward compatibility. To be removed in future. + private final AtomicInteger dmIdx = new AtomicInteger(); + + /** + * + */ + @Deprecated//Backward compatibility. To be removed in future. + private class DemandWorker { + /** Worker ID. */ + private int id; + + /** Partition-to-node assignments. */ + private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); + + /** Message queue. */ + private final LinkedBlockingDeque<SupplyMessage> msgQ = + new LinkedBlockingDeque<>(); + + /** Counter. */ + private long cntr; + + /** Hide worker logger and use cache logger instead. */ + private IgniteLogger log = GridDhtPartitionDemander.this.log; + + private volatile SyncFuture fut; + + /** + * @param id Worker ID. + */ + private DemandWorker(int id, SyncFuture fut) { + assert id >= 0; + + this.id = id; + this.fut = fut; + } + + /** + * @param msg Message. + */ + private void addMessage(SupplyMessage msg) { + msgQ.offer(msg); + } + + /** + * @param deque Deque to poll from. + * @param time Time to wait. + * @return Polled item. + * @throws InterruptedException If interrupted. + */ + @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException { + return deque.poll(time, MILLISECONDS); + } + + /** + * @param idx Unique index for this topic. + * @return Topic for partition. + */ + public Object topic(long idx) { + return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); + } + + /** + * @param node Node to demand from. + * @param topVer Topology version. + * @param d Demand message. + * @param exchFut Exchange future. + * @return Missed partitions. + * @throws InterruptedException If interrupted. + * @throws ClusterTopologyCheckedException If node left. + * @throws IgniteCheckedException If failed to send message. + */ + private Set<Integer> demandFromNode( + ClusterNode node, + final AffinityTopologyVersion topVer, + GridDhtPartitionDemandMessage d, + GridDhtPartitionsExchangeFuture exchFut + ) throws InterruptedException, IgniteCheckedException { + GridDhtPartitionTopology top = cctx.dht().topology(); + + cntr++; + + d.topic(topic(cntr)); + d.workerId(id); + + Set<Integer> missed = new HashSet<>(); + + // Get the same collection that will be sent in the message. + Collection<Integer> remaining = d.partitions(); + + if (topologyChanged(topVer)) + return missed; + + cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { + @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); + } + }); + + try { + boolean retry; + + // DoWhile. + // ======= + do { + retry = false; + + // Create copy. + d = new GridDhtPartitionDemandMessage(d, remaining); + + long timeout = cctx.config().getRebalanceTimeout(); + + d.timeout(timeout); + + if (log.isDebugEnabled()) + log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']'); + + // Send demand message. + cctx.io().send(node, d, cctx.ioPolicy()); + + // While. + // ===== + while (!topologyChanged(topVer)) { + SupplyMessage s = poll(msgQ, timeout); + + // If timed out. + if (s == null) { + if (msgQ.isEmpty()) { // Safety check. + U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout + + " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" + + " configuration properties)."); + + // Ordered listener was removed if timeout expired. + cctx.io().removeOrderedHandler(d.topic()); + + // Must create copy to be able to work with IO manager thread local caches. + d = new GridDhtPartitionDemandMessage(d, remaining); + + // Create new topic. + d.topic(topic(++cntr)); + + // Create new ordered listener. + cctx.io().addOrderedHandler(d.topic(), + new CI2<UUID, GridDhtPartitionSupplyMessage>() { + @Override public void apply(UUID nodeId, + GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); + } + }); + + // Resend message with larger timeout. + retry = true; + + break; // While. + } + else + continue; // While. + } + + // Check that message was received from expected node. + if (!s.senderId().equals(node.id())) { + U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + + ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); + + continue; // While. + } + + if (log.isDebugEnabled()) + log.debug("Received supply message: " + s); + + GridDhtPartitionSupplyMessage supply = s.supply(); + + // Check whether there were class loading errors on unmarshal + if (supply.classError() != null) { + if (log.isDebugEnabled()) + log.debug("Class got undeployed during preloading: " + supply.classError()); + + retry = true; + + // Quit preloading. + break; + } + + // Preload. + for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { + int p = e.getKey(); + + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + + if (part.state() == MOVING) { + boolean reserved = part.reserve(); + + assert reserved : "Failed to reserve partition [gridName=" + + cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; + + part.lock(); + + try { + Collection<Integer> invalidParts = new GridLeanSet<>(); + + // Loop through all received entries and try to preload them. + for (GridCacheEntryInfo entry : e.getValue().infos()) { + if (!invalidParts.contains(p)) { + if (!part.preloadingPermitted(entry.key(), entry.version())) { + if (log.isDebugEnabled()) + log.debug("Preloading is not permitted for entry due to " + + "evictions [key=" + entry.key() + + ", ver=" + entry.version() + ']'); + + continue; + } + + if (!preloadEntry(node, p, entry, topVer)) { + invalidParts.add(p); + + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + } + } + } + + boolean last = supply.last().contains(p); + + // If message was last for this partition, + // then we take ownership. + if (last) { + remaining.remove(p); + fut.onPartitionDone(node.id(), p); + + top.own(part); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, + exchFut.discoveryEvent()); + } + } + finally { + part.unlock(); + part.release(); + } + } + else { + remaining.remove(p); + fut.onPartitionDone(node.id(), p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + } + } + else { + remaining.remove(p); + fut.onPartitionDone(node.id(), p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + } + } + + remaining.removeAll(s.supply().missed()); + + // Only request partitions based on latest topology version. + for (Integer miss : s.supply().missed()) { + if (cctx.affinity().localNode(miss, topVer)) + missed.add(miss); + + fut.onMissedPartition(node.id(), miss); + } + + if (remaining.isEmpty()) + break; // While. + + if (s.supply().ack()) { + retry = true; + + break; + } + } + } + while (retry && !topologyChanged(topVer)); + + return missed; + } + finally { + cctx.io().removeOrderedHandler(d.topic()); + } + } + + /** + * @param node Node. + * @param d D. + */ + public void run(ClusterNode node, GridDhtPartitionDemandMessage d) { + demandLock.readLock().lock(); + + try { + GridDhtPartitionsExchangeFuture exchFut = fut.assigns.exchangeFuture(); + + AffinityTopologyVersion topVer = fut.assigns.topologyVersion(); + + Collection<Integer> missed = new HashSet<>(); + + if (topologyChanged(topVer)) { + fut.onCancel(); + + return; + } + + try { + Set<Integer> set = demandFromNode(node, topVer, d, exchFut); + + if (!set.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" + + set + ']'); + + missed.addAll(set); + } + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Node left during rebalancing (will retry) [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + + fut.onCancel(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to receive partitions from node (rebalancing will not " + + "fully finish) [node=" + node.id() + ", msg=" + d + ']', e); + + fut.onCancel(node.id()); + } + catch (InterruptedException e) { + fut.onCancel(); + } } + finally { + demandLock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 347a394..0686376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -25,15 +25,12 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jsr166.*; import java.util.*; -import java.util.concurrent.locks.*; -import static org.apache.ignite.internal.GridTopic.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; /** @@ -73,13 +70,14 @@ class GridDhtPartitionSupplier { top = cctx.dht().topology(); - depEnabled = cctx.gridDeploy().enabled(); + depEnabled = cctx.gridDeploy().enabled(); } /** * */ void start() { + startOldListeners(); } /** @@ -463,14 +461,14 @@ class GridDhtPartitionSupplier { int phase, Iterator<Integer> partIt, int part, - Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){ + Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) { scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part)); } /** * Supply context. */ - private static class SupplyContext{ + private static class SupplyContext { /** Phase. */ private int phase; @@ -502,4 +500,284 @@ class GridDhtPartitionSupplier { this.part = part; } } + + @Deprecated//Backward compatibility. To be removed in future. + public void startOldListeners() { + if (!cctx.kernalContext().clientNode()) { + int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processOldDemandMessage(m, id); + } + }); + } + } + + /** + * @param d D. + * @param id Id. + */ + @Deprecated//Backward compatibility. To be removed in future. + private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) { + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); + + ClusterNode node = cctx.node(id); + + long preloadThrottle = cctx.config().getRebalanceThrottle(); + + boolean ack = false; + + try { + for (int part : d.partitions()) { + GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } + + GridCacheEntryInfoCollectSwapListener swapLsnr = null; + + try { + if (cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + for (GridCacheEntryEx e : loc.entries()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + id + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), + cctx.cacheId()); + } + + GridCacheEntryInfo info = e.info(); + + if (info != null && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if (partMissing) + continue; + + if (cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + cctx.swap().iterator(part); + + // Iterator may be null if space does not exist. + if (iter != null) { + try { + boolean prepared = false; + + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + partMissing = true; + + break; // For. + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); + } + + GridCacheSwapEntry swapEntry = e.getValue(); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + if (partMissing) + continue; + } + finally { + iter.close(); + } + } + } + + // Stop receiving promote notifications. + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (swapLsnr != null) { + Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); + + swapLsnr = null; + + for (GridCacheEntryInfo info : entries) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), + cctx.cacheId()); + } + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. + s.last(part); + + if (ack) { + s.markAck(); + + break; // Partition for loop. + } + } + finally { + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + replyOld(node, d, s); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + node.id(), e); + } + } + + /** + * @param n Node. + * @param d Demand message. + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. + */ + @Deprecated//Backward compatibility. To be removed in future. + private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) + throws IgniteCheckedException { + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + return false; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 585566b..7a9deba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -163,7 +163,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { }); supplier = new GridDhtPartitionSupplier(cctx); - demander = new GridDhtPartitionDemander(cctx); + demander = new GridDhtPartitionDemander(cctx, demandLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -350,7 +350,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) { - demander.handleSupplyMessage(idx, id, s); + demandLock.readLock().lock(); + try { + demander.handleSupplyMessage(idx, id, s); + } + finally { + demandLock.readLock().unlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java deleted file mode 100644 index ca564ed..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.rebalancing; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; - -/** - * - */ -public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration iCfg = super.getConfiguration(gridName); - - CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0]; - - cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); - - iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); - - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - - if (getTestGridName(20).equals(gridName)) - spi =(FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); - - return iCfg; - } - - public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi{ - public void fail(){ - simulateNodeFailure(); - } - } - - private volatile FailableTcpDiscoverySpi spi; - - /** - * @throws Exception - */ - public void testNodeFailedAtRebalancing() throws Exception { - Ignite ignite = startGrid(0); - - generateData(ignite); - - log.info("Preloading started."); - - startGrid(1); - - IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - - f1.get(); - - startGrid(20); - - U.sleep(500); - - spi.fail(); - - U.sleep(500); - - f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - IgniteInternalFuture f0 = ((GridCacheAdapter)grid(0).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - - f1.get(); - f0.get(); - - stopAllGrids(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java deleted file mode 100644 index f69b710..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.rebalancing; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.affinity.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest { - /** */ - protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - private static int TEST_SIZE = 1_000_000; - - /** cache name. */ - protected static String CACHE_NAME_DHT = "cache"; - - /** cache 2 name. */ - protected static String CACHE_2_NAME_DHT = "cache2"; - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return Long.MAX_VALUE; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration iCfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - - if (getTestGridName(10).equals(gridName)) - iCfg.setClientMode(true); - - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); - - cacheCfg.setName(CACHE_NAME_DHT); - cacheCfg.setCacheMode(CacheMode.PARTITIONED); - //cacheCfg.setRebalanceBatchSize(1024); - //cacheCfg.setRebalanceBatchesCount(1); - cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg.setBackups(1); - - CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>(); - - cacheCfg2.setName(CACHE_2_NAME_DHT); - cacheCfg2.setCacheMode(CacheMode.PARTITIONED); - //cacheCfg2.setRebalanceBatchSize(1024); - //cacheCfg2.setRebalanceBatchesCount(1); - cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg2.setBackups(1); - - iCfg.setRebalanceThreadPoolSize(4); - iCfg.setCacheConfiguration(cacheCfg, cacheCfg2); - return iCfg; - } - - /** - * @param ignite Ignite. - */ - protected void generateData(Ignite ignite) { - try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Prepared " + i / 1_000_000 + "m entries."); - - stmr.addData(i, i); - } - } - try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Prepared " + i / 1_000_000 + "m entries."); - - stmr.addData(i, i + 3); - } - } - } - - /** - * @param ignite Ignite. - * @throws IgniteCheckedException - */ - protected void checkData(Ignite ignite) throws IgniteCheckedException { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Checked " + i / 1_000_000 + "m entries."); - - assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : - "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")"; - } - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Checked " + i / 1_000_000 + "m entries."); - - assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) : - "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")"; - } - } - - /** - * @throws Exception - */ - public void testSimpleRebalancing() throws Exception { - Ignite ignite = startGrid(0); - - generateData(ignite); - - log.info("Preloading started."); - - long start = System.currentTimeMillis(); - - startGrid(1); - - IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - - f1.get(); - - long spend = (System.currentTimeMillis() - start) / 1000; - - stopGrid(0); - - checkData(grid(1)); - - log.info("Spend " + spend + " seconds to rebalance entries."); - - stopAllGrids(); - } - - /** - * @throws Exception - */ - public void testComplexRebalancing() throws Exception { - Ignite ignite = startGrid(0); - - generateData(ignite); - - log.info("Preloading started."); - - long start = System.currentTimeMillis(); - - //will be started simultaneously in case of ASYNC mode - startGrid(1); - startGrid(2); - startGrid(3); - startGrid(4); - - //wait until cache rebalanced in async mode - - GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - - GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); - GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); - GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); - GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); - - IgniteInternalFuture f24 = p24.syncFuture(); - f24.get(); - - IgniteInternalFuture f14 = p14.syncFuture(); - f14.get(); - - AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion(); - - IgniteInternalFuture f11 = p11.syncFuture(); - IgniteInternalFuture f12 = p12.syncFuture(); - IgniteInternalFuture f13 = p13.syncFuture(); - - while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) { - U.sleep(100); - - f11 = p11.syncFuture(); - f12 = p12.syncFuture(); - f13 = p13.syncFuture(); - } - f11.get(); - f12.get(); - f13.get(); - - IgniteInternalFuture f21 = p21.syncFuture(); - IgniteInternalFuture f22 = p22.syncFuture(); - IgniteInternalFuture f23 = p23.syncFuture(); - - while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) { - U.sleep(100); - - f21 = p21.syncFuture(); - f22 = p22.syncFuture(); - f23 = p23.syncFuture(); - } - f21.get(); - f22.get(); - f23.get(); - - //cache rebalanced in async node - - f11 = p11.syncFuture(); - f12 = p12.syncFuture(); - f13 = p13.syncFuture(); - f14 = p14.syncFuture(); - - f21 = p21.syncFuture(); - f22 = p22.syncFuture(); - f23 = p23.syncFuture(); - f24 = p24.syncFuture(); - - stopGrid(0); - - //wait until cache rebalanced - - while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture()) - U.sleep(100); - - while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture()) - U.sleep(100); - - p11.syncFuture().get(); - p12.syncFuture().get(); - p13.syncFuture().get(); - p14.syncFuture().get(); - - p21.syncFuture().get(); - p22.syncFuture().get(); - p23.syncFuture().get(); - p24.syncFuture().get(); - - //cache rebalanced - - f12 = p12.syncFuture(); - f13 = p13.syncFuture(); - f14 = p14.syncFuture(); - - f22 = p22.syncFuture(); - f23 = p23.syncFuture(); - f24 = p24.syncFuture(); - - stopGrid(1); - - //wait until cache rebalanced - - while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture()) - U.sleep(100); - - while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture()) - U.sleep(100); - - p12.syncFuture().get(); - p13.syncFuture().get(); - p14.syncFuture().get(); - - p22.syncFuture().get(); - p23.syncFuture().get(); - p24.syncFuture().get(); - - //cache rebalanced - - f13 = p13.syncFuture(); - f14 = p14.syncFuture(); - - f23 = p23.syncFuture(); - f24 = p24.syncFuture(); - - stopGrid(2); - - //wait until cache rebalanced - - while (f13 == p13.syncFuture() || f14 == p14.syncFuture()) - U.sleep(100); - - while (f23 == p23.syncFuture() || f24 == p24.syncFuture()) - U.sleep(100); - - p13.syncFuture().get(); - p14.syncFuture().get(); - - p23.syncFuture().get(); - p24.syncFuture().get(); - - //cache rebalanced - - stopGrid(3); - - long spend = (System.currentTimeMillis() - start) / 1000; - - checkData(grid(4)); - - log.info("Spend " + spend + " seconds to rebalance entries."); - - stopAllGrids(); - } - - /** - * @throws Exception - */ - public void _testOpPerSecRebalancingTest() throws Exception { - startGrid(0); - - final AtomicBoolean cancelled = new AtomicBoolean(false); - - generateData(grid(0)); - - startGrid(1); - startGrid(2); - startGrid(10); - - Thread t = new Thread(new Runnable() { - @Override public void run() { - - long spend = 0; - - long ops = 0; - - while (!cancelled.get()) { - try { - long start = System.currentTimeMillis(); - - int size = 1000; - - for (int i = 0; i < size; i++) - grid(10).cachex(CACHE_NAME_DHT).remove(i); - - for (int i = 0; i < size; i++) - grid(10).cachex(CACHE_NAME_DHT).put(i, i); - - spend += System.currentTimeMillis() - start; - - ops += size * 2; - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - - log.info("Ops. per ms: " + ops / spend); - } - } - }); - t.start(); - - stopGrid(0); - startGrid(0); - - stopGrid(0); - startGrid(0); - - stopGrid(0); - startGrid(0); - - cancelled.set(true); - t.join(); - - checkData(grid(10)); - - //stopAllGrids(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java new file mode 100644 index 0000000..a17fc7a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; + +/** + * + */ +public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0]; + + cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + + cacheCfg = iCfg.getCacheConfiguration()[1]; + + cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + + iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); + + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + + if (getTestGridName(20).equals(gridName)) + spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); + + return iCfg; + } + + public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi { + public void fail() { + simulateNodeFailure(); + } + } + + private volatile FailableTcpDiscoverySpi spi; + + /** + * @throws Exception + */ + public void testNodeFailedAtRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + startGrid(1); + + waitForRebalancing(1, 2); + + startGrid(20); + + waitForRebalancing(20, 3); + + spi.fail(); + + waitForRebalancing(0, 4); + waitForRebalancing(1, 4); + + stopAllGrids(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d89f1b0a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java new file mode 100644 index 0000000..0cb6c7b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * + */ +public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + private static int TEST_SIZE = 1_000_000; + + /** cache name. */ + protected static String CACHE_NAME_DHT = "cache"; + + /** cache 2 name. */ + protected static String CACHE_2_NAME_DHT = "cache2"; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + + if (getTestGridName(10).equals(gridName)) + iCfg.setClientMode(true); + + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setName(CACHE_NAME_DHT); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + //cacheCfg.setRebalanceBatchSize(1024); + //cacheCfg.setRebalanceBatchesCount(1); + cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg.setBackups(1); + + CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>(); + + cacheCfg2.setName(CACHE_2_NAME_DHT); + cacheCfg2.setCacheMode(CacheMode.PARTITIONED); + //cacheCfg2.setRebalanceBatchSize(1024); + //cacheCfg2.setRebalanceBatchesCount(1); + cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg2.setBackups(1); + + iCfg.setRebalanceThreadPoolSize(4); + iCfg.setCacheConfiguration(cacheCfg, cacheCfg2); + return iCfg; + } + + /** + * @param ignite Ignite. + */ + protected void generateData(Ignite ignite) { + try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Prepared " + i / 1_000_000 + "m entries."); + + stmr.addData(i, i); + } + } + try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Prepared " + i / 1_000_000 + "m entries."); + + stmr.addData(i, i + 3); + } + } + } + + /** + * @param ignite Ignite. + * @throws IgniteCheckedException + */ + protected void checkData(Ignite ignite) throws IgniteCheckedException { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Checked " + i / 1_000_000 + "m entries."); + + assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : + "key " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")"; + } + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Checked " + i / 1_000_000 + "m entries."); + + assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) : + "key " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")"; + } + } + + /** + * @throws Exception + */ + public void testSimpleRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + startGrid(1); + + IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + f1.get(); + + long spend = (System.currentTimeMillis() - start) / 1000; + + stopGrid(0); + + checkData(grid(1)); + + log.info("Spend " + spend + " seconds to rebalance entries."); + + stopAllGrids(); + } + + /** + * @param id Id. + * @param top Topology. + */ + protected void waitForRebalancing(int id, int top) throws IgniteCheckedException { + boolean finished = false; + + while (!finished) { + finished = true; + + for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) { + GridDhtPartitionDemander.SyncFuture fut = (GridDhtPartitionDemander.SyncFuture)c.preloader().syncFuture(); + if (fut.topologyVersion().topologyVersion() != top) { + finished = false; + + break; + } + else + fut.get(); + } + } + } + + /** + * @throws Exception + */ + public void testComplexRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + //will be started simultaneously in case of ASYNC mode + startGrid(1); + startGrid(2); + startGrid(3); + startGrid(4); + + //wait until cache rebalanced in async mode + waitForRebalancing(1, 5); + waitForRebalancing(2, 5); + waitForRebalancing(3, 5); + waitForRebalancing(4, 5); + + //cache rebalanced in async node + + stopGrid(0); + + //wait until cache rebalanced + waitForRebalancing(1, 6); + waitForRebalancing(2, 6); + waitForRebalancing(3, 6); + waitForRebalancing(4, 6); + + //cache rebalanced + + stopGrid(1); + + //wait until cache rebalanced + waitForRebalancing(2, 7); + waitForRebalancing(3, 7); + waitForRebalancing(4, 7); + + //cache rebalanced + + stopGrid(2); + + //wait until cache rebalanced + waitForRebalancing(3, 8); + waitForRebalancing(4, 8); + + //cache rebalanced + + stopGrid(3); + + long spend = (System.currentTimeMillis() - start) / 1000; + + checkData(grid(4)); + + log.info("Spend " + spend + " seconds to rebalance entries."); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void testBackwardCompatibility() throws Exception { + Ignite ignite = startGrid(0); + + Map<String, Object> map = new HashMap<>(ignite.cluster().localNode().attributes()); + + map.put(IgniteNodeAttributes.REBALANCING_VERSION, 0); + + ((TcpDiscoveryNode)ignite.cluster().localNode()).setAttributes(map); + + generateData(ignite); + + startGrid(1); + + waitForRebalancing(1, 2); + + stopGrid(0); + + checkData(grid(1)); + + } +} \ No newline at end of file
