IGNITE-8791 Fixed missed update counter in WAL data record for backup 
transaction - Fixes #4264.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13e2a314
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13e2a314
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13e2a314

Branch: refs/heads/ignite-8446
Commit: 13e2a314b72d9155ce7f0126651805064e20358c
Parents: 281a400
Author: Pavel Kovalenko <[email protected]>
Authored: Tue Jul 24 17:48:57 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Tue Jul 24 17:48:57 2018 +0300

----------------------------------------------------------------------
 .../internal/pagemem/wal/record/DataEntry.java  |  12 ++
 .../pagemem/wal/record/LazyDataEntry.java       |   3 +
 .../GridDistributedTxRemoteAdapter.java         |  44 +++--
 .../wal/serializer/RecordDataV1Serializer.java  |   4 +
 ...PdsAtomicCacheHistoricalRebalancingTest.java |   5 +
 .../IgnitePdsCacheRebalancingAbstractTest.java  | 179 +++++++++++++------
 .../IgnitePdsTxCacheRebalancingTest.java        |   1 -
 .../IgnitePdsTxHistoricalRebalancingTest.java   |  64 +++++++
 modules/indexing/pom.xml                        |   7 +
 .../IgnitePdsWithIndexingCoreTestSuite.java     |   9 +-
 10 files changed, 257 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index 3511aff..d13a68a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -158,6 +158,18 @@ public class DataEntry {
     }
 
     /**
+     * Sets partition update counter to entry.
+     *
+     * @param partCnt Partition update counter.
+     * @return {@code this} for chaining.
+     */
+    public DataEntry partitionCounter(long partCnt) {
+        this.partCnt = partCnt;
+
+        return this;
+    }
+
+    /**
      * @return Expire time.
      */
     public long expireTime() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
index 0ad87d7..6b56da5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java
@@ -96,6 +96,9 @@ public class LazyDataEntry extends DataEntry {
                 IgniteCacheObjectProcessor co = 
cctx.kernalContext().cacheObjects();
 
                 key = co.toKeyCacheObject(cacheCtx.cacheObjectContext(), 
keyType, keyBytes);
+
+                if (key.partition() == -1)
+                    key.partition(partId);
             }
 
             return key;

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5e3111c..1b9b3a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.InvalidEnvironmentException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -485,7 +487,8 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                     try {
                         Collection<IgniteTxEntry> entries = near() || 
cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries();
 
-                        List<DataEntry> dataEntries = null;
+                        // Data entries to write to WAL, each paired with its associated TxEntry.
+                        List<T2<DataEntry, IgniteTxEntry>> dataEntries = null;
 
                         batchStoreCommit(writeMap().values());
 
@@ -571,17 +574,20 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                     dataEntries = new 
ArrayList<>(entries.size());
 
                                                 dataEntries.add(
-                                                    new DataEntry(
-                                                        cacheCtx.cacheId(),
-                                                        txEntry.key(),
-                                                        val,
-                                                        op,
-                                                        nearXidVersion(),
-                                                        writeVersion(),
-                                                        0,
-                                                        
txEntry.key().partition(),
-                                                        txEntry.updateCounter()
-                                                    )
+                                                        new T2<>(
+                                                                new DataEntry(
+                                                                        
cacheCtx.cacheId(),
+                                                                        
txEntry.key(),
+                                                                        val,
+                                                                        op,
+                                                                        
nearXidVersion(),
+                                                                        
writeVersion(),
+                                                                        0,
+                                                                        
txEntry.key().partition(),
+                                                                        
txEntry.updateCounter()
+                                                                ),
+                                                                txEntry
+                                                        )
                                                 );
                                             }
 
@@ -630,6 +636,8 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                         dhtVer,
                                                         
txEntry.updateCounter());
 
+                                                    
txEntry.updateCounter(updRes.updatePartitionCounter());
+
                                                     if (updRes.loggedPointer() 
!= null)
                                                         ptr = 
updRes.loggedPointer();
 
@@ -665,6 +673,8 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                                     dhtVer,
                                                     txEntry.updateCounter());
 
+                                                
txEntry.updateCounter(updRes.updatePartitionCounter());
+
                                                 if (updRes.loggedPointer() != 
null)
                                                     ptr = 
updRes.loggedPointer();
 
@@ -757,8 +767,14 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                 }
                             }
 
