Repository: ignite Updated Branches: refs/heads/master d5432c005 -> a232b88fe
IGNITE-9084 Fixed error handling for historical rebalance - Fixes #4437. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a232b88f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a232b88f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a232b88f Branch: refs/heads/master Commit: a232b88fe823a9823b86a0efd93c259e0cf0f0bc Parents: d5432c0 Author: Pavel Kovalenko <[email protected]> Authored: Thu Sep 6 18:03:50 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Sep 6 18:03:50 2018 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 7 + .../processors/cache/GridCacheIoManager.java | 3 +- .../dht/preloader/GridDhtPartitionDemander.java | 14 +- .../dht/preloader/GridDhtPartitionSupplier.java | 69 +++++-- .../GridDhtPartitionSupplyMessage.java | 6 +- .../GridDhtPartitionSupplyMessageV2.java | 153 +++++++++++++++ .../persistence/GridCacheOffheapManager.java | 10 + .../db/wal/IgniteWalRebalanceTest.java | 185 ++++++++++++++++++- 8 files changed, 427 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 8dddd8b..2970e71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ 
-54,6 +54,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.WalStateAckMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse; @@ -889,6 +891,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 120: + msg = new GridDhtPartitionSupplyMessageV2(); + + break; + case 124: msg = new GridMessageCollection<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 0134421..1e25c93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -914,7 +914,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; - case 114: { + case 114: + case 120: { processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 851fcc9..3c1090f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -688,10 +688,20 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Received supply message [grp=" + grp.cacheOrGroupName() + ", msg=" + supply + ']'); - // Check whether there were class loading errors on unmarshal + // Check whether there was an error during supply message unmarshalling process. if (supply.classError() != null) { U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId + - "]. Class got undeployed during preloading: " + supply.classError()); + "]. Supply message couldn't be unmarshalled: " + supply.classError()); + + fut.cancel(nodeId); + + return; + } + + // Check whether there was an error during supplying process. + if (supply.error() != null) { + U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId + + "]. 
Supplier has failed with error: " + supply.error()); fut.cancel(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 524d02d..2090c07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -218,9 +218,11 @@ class GridDhtPartitionSupplier { if (node == null) return; - try { - SupplyContext sctx; + IgniteRebalanceIterator iter = null; + + SupplyContext sctx = null; + try { synchronized (scMap) { sctx = scMap.remove(contextId); @@ -229,7 +231,7 @@ class GridDhtPartitionSupplier { scMap.put(contextId, sctx); if (log.isDebugEnabled()) - log.debug("Stale demand message [grp=" + grp.cacheOrGroupName() + log.debug("Stale demand message [cache=" + grp.cacheOrGroupName() + ", actualContext=" + sctx + ", from=" + nodeId + ", demandMsg=" + d + "]"); @@ -241,7 +243,7 @@ class GridDhtPartitionSupplier { // Demand request should not contain empty partitions if no supply context is associated with it. 
if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) { if (log.isDebugEnabled()) - log.debug("Empty demand message [grp=" + grp.cacheOrGroupName() + log.debug("Empty demand message [cache=" + grp.cacheOrGroupName() + ", from=" + nodeId + ", topicId=" + topicId + ", demandMsg=" + d + "]"); @@ -272,8 +274,6 @@ class GridDhtPartitionSupplier { d.topologyVersion(), grp.deploymentEnabled()); - IgniteRebalanceIterator iter; - Set<Integer> remainingParts; if (sctx == null || sctx.iterator == null) { @@ -452,13 +452,56 @@ class GridDhtPartitionSupplier { ", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() + ", topicId=" + topicId + "]"); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + nodeId, e); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + - ", msg=" + e.getMessage() + ']'); + catch (Throwable t) { + if (grp.shared().kernalContext().isStopping()) + return; + + // Sending supply messages with error requires new protocol. + boolean sendErrMsg = node.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0; + + if (t instanceof IgniteSpiException) { + if (log.isDebugEnabled()) + log.debug("Failed to send message to node (current node is stopping?) 
[node=" + node.id() + + ", msg=" + t.getMessage() + ']'); + + sendErrMsg = false; + } + else + U.error(log, "Failed to continue supplying process for " + + "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId + + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t); + + try { + if (sctx != null) + clearContext(sctx, log); + else if (iter != null) + iter.close(); + } + catch (Throwable t1) { + U.error(log, "Failed to cleanup supplying context " + + "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId + + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1); + } + + if (!sendErrMsg) + return; + + try { + GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2( + d.rebalanceId(), + grp.groupId(), + d.topologyVersion(), + grp.deploymentEnabled(), + t + ); + + reply(node, d, errMsg, contextId); + } + catch (Throwable t1) { + U.error(log, "Failed to send supply error message for " + + "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId + + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 4ecffc4..284700a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -89,10 +89,12 @@ public class 
GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ - GridDhtPartitionSupplyMessage(long rebalanceId, + GridDhtPartitionSupplyMessage( + long rebalanceId, int grpId, AffinityTopologyVersion topVer, - boolean addDepInfo) { + boolean addDepInfo + ) { this.grpId = grpId; this.rebalanceId = rebalanceId; this.topVer = topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java new file mode 100644 index 0000000..a775766 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * Supply message with supplier error transfer support. + */ +public class GridDhtPartitionSupplyMessageV2 extends GridDhtPartitionSupplyMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Available since. */ + public static final IgniteProductVersion AVAILABLE_SINCE = IgniteProductVersion.fromString("2.7.0"); + + /** Supplying process error. */ + @GridDirectTransient + private Throwable err; + + /** Supplying process error bytes. */ + private byte[] errBytes; + + /** + * Default constructor. + */ + public GridDhtPartitionSupplyMessageV2() { + } + + /** + * @param rebalanceId Rebalance id. + * @param grpId Group id. + * @param topVer Topology version. + * @param addDepInfo Add dep info. + * @param err Supply process error. 
+ */ + public GridDhtPartitionSupplyMessageV2( + long rebalanceId, + int grpId, + AffinityTopologyVersion topVer, + boolean addDepInfo, + Throwable err + ) { + super(rebalanceId, grpId, topVer, addDepInfo); + + this.err = err; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (err != null && errBytes == null) + errBytes = U.marshal(ctx, err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (errBytes != null && err == null) + err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 12: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 12: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class); + } + + /** {@inheritDoc} */ + @Nullable @Override public Throwable error() { + return err; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 120; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 13; + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 4c45352..199efcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -880,6 +880,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** Flag indicates that partition belongs to current {@link #next} is finished and no longer needs to rebalance. */ private boolean reachedPartitionEnd; + /** Flag indicates that update counters for requested partitions have been reached and done. + * It means that no further iteration is needed. */ + private boolean doneAllPartitions; + /** * @param grp Cache context. * @param walIt WAL iterator. 
@@ -953,6 +957,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple doneParts.add(next.partitionId()); reachedPartitionEnd = false; + + if (doneParts.size() == partMap.size()) + doneAllPartitions = true; } advance(); @@ -1011,6 +1018,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private void advance() { next = null; + if (doneAllPartitions) + return; + while (true) { if (entryIt != null) { while (entryIt.hasNext()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java index d4f6f0c..57565bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.OpenOption; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -38,18 +42,31 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; /** @@ -62,6 +79,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { /** Partitions count. */ private static final int PARTS_CNT = 32; + /** Block message predicate to set to Communication SPI in node configuration. 
*/ + private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based @@ -92,6 +112,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi()); + if (blockMessagePredicate != null) { + TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi(); + + spi.blockMessages(blockMessagePredicate); + } + return cfg; } @@ -227,6 +253,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { cache.put(k, new IndexedObject(k - 1)); } + forceCheckpoint(); + stopAllGrids(); IgniteEx ig0 = (IgniteEx) startGrids(2); @@ -240,6 +268,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { for (int k = 0; k < entryCnt; k++) cache.put(k, new IndexedObject(k)); + forceCheckpoint(); + // This node should rebalance data from other nodes and shouldn't have WAL history. Ignite ignite = startGrid(2); @@ -258,6 +288,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { cache.remove(k); } + forceCheckpoint(); + // Stop grids which have actual WAL history. stopGrid(0); @@ -309,6 +341,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { cache.put(k, new IndexedObject(k - 1)); } + forceCheckpoint(); + stopAllGrids(); // Rewrite data with globally disabled WAL. @@ -325,6 +359,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { for (int k = 0; k < entryCnt; k++) cache.put(k, new IndexedObject(k)); + forceCheckpoint(); + crd.cluster().enableWal(CACHE_NAME); // This node shouldn't rebalance data using WAL, because it was disabled on other nodes. 
@@ -365,6 +401,100 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { } /** + * Tests that cache rebalance is cancelled if the supplier node gets an exception during iteration over WAL. + * + * @throws Exception If failed. + */ + public void testRebalanceCancelOnSupplyError() throws Exception { + // Prepare some data. + IgniteEx crd = (IgniteEx) startGrids(3); + + crd.cluster().active(true); + + final int entryCnt = PARTS_CNT * 10; + + { + IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) + cache.put(k, new IndexedObject(k - 1)); + } + + forceCheckpoint(); + + stopAllGrids(); + + // Rewrite data to trigger further rebalance. + IgniteEx supplierNode = (IgniteEx) startGrid(0); + + supplierNode.cluster().active(true); + + IgniteCache<Object, Object> cache = supplierNode.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) + cache.put(k, new IndexedObject(k)); + + forceCheckpoint(); + + final int groupId = supplierNode.cachex(CACHE_NAME).context().groupId(); + + // Delay rebalance process for specified group. + blockMessagePredicate = (node, msg) -> { + if (msg instanceof GridDhtPartitionDemandMessage) + return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId; + + return false; + }; + + IgniteEx demanderNode = startGrid(2); + + AffinityTopologyVersion curTopVer = demanderNode.context().discovery().topologyVersionEx(); + + // Wait for rebalance process start on demander node. + final GridCachePreloader preloader = demanderNode.cachex(CACHE_NAME).context().group().preloader(); + + GridTestUtils.waitForCondition(() -> + ((GridDhtPartitionDemander.RebalanceFuture) preloader.rebalanceFuture()).topologyVersion().equals(curTopVer), + getTestTimeout() + ); + + // Inject I/O factory which can throw exception during WAL read on supplier node. 
+ FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory()); + + ((FileWriteAheadLogManager) supplierNode.cachex(CACHE_NAME).context().shared().wal()).setFileIOFactory(ioFactory); + + ioFactory.throwExceptionOnWalRead(); + + // Resume rebalance process. + TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) demanderNode.configuration().getCommunicationSpi(); + + spi.stopBlock(); + + // Wait until rebalance fails and is cancelled. + Boolean result = preloader.rebalanceFuture().get(); + + Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, result); + + // Stop blocking messages and stop failing WAL reads. + blockMessagePredicate = null; + + ioFactory.reset(); + + // Start last grid and wait for rebalance. + startGrid(1); + + awaitPartitionMapExchange(); + + // Check data consistency. + for (Ignite ig : G.allGrids()) { + IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME); + + for (int k = 0; k < entryCnt; k++) + assertEquals(new IndexedObject(k), cache1.get(k)); + } + } + + /** * */ private static class IndexedObject { @@ -409,7 +539,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { /** * Wrapper of communication spi to detect on what topology versions WAL rebalance has happened. */ - public static class WalRebalanceCheckingCommunicationSpi extends TcpCommunicationSpi { + public static class WalRebalanceCheckingCommunicationSpi extends TestRecordingCommunicationSpi { /** (Group ID, Set of topology versions). */ private static final Map<Integer, Set<Long>> topVers = new HashMap<>(); @@ -464,4 +594,55 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest { super.sendMessage(node, msg, ackC); } } + + /** + * + */ + static class FailingIOFactory implements FileIOFactory { + /** Fail read operations. */ + private volatile boolean failRead; + + /** Delegate. 
*/ + private final FileIOFactory delegate; + + /** + * @param delegate Delegate. + */ + FailingIOFactory(FileIOFactory delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, WRITE, READ); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegateIO = delegate.create(file, modes); + + if (file.getName().endsWith(".wal") && failRead) + return new FileIODecorator(delegateIO) { + @Override public int read(ByteBuffer destBuf) throws IOException { + throw new IgniteException("Test exception."); + } + }; + + return delegateIO; + } + + /** + * + */ + public void throwExceptionOnWalRead() { + failRead = true; + } + + /** + * + */ + public void reset() { + failRead = false; + } + } } \ No newline at end of file
