Repository: ignite
Updated Branches:
  refs/heads/ignite-2.5 b80fdbedc -> 5cd32329f


IGNITE-8116 Fixed historical rebalance from WAL


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cd32329
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cd32329
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cd32329

Branch: refs/heads/ignite-2.5
Commit: 5cd32329fe5f303eaf369519771ee22f2a2cf822
Parents: b80fdbe
Author: Pavel Kovalenko <[email protected]>
Authored: Wed Apr 18 19:41:44 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed Apr 18 19:42:16 2018 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  28 ++--
 .../dht/preloader/GridDhtPartitionDemander.java | 118 ++++++++-----
 .../dht/preloader/GridDhtPartitionSupplier.java |  54 ++++--
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../IgniteDhtPartitionsToReloadMap.java         |   2 +-
 .../persistence/GridCacheOffheapManager.java    |  82 ++++++++--
 modules/core/src/test/config/log4j-test.xml     |   6 -
 ...PdsAtomicCacheHistoricalRebalancingTest.java |  40 +++++
 .../IgnitePdsCacheRebalancingAbstractTest.java  |  32 ++--
 ...nitePdsTxCacheHistoricalRebalancingTest.java |  39 +++++
 .../db/wal/IgniteWalRebalanceTest.java          | 164 +++++++++++++++++++
 ...lientQueryReplicatedNodeRestartSelfTest.java |   1 -
 .../IgnitePdsWithIndexingCoreTestSuite.java     |   7 +
 13 files changed, 467 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index f8cc86f..5c78eb5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -861,7 +861,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             }
 
             @Override protected void onClose() throws IgniteCheckedException {
-                assert loc != null && loc.state() == OWNING && 
loc.reservations() > 0;
+                assert loc != null && loc.state() == OWNING && 
loc.reservations() > 0
+                    : "Partition should be in OWNING state and has at least 1 
reservation: " + loc;
 
                 loc.release();
             }
@@ -874,36 +875,37 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         throws IgniteCheckedException {
 
         final TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators 
= new TreeMap<>();
-        Set<Integer> missing = null;
+
+        Set<Integer> missing = new HashSet<>();
 
         for (Integer p : parts.fullSet()) {
             GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, 
topVer);
 
             if (partIter == null) {
-                if (missing == null)
-                    missing = new HashSet<>();
-
                 missing.add(p);
+
+                continue;
             }
-            else
-                iterators.put(p, partIter);
+
+            iterators.put(p, partIter);
         }
 
-        IgniteRebalanceIterator iter = new 
IgniteRebalanceIteratorImpl(iterators, 
historicalIterator(parts.historicalMap()));
+        IgniteHistoricalIterator historicalIterator = 
historicalIterator(parts.historicalMap(), missing);
 
-        if (missing != null) {
-            for (Integer p : missing)
-                iter.setPartitionMissing(p);
-        }
+        IgniteRebalanceIterator iter = new 
IgniteRebalanceIteratorImpl(iterators, historicalIterator);
+
+        for (Integer p : missing)
+            iter.setPartitionMissing(p);
 
         return iter;
     }
 
     /**
      * @param partCntrs Partition counters map.
+     * @param missing Set to be populated with partitions that are missing 
or failed to be reserved.
      * @return Historical iterator.
      */
-    @Nullable protected IgniteHistoricalIterator 
historicalIterator(CachePartitionPartialCountersMap partCntrs)
+    @Nullable protected IgniteHistoricalIterator 
historicalIterator(CachePartitionPartialCountersMap partCntrs, Set<Integer> 
missing)
         throws IgniteCheckedException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index dc4bfe9..c94f511 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -459,7 +459,9 @@ public class GridDhtPartitionDemander {
                         + ", topology=" + fut.topologyVersion() + ", 
rebalanceId=" + fut.rebalanceId + "]");
             }
 
-            int stripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+            int totalStripes = ctx.gridConfig().getRebalanceThreadPoolSize();
+
+            int stripes = totalStripes;
 
             final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new 
ArrayList<>(stripes);
             for (int i = 0; i < stripes; i++)
