IGNITE-8791 Fixed missed update counter in WAL data record for backup transaction - Fixes #4264.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13e2a314 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13e2a314 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13e2a314 Branch: refs/heads/ignite-8446 Commit: 13e2a314b72d9155ce7f0126651805064e20358c Parents: 281a400 Author: Pavel Kovalenko <[email protected]> Authored: Tue Jul 24 17:48:57 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jul 24 17:48:57 2018 +0300 ---------------------------------------------------------------------- .../internal/pagemem/wal/record/DataEntry.java | 12 ++ .../pagemem/wal/record/LazyDataEntry.java | 3 + .../GridDistributedTxRemoteAdapter.java | 44 +++-- .../wal/serializer/RecordDataV1Serializer.java | 4 + ...PdsAtomicCacheHistoricalRebalancingTest.java | 5 + .../IgnitePdsCacheRebalancingAbstractTest.java | 179 +++++++++++++------ .../IgnitePdsTxCacheRebalancingTest.java | 1 - .../IgnitePdsTxHistoricalRebalancingTest.java | 64 +++++++ modules/indexing/pom.xml | 7 + .../IgnitePdsWithIndexingCoreTestSuite.java | 9 +- 10 files changed, 257 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java index 3511aff..d13a68a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java @@ -158,6 +158,18 @@ public class DataEntry { } /** + * Sets partition update counter to entry. + * + * @param partCnt Partition update counter. + * @return {@code this} for chaining. + */ + public DataEntry partitionCounter(long partCnt) { + this.partCnt = partCnt; + + return this; + } + + /** * @return Expire time. */ public long expireTime() { http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java index 0ad87d7..6b56da5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java @@ -96,6 +96,9 @@ public class LazyDataEntry extends DataEntry { IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects(); key = co.toKeyCacheObject(cacheCtx.cacheObjectContext(), keyType, keyBytes); + + if (key.partition() == -1) + key.partition(partId); } return key; http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 5e3111c..1b9b3a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.InvalidEnvironmentException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -63,6 +64,7 @@ import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -485,7 +487,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter try { Collection<IgniteTxEntry> entries = near() || cctx.snapshot().needTxReadLogging() ? allEntries() : writeEntries(); - List<DataEntry> dataEntries = null; + // Data entry to write to WAL and associated with it TxEntry. + List<T2<DataEntry, IgniteTxEntry>> dataEntries = null; batchStoreCommit(writeMap().values()); @@ -571,17 +574,20 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dataEntries = new ArrayList<>(entries.size()); dataEntries.add( - new DataEntry( - cacheCtx.cacheId(), - txEntry.key(), - val, - op, - nearXidVersion(), - writeVersion(), - 0, - txEntry.key().partition(), - txEntry.updateCounter() - ) + new T2<>( + new DataEntry( + cacheCtx.cacheId(), + txEntry.key(), + val, + op, + nearXidVersion(), + writeVersion(), + 0, + txEntry.key().partition(), + txEntry.updateCounter() + ), + txEntry + ) ); } @@ -630,6 +636,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dhtVer, txEntry.updateCounter()); + txEntry.updateCounter(updRes.updatePartitionCounter()); + if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); @@ -665,6 +673,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter dhtVer, txEntry.updateCounter()); + txEntry.updateCounter(updRes.updatePartitionCounter()); + if (updRes.loggedPointer() != null) ptr = updRes.loggedPointer(); @@ -757,8 +767,14 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } - if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) - cctx.wal().log(new DataRecord(dataEntries)); + if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { + // Set new update counters for data entries received from persisted tx entries. + List<DataEntry> entriesWithCounters = dataEntries.stream() + .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) + .collect(Collectors.toList()); + + cctx.wal().log(new DataRecord(entriesWithCounters)); + } if (ptr != null && !cctx.tm().logTxRecords()) cctx.wal().flush(ptr, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index f433d26..ad06090 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -1505,6 +1505,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer { CacheObjectContext coCtx = cacheCtx.cacheObjectContext(); KeyCacheObject key = co.toKeyCacheObject(coCtx, keyType, keyBytes); + + if (key.partition() == -1) + key.partition(partId); + CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null; return new DataEntry( http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java index f06494b..cce9a40 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java @@ -35,6 +35,11 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom } /** {@inheritDoc */ + @Override protected long checkpointFrequency() { + return 15 * 1000; + } + + /** {@inheritDoc */ @Override protected void beforeTest() throws Exception { // Use rebalance from WAL if possible. System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 347412d..368c609 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -24,16 +24,19 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.Lists; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.PartitionLossPolicy; @@ -137,8 +140,7 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb DataStorageConfiguration dsCfg = new DataStorageConfiguration() .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4) - .setPageSize(1024) - .setCheckpointFrequency(10 * 1000) + .setCheckpointFrequency(checkpointFrequency()) .setWalMode(WALMode.LOG_ONLY) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setName("dfltDataRegion") @@ -167,6 +169,13 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb return res; } + /** + * @return Checkpoint frequency; + */ + protected long checkpointFrequency() { + return DataStorageConfiguration.DFLT_CHECKPOINT_FREQ; + } + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return 20 * 60 * 1000; @@ -315,14 +324,17 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * @throws Exception If failed. */ public void testTopologyChangesWithConstantLoad() throws Exception { - final long timeOut = U.currentTimeMillis() + 10 * 60 * 1000; + final long timeOut = U.currentTimeMillis() + 5 * 60 * 1000; final int entriesCnt = 10_000; final int maxNodesCnt = 4; - final int topChanges = 50; + final int topChanges = 25; + final boolean allowRemoves = true; + final AtomicLong orderCounter = new AtomicLong(); final AtomicBoolean stop = new AtomicBoolean(); final AtomicBoolean suspend = new AtomicBoolean(); + final AtomicBoolean suspended = new AtomicBoolean(); final ConcurrentMap<Integer, TestValue> map = new ConcurrentHashMap<>(); @@ -333,45 +345,64 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb IgniteCache<Integer, TestValue> cache = ignite.cache(INDEXED_CACHE); for (int i = 0; i < entriesCnt; i++) { - cache.put(i, new TestValue(i, i)); - map.put(i, new TestValue(i, i)); + long order = orderCounter.get(); + + cache.put(i, new TestValue(order, i, i)); + map.put(i, new TestValue(order, i, i)); + + orderCounter.incrementAndGet(); } final AtomicInteger nodesCnt = new AtomicInteger(4); IgniteInternalFuture fut = runMultiThreadedAsync(new Callable<Void>() { + /** + * @param chance Chance of remove operation in percents. + * @return {@code true} if it should be remove operation. + */ + private boolean removeOp(int chance) { + return ThreadLocalRandom.current().nextInt(100) + 1 <= chance; + } + @Override public Void call() throws Exception { + Random rnd = ThreadLocalRandom.current(); + while (true) { if (stop.get()) return null; if (suspend.get()) { + suspended.set(true); + U.sleep(10); continue; } - int k = ThreadLocalRandom.current().nextInt(entriesCnt); - int v1 = ThreadLocalRandom.current().nextInt(); - int v2 = ThreadLocalRandom.current().nextInt(); + int k = rnd.nextInt(entriesCnt); + long order = orderCounter.get(); - int n = nodesCnt.get(); + int v1 = 0, v2 = 0; + boolean remove = false; - if (n <= 0) - continue; + if (removeOp(allowRemoves ? 20 : 0)) + remove = true; + else { + v1 = rnd.nextInt(); + v2 = rnd.nextInt(); + } + + int nodes = nodesCnt.get(); Ignite ignite; try { - ignite = grid(ThreadLocalRandom.current().nextInt(n)); + ignite = grid(rnd.nextInt(nodes)); } catch (Exception ignored) { continue; } - if (ignite == null) - continue; - Transaction tx = null; boolean success = true; @@ -379,7 +410,12 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb tx = ignite.transactions().txStart(); try { - ignite.cache(INDEXED_CACHE).put(k, new TestValue(v1, v2)); + IgniteCache<Object, Object> cache = ignite.cache(INDEXED_CACHE); + + if (remove) + cache.remove(k); + else + cache.put(k, new TestValue(order, v1, v2)); } catch (Exception ignored) { success = false; @@ -395,13 +431,19 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb } } - if (success) - map.put(k, new TestValue(v1, v2)); + if (success) { + map.put(k, new TestValue(order, v1, v2, remove)); + + orderCounter.incrementAndGet(); + } } } }, 1, "load-runner"); - boolean[] changes = new boolean[] {false, false, true, true}; + // "False" means stop last started node, "True" - start new node. + List<Boolean> predefinedChanges = Lists.newArrayList(false, false, true, true); + + List<Boolean> topChangesHistory = new ArrayList<>(); try { for (int it = 0; it < topChanges; it++) { @@ -410,32 +452,62 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb U.sleep(3_000); - boolean add; + boolean addNode; - if (it < changes.length) - add = changes[it]; + if (it < predefinedChanges.size()) + addNode = predefinedChanges.get(it); else if (nodesCnt.get() <= maxNodesCnt / 2) - add = true; + addNode = true; else if (nodesCnt.get() >= maxNodesCnt) - add = false; + addNode = false; else // More chance that node will be added - add = ThreadLocalRandom.current().nextInt(3) <= 1; + addNode = ThreadLocalRandom.current().nextInt(3) <= 1; - if (add) + if (addNode) startGrid(nodesCnt.getAndIncrement()); else stopGrid(nodesCnt.decrementAndGet()); + topChangesHistory.add(addNode); + awaitPartitionMapExchange(); + if (fut.error() != null) + break; + + // Suspend loader and wait for last operation completion. suspend.set(true); + GridTestUtils.waitForCondition(suspended::get, 5_000); + + // Fix last successful cache operation to skip operations that can be performed during check. + long maxOrder = orderCounter.get(); - U.sleep(200); + for (Map.Entry<Integer, TestValue> entry : map.entrySet()) { + final String assertMsg = "Iteration: " + it + ". Changes: " + Objects.toString(topChangesHistory) + + ". Key: " + Integer.toString(entry.getKey()); - for (Map.Entry<Integer, TestValue> entry : map.entrySet()) - assertEquals(it + " " + Integer.toString(entry.getKey()), entry.getValue(), cache.get(entry.getKey())); + TestValue expected = entry.getValue(); + if (expected.order < maxOrder) + continue; + + TestValue actual = cache.get(entry.getKey()); + + if (expected.removed) { + assertNull(assertMsg + " should be removed.", actual); + + continue; + } + + if (entry.getValue().order < maxOrder) + continue; + + assertEquals(assertMsg, expected, actual); + } + + // Resume progress for loader. suspend.set(false); + suspended.set(false); } } finally { @@ -443,11 +515,6 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb } fut.get(); - - awaitPartitionMapExchange(); - - for (Map.Entry<Integer, TestValue> entry : map.entrySet()) - assertEquals(Integer.toString(entry.getKey()), entry.getValue(), cache.get(entry.getKey())); } /** @@ -596,46 +663,52 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb * */ private static class TestValue implements Serializable { + /** Operation order. */ + private final long order; + /** V 1. */ private final int v1; + /** V 2. */ private final int v2; - /** - * @param v1 V 1. - * @param v2 V 2. - */ - private TestValue(int v1, int v2) { + /** Flag indicates that value has removed. */ + private final boolean removed; + + private TestValue(long order, int v1, int v2) { + this(order, v1, v2, false); + } + + private TestValue(long order, int v1, int v2, boolean removed) { + this.order = order; this.v1 = v1; this.v2 = v2; + this.removed = removed; } /** {@inheritDoc} */ @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; + if (this == o) return true; - TestValue val = (TestValue)o; + if (o == null || getClass() != o.getClass()) return false; - return v1 == val.v1 && v2 == val.v2; + TestValue testValue = (TestValue) o; + return order == testValue.order && + v1 == testValue.v1 && + v2 == testValue.v2; } /** {@inheritDoc} */ @Override public int hashCode() { - int res = v1; - - res = 31 * res + v2; - - return res; + return Objects.hash(order, v1, v2); } /** {@inheritDoc} */ @Override public String toString() { return "TestValue{" + - "v1=" + v1 + + "order=" + order + + ", v1=" + v1 + ", v2=" + v2 + '}'; } http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java index c641ea4..3b324c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxCacheRebalancingTest.java @@ -36,7 +36,6 @@ public class IgnitePdsTxCacheRebalancingTest extends IgnitePdsCacheRebalancingAb ccfg.setCacheMode(CacheMode.PARTITIONED); ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); ccfg.setBackups(1); - ccfg.setRebalanceDelay(10_000); ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java new file mode 100644 index 0000000..8236bd3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest; + +/** + * + */ +public class IgnitePdsTxHistoricalRebalancingTest extends IgnitePdsTxCacheRebalancingTest { + /** {@inheritDoc */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc */ + @Override protected long checkpointFrequency() { + return 15 * 1000; + } + + /** {@inheritDoc */ + @Override protected void beforeTest() throws Exception { + // Use rebalance from WAL if possible. + System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); + + super.beforeTest(); + } + + /** {@inheritDoc */ + @Override protected void afterTest() throws Exception { + boolean walRebalanceInvoked = !IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.allRebalances() + .isEmpty(); + + IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup(); + + System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD); + + super.afterTest(); + + if (!walRebalanceInvoked) + throw new AssertionError("WAL rebalance hasn't been invoked."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/modules/indexing/pom.xml b/modules/indexing/pom.xml index b6b7089..19b481f 100644 --- a/modules/indexing/pom.xml +++ b/modules/indexing/pom.xml @@ -119,6 +119,13 @@ <version>${spring.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/ignite/blob/13e2a314/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 2d967cd..491bab7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySo import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsMarshallerMappingRestoreOnNodeStartTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxHistoricalRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreCacheGroupsTest; import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest; @@ -62,11 +63,13 @@ public class IgnitePdsWithIndexingCoreTestSuite extends TestSuite { suite.addTestSuite(IgniteWalRecoveryTest.class); suite.addTestSuite(IgniteWalRecoveryWithCompactionTest.class); suite.addTestSuite(IgnitePdsNoActualWalHistoryTest.class); - suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class); - suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class); + suite.addTestSuite(IgniteWalRebalanceTest.class); + suite.addTestSuite(IgnitePdsAtomicCacheRebalancingTest.class); suite.addTestSuite(IgnitePdsAtomicCacheHistoricalRebalancingTest.class); - suite.addTestSuite(IgniteWalRebalanceTest.class); + + suite.addTestSuite(IgnitePdsTxCacheRebalancingTest.class); + suite.addTestSuite(IgnitePdsTxHistoricalRebalancingTest.class); suite.addTestSuite(IgniteWalRecoveryPPCTest.class); suite.addTestSuite(IgnitePdsDiskErrorsRecoveringTest.class);