-                            if (!near() && !F.isEmpty(dataEntries) && 
cctx.wal() != null)
-                                cctx.wal().log(new DataRecord(dataEntries));
+                            if (!near() && !F.isEmpty(dataEntries) && 
cctx.wal() != null) {
+                                // Set new update counters for data entries 
received from persisted tx entries.
+                                List<DataEntry> entriesWithCounters = 
dataEntries.stream()
+                                        .map(tuple -> 
tuple.get1().partitionCounter(tuple.get2().updateCounter()))
+                                        .collect(Collectors.toList());
+
+                                cctx.wal().log(new 
DataRecord(entriesWithCounters));
+                            }
 
                             if (ptr != null && !cctx.tm().logTxRecords())
                                 cctx.wal().flush(ptr, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index f433d26..ad06090 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -1505,6 +1505,10 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
             CacheObjectContext coCtx = cacheCtx.cacheObjectContext();
 
             KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes);
+
+            if (key.partition() == -1)
+                key.partition(partId);
+
             CacheObject val = valBytes != null ? co.toCacheObject(coCtx, 
valType, valBytes) : null;
 
             return new DataEntry(

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
index f06494b..cce9a40 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
@@ -35,6 +35,11 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest 
extends IgnitePdsAtom
     }
 
     /** {@inheritDoc */
+    @Override protected long checkpointFrequency() {
+        return 15 * 1000;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         // Use rebalance from WAL if possible.
         
System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 
"0");

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 347412d..368c609 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -24,16 +24,19 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Lists;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
@@ -137,8 +140,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
 
         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
             .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 
4)
-            .setPageSize(1024)
-            .setCheckpointFrequency(10 * 1000)
+            .setCheckpointFrequency(checkpointFrequency())
             .setWalMode(WALMode.LOG_ONLY)
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                 .setName("dfltDataRegion")
@@ -167,6 +169,13 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         return res;
     }
 
+    /**
+     * @return Checkpoint frequency.
+     */
+    protected long checkpointFrequency() {
+        return DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
+    }
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 20 * 60 * 1000;
@@ -315,14 +324,17 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      * @throws Exception If failed.
      */
     public void testTopologyChangesWithConstantLoad() throws Exception {
-        final long timeOut = U.currentTimeMillis() + 10 * 60 * 1000;
+        final long timeOut = U.currentTimeMillis() + 5 * 60 * 1000;
 
         final int entriesCnt = 10_000;
         final int maxNodesCnt = 4;
-        final int topChanges = 50;
+        final int topChanges = 25;
+        final boolean allowRemoves = true;
 
+        final AtomicLong orderCounter = new AtomicLong();
         final AtomicBoolean stop = new AtomicBoolean();
         final AtomicBoolean suspend = new AtomicBoolean();
+        final AtomicBoolean suspended = new AtomicBoolean();
 
         final ConcurrentMap<Integer, TestValue> map = new 
ConcurrentHashMap<>();
 
@@ -333,45 +345,64 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         IgniteCache<Integer, TestValue> cache = ignite.cache(INDEXED_CACHE);
 
         for (int i = 0; i < entriesCnt; i++) {
-            cache.put(i, new TestValue(i, i));
-            map.put(i, new TestValue(i, i));
+            long order = orderCounter.get();
+
+            cache.put(i, new TestValue(order, i, i));
+            map.put(i, new TestValue(order, i, i));
+
+            orderCounter.incrementAndGet();
         }
 
         final AtomicInteger nodesCnt = new AtomicInteger(4);
 
         IgniteInternalFuture fut = runMultiThreadedAsync(new Callable<Void>() {
+            /**
+             * @param chance Chance of remove operation in percents.
+             * @return {@code true} if it should be remove operation.
+             */
+            private boolean removeOp(int chance) {
+                return ThreadLocalRandom.current().nextInt(100) + 1 <= chance;
+            }
+
             @Override public Void call() throws Exception {
+                Random rnd = ThreadLocalRandom.current();
+
                 while (true) {
                     if (stop.get())
                         return null;
 
                     if (suspend.get()) {
+                        suspended.set(true);
+
                         U.sleep(10);
 
                         continue;
                     }
 
-                    int k = ThreadLocalRandom.current().nextInt(entriesCnt);
-                    int v1 = ThreadLocalRandom.current().nextInt();
-                    int v2 = ThreadLocalRandom.current().nextInt();
+                    int k = rnd.nextInt(entriesCnt);
+                    long order = orderCounter.get();
 
-                    int n = nodesCnt.get();
+                    int v1 = 0, v2 = 0;
+                    boolean remove = false;
 
-                    if (n <= 0)
-                        continue;
+                    if (removeOp(allowRemoves ? 20 : 0))
+                        remove = true;
+                    else {
+                        v1 = rnd.nextInt();
+                        v2 = rnd.nextInt();
+                    }
+
+                    int nodes = nodesCnt.get();
 
                     Ignite ignite;
 
                     try {
-                        ignite = grid(ThreadLocalRandom.current().nextInt(n));
+                        ignite = grid(rnd.nextInt(nodes));
                     }
                     catch (Exception ignored) {
                         continue;
                     }
 
-                    if (ignite == null)
-                        continue;
-
                     Transaction tx = null;
                     boolean success = true;
 
@@ -379,7 +410,12 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                         tx = ignite.transactions().txStart();
 
                     try {
-                        ignite.cache(INDEXED_CACHE).put(k, new TestValue(v1, 
v2));
+                        IgniteCache<Object, Object> cache = 
ignite.cache(INDEXED_CACHE);
+
+                        if (remove)
+                            cache.remove(k);
+                        else
+                            cache.put(k, new TestValue(order, v1, v2));
                     }
                     catch (Exception ignored) {
                         success = false;
@@ -395,13 +431,19 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
                         }
                     }
 
-                    if (success)
-                        map.put(k, new TestValue(v1, v2));
+                    if (success) {
+                        map.put(k, new TestValue(order, v1, v2, remove));
+
+                        orderCounter.incrementAndGet();
+                    }
                 }
             }
         }, 1, "load-runner");
 