@@ -467,7 +469,7 @@ public class GridDhtPartitionDemander {
 
             // Reserve one stripe for historical partitions.
             if (parts.hasHistorical()) {
-                stripePartitions.add(stripes - 1, new 
IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));
+                stripePartitions.set(stripes - 1, new 
IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null));
 
                 if (stripes > 1)
                     stripes--;
@@ -478,7 +480,7 @@ public class GridDhtPartitionDemander {
             for (int i = 0; it.hasNext(); i++)
                 stripePartitions.get(i % stripes).addFull(it.next());
 
-            for (int stripe = 0; stripe < stripes; stripe++) {
+            for (int stripe = 0; stripe < totalStripes; stripe++) {
                 if (!stripePartitions.get(stripe).isEmpty()) {
                     // Create copy of demand message with new striped 
partitions map.
                     final GridDhtPartitionDemandMessage demandMsg = 
d.withNewPartitionsMap(stripePartitions.get(stripe));
@@ -489,23 +491,27 @@ public class GridDhtPartitionDemander {
 
                     final int topicId = stripe;
 
-                    Runnable initDemandRequestTask = () -> {
+                    IgniteInternalFuture<?> clearAllFuture = 
clearFullPartitions(fut, demandMsg.partitions().fullSet());
+
+                    // Start rebalancing after clearing full partitions is 
finished.
+                    clearAllFuture.listen(f -> 
ctx.kernalContext().closure().runLocalSafe(() -> {
+                        if (fut.isDone())
+                            return;
+
                         try {
-                            if (!fut.isDone()) {
-                                ctx.io().sendOrderedMessage(node, 
rebalanceTopics.get(topicId),
-                                        
demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout());
-
-                                // Cleanup required in case partitions 
demanded in parallel with cancellation.
-                                synchronized (fut) {
-                                    if (fut.isDone())
-                                        fut.cleanupRemoteContexts(node.id());
-                                }
+                            ctx.io().sendOrderedMessage(node, 
rebalanceTopics.get(topicId),
+                                demandMsg.convertIfNeeded(node.version()), 
grp.ioPolicy(), demandMsg.timeout());
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Requested rebalancing [from 
node=" + node.id() + ", listener index=" +
-                                            topicId + ", partitions count=" + 
stripePartitions.get(topicId).size() +
-                                            " (" + 
stripePartitions.get(topicId).partitionsList() + ")]");
+                            // Cleanup required in case partitions demanded in 
parallel with cancellation.
+                            synchronized (fut) {
+                                if (fut.isDone())
+                                    fut.cleanupRemoteContexts(node.id());
                             }
+
+                            if (log.isDebugEnabled())
+                                log.debug("Requested rebalancing [from node=" 
+ node.id() + ", listener index=" +
+                                    topicId + " " + demandMsg.rebalanceId() + 
", partitions count=" + stripePartitions.get(topicId).size() +
+                                    " (" + 
stripePartitions.get(topicId).partitionsList() + ")]");
                         }
                         catch (IgniteCheckedException e1) {
                             ClusterTopologyCheckedException cause = 
e1.getCause(ClusterTopologyCheckedException.class);
@@ -522,31 +528,26 @@ public class GridDhtPartitionDemander {
 
                             fut.cancel();
                         }
-                    };
-
-                    awaitClearingAndStartRebalance(fut, demandMsg, 
initDemandRequestTask);
+                    }, true));
                 }
             }
         }
     }
 
     /**
-     * Awaits partitions clearing for full partitions and sends initial demand 
request
-     * after all partitions are cleared and safe to consume data.
+     * Creates future which will be completed when all {@code fullPartitions} 
are cleared.
      *
      * @param fut Rebalance future.
-     * @param demandMessage Initial demand message which contains set of full 
partitions to await.
-     * @param initDemandRequestTask Task which sends initial demand request.
+     * @param fullPartitions Set of full partitions that need to be cleared.
+     * @return Future which will be completed when given partitions are 
cleared.
      */
-    private void awaitClearingAndStartRebalance(RebalanceFuture fut,
-                                                GridDhtPartitionDemandMessage 
demandMessage,
-                                                Runnable 
initDemandRequestTask) {
-        Set<Integer> fullPartitions = demandMessage.partitions().fullSet();
+    private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, 
Set<Integer> fullPartitions) {
+        final GridFutureAdapter clearAllFuture = new GridFutureAdapter();
 
         if (fullPartitions.isEmpty()) {
-            ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, 
true);
+            clearAllFuture.onDone();
 
-            return;
+            return clearAllFuture;
         }
 
         for (GridCacheContext cctx : grp.caches()) {
@@ -560,16 +561,19 @@ public class GridDhtPartitionDemander {
         final AtomicInteger clearingPartitions = new 
AtomicInteger(fullPartitions.size());
 
         for (int partId : fullPartitions) {
-            if (fut.isDone())
-                return;
+            if (fut.isDone()) {
+                clearAllFuture.onDone();
+
+                return clearAllFuture;
+            }
 
             GridDhtLocalPartition part = grp.topology().localPartition(partId);
 
             if (part != null && part.state() == MOVING) {
                 part.onClearFinished(f -> {
-                    // Cancel rebalance if partition clearing was failed.
-                    if (f.error() != null) {
-                        if (!fut.isDone()) {
+                    if (!fut.isDone()) {
+                        // Cancel rebalance if partition clearing was failed.
+                        if (f.error() != null) {
                             for (GridCacheContext cctx : grp.caches()) {
                                 if (cctx.statisticsEnabled()) {
                                     final CacheMetricsImpl metrics = 
cctx.cache().metrics0();
@@ -581,30 +585,54 @@ public class GridDhtPartitionDemander {
                             log.error("Unable to await partition clearing " + 
part, f.error());
 
                             fut.cancel();
+
+                            clearAllFuture.onDone(f.error());
                         }
-                    }
-                    else {
-                        if (!fut.isDone()) {
-                            int existed = clearingPartitions.decrementAndGet();
+                        else {
+                            int remaining = 
clearingPartitions.decrementAndGet();
 
                             for (GridCacheContext cctx : grp.caches()) {
                                 if (cctx.statisticsEnabled()) {
                                     final CacheMetricsImpl metrics = 
cctx.cache().metrics0();
 
-                                    
metrics.rebalanceClearingPartitions(existed);
+                                    
metrics.rebalanceClearingPartitions(remaining);
                                 }
                             }
 
-                            // If all partitions are cleared send initial 
demand message.
-                            if (existed == 0)
-                                
ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true);
+                            if (log.isDebugEnabled())
+                                log.debug("Remaining clearing partitions 
[grp=" + grp.cacheOrGroupName()
+                                    + ", remaining=" + remaining + "]");
+
+                            if (remaining == 0)
+                                clearAllFuture.onDone();
                         }
                     }
+                    else {
+                        clearAllFuture.onDone();
+                    }
                 });
             }
-            else
-                clearingPartitions.decrementAndGet();
+            else {
+                int remaining = clearingPartitions.decrementAndGet();
+
+                for (GridCacheContext cctx : grp.caches()) {
+                    if (cctx.statisticsEnabled()) {
+                        final CacheMetricsImpl metrics = 
cctx.cache().metrics0();
+
+                        metrics.rebalanceClearingPartitions(remaining);
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Remaining clearing partitions [grp=" + 
grp.cacheOrGroupName()
+                        + ", remaining=" + remaining + "]");
+
+                if (remaining == 0)
+                    clearAllFuture.onDone();
+            }
         }
+
+        return clearAllFuture;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 6d2f526..a3ee305 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -173,7 +173,8 @@ class GridDhtPartitionSupplier {
 
         if (curTop.compareTo(demTop) > 0) {
             if (log.isDebugEnabled())
-                log.debug("Demand request outdated [currentTopVer=" + curTop
+                log.debug("Demand request outdated [grp=" + 
grp.cacheOrGroupName()
+                        + ", currentTopVer=" + curTop
                         + ", demandTopVer=" + demTop
                         + ", from=" + nodeId
                         + ", topicId=" + topicId + "]");
@@ -189,10 +190,19 @@ class GridDhtPartitionSupplier {
 
                 if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) {
                     clearContext(scMap.remove(contextId), log);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Supply context cleaned [grp=" + 
grp.cacheOrGroupName()
+                            + ", from=" + nodeId
+                            + ", demandMsg=" + d
+                            + ", supplyContext=" + sctx + "]");
                 }
                 else {
                     if (log.isDebugEnabled())
-                        log.debug("Stale context cleanup message " + d + ", 
supplyContext=" + sctx);
+                        log.debug("Stale supply context cleanup message [grp=" 
+ grp.cacheOrGroupName()
+                            + ", from=" + nodeId
+                            + ", demandMsg=" + d
+                            + ", supplyContext=" + sctx + "]");
                 }
 
                 return;
@@ -200,13 +210,16 @@ class GridDhtPartitionSupplier {
         }
 
         if (log.isDebugEnabled())
-            log.debug("Demand request accepted [current=" + curTop + ", 
demanded=" + demTop +
-                ", from=" + nodeId + ", topicId=" + topicId + "]");
+            log.debug("Demand request accepted [grp=" + grp.cacheOrGroupName()
+                + ", from=" + nodeId
+                + ", currentVer=" + curTop
+                + ", demandedVer=" + demTop
+                + ", topicId=" + topicId + "]");
 
         ClusterNode node = grp.shared().discovery().node(nodeId);
 
         if (node == null)
-            return; // Context will be cleaned at topology change.
+            return;
 
         try {
             SupplyContext sctx;
@@ -217,13 +230,27 @@ class GridDhtPartitionSupplier {
                 if (sctx != null && d.rebalanceId() < sctx.rebalanceId) {
                     // Stale message, return context back and return.
                     scMap.put(contextId, sctx);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Stale demand message [grp=" + 
grp.cacheOrGroupName()
+                            + ", actualContext=" + sctx
+                            + ", from=" + nodeId
+                            + ", demandMsg=" + d + "]");
+
                     return;
                 }
             }
 
             // Demand request should not contain empty partitions if no supply 
context is associated with it.
-            if (sctx == null && (d.partitions() == null || 
d.partitions().isEmpty()))
+            if (sctx == null && (d.partitions() == null || 
d.partitions().isEmpty())) {
+                if (log.isDebugEnabled())
+                    log.debug("Empty demand message [grp=" + 
grp.cacheOrGroupName()
+                        + ", from=" + nodeId
+                        + ", topicId=" + topicId
+                        + ", demandMsg=" + d + "]");
+
                 return;
+            }
 
             assert !(sctx != null && !d.partitions().isEmpty());
 
@@ -271,7 +298,8 @@ class GridDhtPartitionSupplier {
 
                     GridDhtLocalPartition loc = top.localPartition(part, 
d.topologyVersion(), false);
 
-                    assert loc != null && loc.state() == 
GridDhtPartitionState.OWNING;
+                    assert loc != null && loc.state() == 
GridDhtPartitionState.OWNING
+                        : "Partition should be in OWNING state: " + loc;
 
                     
s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
                 }
@@ -323,7 +351,8 @@ class GridDhtPartitionSupplier {
 
                 GridDhtLocalPartition loc = top.localPartition(part, 
d.topologyVersion(), false);
 
-                assert (loc != null && loc.state() == OWNING && 
loc.reservations() > 0) || iter.isPartitionMissing(part) : loc;
+                assert (loc != null && loc.state() == OWNING && 
loc.reservations() > 0) || iter.isPartitionMissing(part)
+                    : "Partition should be in OWNING state and has at least 1 
reservation " + loc;
 
                 if (iter.isPartitionMissing(part) && 
remainingParts.contains(part)) {
                     s.missed(part);
@@ -361,9 +390,6 @@ class GridDhtPartitionSupplier {
 
                     remainingParts.remove(part);
                 }
-
-                // Need to manually prepare cache message.
-                // TODO GG-11141.
             }
 
             Iterator<Integer> remainingIter = remainingParts.iterator();
@@ -374,7 +400,8 @@ class GridDhtPartitionSupplier {
                 if (iter.isPartitionDone(p)) {
                     GridDhtLocalPartition loc = top.localPartition(p, 
d.topologyVersion(), false);
 
-                    assert loc != null;
+                    assert loc != null
+                        : "Supply partition is gone: grp=" + 
grp.cacheOrGroupName() + ", p=" + p;
 
                     s.last(p, loc.updateCounter());
 
@@ -387,7 +414,8 @@ class GridDhtPartitionSupplier {
                 }
             }
 
-            assert remainingParts.isEmpty();
+            assert remainingParts.isEmpty()
+                : "Partitions after rebalance should be either done or 
missing: " + remainingParts;
 
             if (sctx != null)
                 clearContext(sctx, log);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index ddcb81e..700f0cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -187,7 +187,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
 
         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
-        CachePartitionFullCountersMap cntrMap = top.fullUpdateCounters();
+        CachePartitionFullCountersMap countersMap = 
grp.topology().fullUpdateCounters();
 
         for (int p = 0; p < partCnt; p++) {
             if (ctx.exchange().hasPendingExchange()) {
@@ -251,7 +251,7 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                         );
                     }
 
-                    msg.partitions().addHistorical(p, 
cntrMap.initialUpdateCounter(p), cntrMap.updateCounter(p), partCnt);
+                    msg.partitions().addHistorical(p, 
part.initialUpdateCounter(), countersMap.updateCounter(p), partCnt);
                 }
                 else {
                     Collection<ClusterNode> picked = pickOwners(p, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
index 7066e0d..8515004 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -90,7 +90,7 @@ public class IgniteDhtPartitionsToReloadMap implements 
Serializable {
     /**
      * @return {@code True} if empty.
      */
-    public boolean isEmpty() {
+    public synchronized boolean isEmpty() {
         return map == null || map.isEmpty();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 68ec83d..5feaa25 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -755,8 +755,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
-        CachePartitionPartialCountersMap partCntrs) throws 
IgniteCheckedException {
+    @Override @Nullable protected WALHistoricalIterator historicalIterator(
+        CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) 
throws IgniteCheckedException {
         if (partCntrs == null || partCntrs.isEmpty())
             return null;
 
@@ -773,13 +773,18 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             if (startPtr == null)
                 throw new IgniteCheckedException("Could not find start pointer 
for partition [part=" + p + ", partCntrSince=" + initCntr + "]");
 
-            if (minPtr == null || startPtr.compareTo(minPtr) == -1)
+            if (minPtr == null || startPtr.compareTo(minPtr) < 0)
                 minPtr = startPtr;
         }
 
         WALIterator it = grp.shared().wal().replay(minPtr);
 
-        return new WALIteratorAdapter(grp, partCntrs, it);
+        WALHistoricalIterator iterator = new WALHistoricalIterator(grp, 
partCntrs, it);
+
+        // Add historical partitions which could not be reserved to the missing 
set.
+        missing.addAll(iterator.missingParts);
+
+        return iterator;
     }
 
     /**
@@ -807,7 +812,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     /**
      *
      */
-    private static class WALIteratorAdapter implements 
IgniteHistoricalIterator {
+    private static class WALHistoricalIterator implements 
IgniteHistoricalIterator {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -817,6 +822,9 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** Partition counters map. */
         private final CachePartitionPartialCountersMap partMap;
 
+        /** Partitions marked as missing (unable to reserve or partition is 
not in OWNING state). */
+        private final Set<Integer> missingParts = new HashSet<>();
+
         /** Partitions marked as done. */
         private final Set<Integer> doneParts = new HashSet<>();
 
@@ -830,19 +838,24 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         private Iterator<DataEntry> entryIt;
 
         /** */
-        private CacheDataRow next;
+        private DataEntry next;
+
+        /** Flag indicating that the partition of the current {@link #next} entry is 
finished and no longer needs rebalancing. */
+        private boolean reachedPartitionEnd;
 
         /**
          * @param grp Cache context.
          * @param walIt WAL iterator.
          */
-        private WALIteratorAdapter(CacheGroupContext grp, 
CachePartitionPartialCountersMap partMap, WALIterator walIt) {
+        private WALHistoricalIterator(CacheGroupContext grp, 
CachePartitionPartialCountersMap partMap, WALIterator walIt) {
             this.grp = grp;
             this.partMap = partMap;
             this.walIt = walIt;
 
             cacheIds = grp.cacheIds();
 
+            reservePartitions();
+
             advance();
         }
 
@@ -859,6 +872,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
             walIt.close();
+            releasePartitions();
         }
 
         /** {@inheritDoc} */
@@ -896,7 +910,13 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             if (next == null)
                 throw new NoSuchElementException();
 
-            CacheDataRow val = next;
+            CacheDataRow val = new DataEntryRow(next);
+
+            if (reachedPartitionEnd) {
+                doneParts.add(next.partitionId());
+
+                reachedPartitionEnd = false;
+            }
 
             advance();
 
@@ -909,6 +929,46 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /**
+         * Reserves historical partitions.
+         * If a partition cannot be reserved or is not in OWNING state, its id is added 
to the {@link #missingParts} set.
+         */
+        private void reservePartitions() {
+            for (int i = 0; i < partMap.size(); i++) {
+                int p = partMap.partitionAt(i);
+                GridDhtLocalPartition part = grp.topology().localPartition(p);
+
+                if (part == null || !part.reserve()) {
+                    missingParts.add(p);
+                    continue;
+                }
+
+                if (part.state() != OWNING) {
+                    part.release();
+                    missingParts.add(p);
+                }
+            }
+        }
+
+        /**
+         * Release historical partitions.
+         */
+        private void releasePartitions() {
+            for (int i = 0; i < partMap.size(); i++) {
+                int p = partMap.partitionAt(i);
+
+                if (missingParts.contains(p))
+                    continue;
+
+                GridDhtLocalPartition part = grp.topology().localPartition(p);
+
+                assert part != null && part.state() == OWNING && 
part.reservations() > 0
+                    : "Partition should in OWNING state and has at least 1 
reservation";
+
+                part.release();
+            }
+        }
+
+        /**
          *
          */
         private void advance() {
@@ -922,7 +982,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                         if (cacheIds.contains(entry.cacheId())) {
                             int idx = 
partMap.partitionIndex(entry.partitionId());
 
-                            if (idx < 0)
+                            if (idx < 0 || missingParts.contains(idx))
                                 continue;
 
                             long from = partMap.initialUpdateCounterAt(idx);
@@ -930,9 +990,9 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                             if (entry.partitionCounter() >= from && 
entry.partitionCounter() <= to) {
                                 if (entry.partitionCounter() == to)
-                                    doneParts.add(entry.partitionId());
+                                    reachedPartitionEnd = true;
 
-                                next = new DataEntryRow(entry);
+                                next = entry;
 
                                 return;
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/test/config/log4j-test.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/log4j-test.xml 
b/modules/core/src/test/config/log4j-test.xml
index 9138c02..b0b08e7 100755
--- a/modules/core/src/test/config/log4j-test.xml
+++ b/modules/core/src/test/config/log4j-test.xml
@@ -79,12 +79,6 @@
         Uncomment to enable Ignite query execution debugging.
     -->
     <!--
-    <category name="org.apache.ignite.cache.query">
-        <level value="DEBUG"/>
-    </category>
-    -->
-
-    <!--
     <category name="org.apache.ignite.internal.processors.query">
         <level value="DEBUG"/>
     </category>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
new file mode 100644
index 0000000..a090381
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.IgniteSystemProperties;
+
+/**
+ * Runs the tests inherited from {@link IgnitePdsAtomicCacheRebalancingTest} with the
+ * {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD} system property set to {@code 0},
+ * so that rebalance from WAL is used if possible.
+ */
+public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends 
IgnitePdsAtomicCacheRebalancingTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Use rebalance from WAL if possible.
+        
System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 
"0");
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Remove the threshold override installed in beforeTest().
+        
System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        super.afterTest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 10f9b03..0dd9c78 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -78,12 +78,15 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
 
         cfg.setConsistentId(gridName);
 
+        cfg.setRebalanceThreadPoolSize(2);
+
         CacheConfiguration ccfg1 = cacheConfiguration(cacheName)
             .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
-            .setBackups(2)
+            .setBackups(1)
             .setRebalanceMode(CacheRebalanceMode.ASYNC)
             .setIndexedTypes(Integer.class, Integer.class)
             .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setRebalanceBatchesPrefetchCount(2)
             
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
         CacheConfiguration ccfg2 = cacheConfiguration("indexed");
@@ -172,8 +175,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
         stopAllGrids();
 
         cleanPersistenceDir();
-
-        
System.clearProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE);
     }
 
     /**
@@ -184,7 +185,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
     public void testRebalancingOnRestart() throws Exception {
         Ignite ignite0 = startGrid(0);
 
-        ignite0.active(true);
+        ignite0.cluster().active(true);
 
         startGrid(1);
 
@@ -233,8 +234,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
      * @throws Exception If fails.
      */
     public void testRebalancingOnRestartAfterCheckpoint() throws Exception {
-        fail("IGNITE-5302");
-
         IgniteEx ignite0 = startGrid(0);
 
         IgniteEx ignite1 = startGrid(1);
@@ -242,7 +241,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
         IgniteEx ignite2 = startGrid(2);
         IgniteEx ignite3 = startGrid(3);
 
-        ignite0.active(true);
+        ignite0.cluster().active(true);
 
         ignite0.cache(cacheName).rebalance().get();
         ignite1.cache(cacheName).rebalance().get();
@@ -264,6 +263,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
         ignite2.close();
         ignite3.close();
 
+        resetBaselineTopology();
+
         
ignite0.resetLostPartitions(Collections.singletonList(cache1.getName()));
 
         assert cache1.lostPartitions().isEmpty();
@@ -306,7 +307,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
         IgniteEx ignite3 = (IgniteEx)G.start(getConfiguration("test3"));
         IgniteEx ignite4 = (IgniteEx)G.start(getConfiguration("test4"));
 
-        ignite1.active(true);
+        ignite1.cluster().active(true);
 
         awaitPartitionMapExchange();
 
@@ -325,7 +326,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
         ignite3 = (IgniteEx)G.start(getConfiguration("test3"));
         ignite4 = (IgniteEx)G.start(getConfiguration("test4"));
 
-        ignite1.active(true);
+        ignite1.cluster().active(true);
 
         awaitPartitionMapExchange();
 
@@ -348,13 +349,13 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      * @throws Exception If fails.
      */
     public void testPartitionLossAndRecover() throws Exception {
-        fail("IGNITE-5302");
-
         Ignite ignite1 = startGrid(0);
         Ignite ignite2 = startGrid(1);
         Ignite ignite3 = startGrid(2);
         Ignite ignite4 = startGrid(3);
 
+        ignite1.cluster().active(true);
+
         awaitPartitionMapExchange();
 
         IgniteCache<String, String> cache1 = ignite1.cache(cacheName);
@@ -537,7 +538,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
 
         final Ignite ig = grid(1);
 
-        ig.active(true);
+        ig.cluster().active(true);
 
         awaitPartitionMapExchange();
 
@@ -574,8 +575,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
      * @throws Exception If failed
      */
     public void testPartitionCounterConsistencyOnUnstableTopology() throws 
Exception {
-        
System.setProperty(IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE,
 "1");
-
         final Ignite ig = startGrids(4);
 
         ig.cluster().active(true);
@@ -595,9 +594,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
                 try {
                     stopGrid(3);
 
-                    // Clear checkpoint history to avoid rebalance from WAL.
-                    forceCheckpoint();
                     forceCheckpoint();
+
                     U.sleep(500); // Wait for data load.
 
                     IgniteEx ig0 = startGrid(3);
@@ -609,8 +607,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
 
                         awaitPartitionMapExchange();
 
-                        // Clear checkpoint history to avoid rebalance from 
WAL.
-                        forceCheckpoint();
                         forceCheckpoint();
 
                         startGrid(2);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
new file mode 100644
index 0000000..179c8e0
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheHistoricalRebalancingTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.IgniteSystemProperties;
+
+/**
+ * Runs the tests inherited from {@link IgnitePdsTxCacheRebalancingTest} with the
+ * {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD} system property set to {@code 0},
+ * so that rebalance from WAL is used if possible.
+ */
+public class IgnitePdsTxCacheHistoricalRebalancingTest extends 
IgnitePdsTxCacheRebalancingTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Use rebalance from WAL if possible.
+        
System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 
"0");
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Remove the threshold override installed in beforeTest().
+        
System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        super.afterTest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
new file mode 100644
index 0000000..6387dac
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+
+/**
+ * Historic WAL rebalance base test.
+ * <p>
+ * Starts two nodes with native persistence and a replicated cache, updates the cache while one node
+ * is stopped, restarts that node and checks that it returns the updated values once rebalancing has
+ * finished. {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD} is set to {@code 0} for the duration of each
+ * test to make all rebalance WAL-based.
+ */
+public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+
+        ccfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        DataStorageConfiguration dbCfg = new DataStorageConfiguration()
+                    .setWalHistorySize(Integer.MAX_VALUE)
+                    .setWalMode(WALMode.LOG_ONLY)
+                    .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true));
+
+        cfg.setDataStorageConfiguration(dbCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Set the threshold here rather than in getConfiguration(): the property is JVM-global,
+        // so it only has to be installed once per test, before any node is started.
+        System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        System.clearProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Loads data, stops node 1, overwrites every key through node 0, restarts node 1 and verifies
+     * that node 1 returns the overwritten values after its rebalance future completes.
+     *
+     * @throws Exception if failed.
+     */
+    public void test() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        startGrid(1);
+
+        final int entryCnt = 10_000;
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
+
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k));
+
+        forceCheckpoint();
+
+        stopGrid(1, false);
+
+        // These updates are made while node 1 is down.
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k + 1));
+
+        forceCheckpoint();
+
+        IgniteEx ig1 = startGrid(1);
+
+        IgniteCache<Object, Object> cache1 = ig1.cache(CACHE_NAME);
+
+        cache1.rebalance().get(2, TimeUnit.MINUTES);
+
+        // Read through the restarted node: reading through node 0 would pass even if
+        // node 1 had received nothing during rebalance.
+        for (int k = 0; k < entryCnt; k++)
+            assertEquals(new IndexedObject(k + 1), cache1.get(k));
+    }
+
+    /**
+     * Cache value used by the test. Equality and hash code are based on {@link #iVal} only.
+     */
+    private static class IndexedObject {
+        /** Indexed integer value. */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /** Payload; never read by the test, presumably only pads each value by 1 KB. */
+        private byte[] payload = new byte[1024];
+
+        /**
+         * @param iVal Integer value.
+         */
+        private IndexedObject(int iVal) {
+            this.iVal = iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof IndexedObject))
+                return false;
+
+            IndexedObject that = (IndexedObject)o;
+
+            return iVal == that.iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IndexedObject.class, this);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
index de7ee5f..996d6a7 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
@@ -209,7 +209,6 @@ public class 
IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom
      * @throws Exception If failed.
      */
     public void testRestarts() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-7946";);
         int duration = 90 * 1000;
         int qryThreadNum = 5;
         int restartThreadsNum = 2; // 2 of 4 data nodes

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cd32329/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 447b622..943d43f 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -17,10 +17,12 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheHistoricalRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheHistoricalRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest;
@@ -30,6 +32,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
 import 
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsDiskErrorsRecoveringTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsThreadInterruptionTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryPPCTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryWithCompactionTest;
@@ -60,6 +63,10 @@ public class IgnitePdsWithIndexingCoreTestSuite extends 
TestSuite {
         suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class);
         suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);
 
+        
suite.addTestSuite(IgnitePdsAtomicCacheHistoricalRebalancingTest.class);
+        suite.addTestSuite(IgnitePdsTxCacheHistoricalRebalancingTest.class);
+        suite.addTestSuite(IgniteWalRebalanceTest.class);
+
         suite.addTestSuite(IgniteWalRecoveryPPCTest.class);
         suite.addTestSuite(IgnitePdsDiskErrorsRecoveringTest.class);
         suite.addTestSuite(IgnitePdsCacheDestroyDuringCheckpointTest.class);

Reply via email to