-        boolean[] changes = new boolean[] {false, false, true, true};
+        // "false" means stop the last started node, "true" means start a new node.
+        List<Boolean> predefinedChanges = Lists.newArrayList(false, false, 
true, true);
+
+        List<Boolean> topChangesHistory = new ArrayList<>();
 
         try {
             for (int it = 0; it < topChanges; it++) {
@@ -410,32 +452,62 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
 
                 U.sleep(3_000);
 
-                boolean add;
+                boolean addNode;
 
-                if (it < changes.length)
-                    add = changes[it];
+                if (it < predefinedChanges.size())
+                    addNode = predefinedChanges.get(it);
                 else if (nodesCnt.get() <= maxNodesCnt / 2)
-                    add = true;
+                    addNode = true;
                 else if (nodesCnt.get() >= maxNodesCnt)
-                    add = false;
+                    addNode = false;
                 else // More chance that node will be added
-                    add = ThreadLocalRandom.current().nextInt(3) <= 1;
+                    addNode = ThreadLocalRandom.current().nextInt(3) <= 1;
 
-                if (add)
+                if (addNode)
                     startGrid(nodesCnt.getAndIncrement());
                 else
                     stopGrid(nodesCnt.decrementAndGet());
 
+                topChangesHistory.add(addNode);
+
                 awaitPartitionMapExchange();
 
+                if (fut.error() != null)
+                    break;
+
+                // Suspend loader and wait for last operation completion.
                 suspend.set(true);
+                GridTestUtils.waitForCondition(suspended::get, 5_000);
+
+                // Fix last successful cache operation to skip operations that 
can be performed during check.
+                long maxOrder = orderCounter.get();
 
-                U.sleep(200);
+                for (Map.Entry<Integer, TestValue> entry : map.entrySet()) {
+                    final String assertMsg = "Iteration: " + it + ". Changes: 
" + Objects.toString(topChangesHistory)
+                            + ". Key: " + Integer.toString(entry.getKey());
 
-                for (Map.Entry<Integer, TestValue> entry : map.entrySet())
-                    assertEquals(it + " " + Integer.toString(entry.getKey()), 
entry.getValue(), cache.get(entry.getKey()));
+                    TestValue expected = entry.getValue();
 
+                    if (expected.order < maxOrder)
+                        continue;
+
+                    TestValue actual = cache.get(entry.getKey());
+
+                    if (expected.removed) {
+                        assertNull(assertMsg + " should be removed.", actual);
+
+                        continue;
+                    }
+
+                    if (entry.getValue().order < maxOrder)
+                        continue;
+
+                    assertEquals(assertMsg, expected, actual);
+                }
+
+                // Resume progress for loader.
                 suspend.set(false);
+                suspended.set(false);
             }
         }
         finally {
@@ -443,11 +515,6 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         }
 
         fut.get();
-
-        awaitPartitionMapExchange();
-
-        for (Map.Entry<Integer, TestValue> entry : map.entrySet())
-            assertEquals(Integer.toString(entry.getKey()), entry.getValue(), 
cache.get(entry.getKey()));
     }
 
     /**
@@ -596,46 +663,52 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
      *
      */
     private static class TestValue implements Serializable {
+        /** Operation order. */
+        private final long order;
+
         /** V 1. */
         private final int v1;
+
         /** V 2. */
         private final int v2;
 
-        /**
-         * @param v1 V 1.
-         * @param v2 V 2.
-         */
-        private TestValue(int v1, int v2) {
+        /** Flag indicating that the value has been removed. */
+        private final boolean removed;
+
+        private TestValue(long order, int v1, int v2) {
+            this(order, v1, v2, false);
+        }
+
+        private TestValue(long order, int v1, int v2, boolean removed) {
+            this.order = order;
             this.v1 = v1;
             this.v2 = v2;
+            this.removed = removed;
         }
 
         /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
+            if (this == o) return true;
 
-            TestValue val = (TestValue)o;
+            if (o == null || getClass() != o.getClass()) return false;
 
-            return v1 == val.v1 && v2 == val.v2;
+            TestValue testValue = (TestValue) o;
 
+            return order == testValue.order &&
+                v1 == testValue.v1 &&
+                v2 == testValue.v2;
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int res = v1;
-
-            res = 31 * res + v2;
-
-            return res;
+            return Objects.hash(order, v1, v2);
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
             return "TestValue{" +
-                "v1=" + v1 +
+                "order=" + order +
+                ", v1=" + v1 +
                 ", v2=" + v2 +
                 '}';
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java
index c641ea4..3b324c3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java
@@ -36,7 +36,6 @@ public class IgnitePdsTxCacheRebalancingTest extends 
IgnitePdsCacheRebalancingAb
         ccfg.setCacheMode(CacheMode.PARTITIONED);
         ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         ccfg.setBackups(1);
-        ccfg.setRebalanceDelay(10_000);
         ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
new file mode 100644
index 0000000..8236bd3
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
+
+/**
+ *
+ */
+public class IgnitePdsTxHistoricalRebalancingTest extends 
IgnitePdsTxCacheRebalancingTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new 
IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long checkpointFrequency() {
+        return 15 * 1000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Use rebalance from WAL if possible.
+        
System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 
"0");
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        boolean walRebalanceInvoked = 
!IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.allRebalances()
+            .isEmpty();
+
+        IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        super.afterTest();
+
+        if (!walRebalanceInvoked)
+            throw new AssertionError("WAL rebalance hasn't been invoked.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/modules/indexing/pom.xml b/modules/indexing/pom.xml
index b6b7089..19b481f 100644
--- a/modules/indexing/pom.xml
+++ b/modules/indexing/pom.xml
@@ -119,6 +119,13 @@
             <version>${spring.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 2d967cd..491bab7 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -24,6 +24,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySo
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxHistoricalRebalancingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest;
@@ -62,11 +63,13 @@ public class IgnitePdsWithIndexingCoreTestSuite extends 
TestSuite {
         suite.addTestSuite(IgniteWalRecoveryTest.class);
         suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class);
         suite.addTestSuite(IgnitePdsNoActualWalHistoryTest.class);
-        suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class);
-        suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);
+        suite.addTestSuite(IgniteWalRebalanceTest.class);
 
+        suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class);
         
suite.addTestSuite(IgnitePdsAtomicCacheHistoricalRebalancingTest.class);
-        suite.addTestSuite(IgniteWalRebalanceTest.class);
+
+        suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class);
+        suite.addTestSuite(IgnitePdsTxHistoricalRebalancingTest.class);
 
         suite.addTestSuite(IgniteWalRecoveryPPCTest.class);
         suite.addTestSuite(IgnitePdsDiskErrorsRecoveringTest.class);

Reply via email to