Repository: ignite
Updated Branches:
  refs/heads/master d5432c005 -> a232b88fe


IGNITE-9084 Fixed error handling for historical rebalance - Fixes #4437.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a232b88f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a232b88f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a232b88f

Branch: refs/heads/master
Commit: a232b88fe823a9823b86a0efd93c259e0cf0f0bc
Parents: d5432c0
Author: Pavel Kovalenko <[email protected]>
Authored: Thu Sep 6 18:03:50 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Thu Sep 6 18:03:50 2018 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   7 +
 .../processors/cache/GridCacheIoManager.java    |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |  14 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  69 +++++--
 .../GridDhtPartitionSupplyMessage.java          |   6 +-
 .../GridDhtPartitionSupplyMessageV2.java        | 153 +++++++++++++++
 .../persistence/GridCacheOffheapManager.java    |  10 +
 .../db/wal/IgniteWalRebalanceTest.java          | 185 ++++++++++++++++++-
 8 files changed, 427 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 8dddd8b..2970e71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -54,6 +54,8 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -889,6 +891,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
+            case 120:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
             case 124:
                 msg = new GridMessageCollection<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0134421..1e25c93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -914,7 +914,8 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             break;
 
-            case 114: {
+            case 114:
+            case 120: {
                 processMessage(nodeId, msg, c);// Will be handled by Rebalance 
Demander.
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 851fcc9..3c1090f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -688,10 +688,20 @@ public class GridDhtPartitionDemander {
         if (log.isDebugEnabled())
             log.debug("Received supply message [grp=" + grp.cacheOrGroupName() 
+ ", msg=" + supply + ']');
 
-        // Check whether there were class loading errors on unmarshal
+        // Check whether there was an error during the supply message unmarshalling 
process.
         if (supply.classError() != null) {
             U.warn(log, "Rebalancing from node cancelled [grp=" + 
grp.cacheOrGroupName() + ", node=" + nodeId +
-                "]. Class got undeployed during preloading: " + 
supply.classError());
+                "]. Supply message couldn't be unmarshalled: " + 
supply.classError());
+
+            fut.cancel(nodeId);
+
+            return;
+        }
+
+        // Check whether there was an error during the supplying process.
+        if (supply.error() != null) {
+            U.warn(log, "Rebalancing from node cancelled [grp=" + 
grp.cacheOrGroupName() + ", node=" + nodeId +
+                "]. Supplier has failed with error: " + supply.error());
 
             fut.cancel(nodeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 524d02d..2090c07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -218,9 +218,11 @@ class GridDhtPartitionSupplier {
         if (node == null)
             return;
 
-        try {
-            SupplyContext sctx;
+        IgniteRebalanceIterator iter = null;
+
+        SupplyContext sctx = null;
 
+        try {
             synchronized (scMap) {
                 sctx = scMap.remove(contextId);
 
@@ -229,7 +231,7 @@ class GridDhtPartitionSupplier {
                     scMap.put(contextId, sctx);
 
                     if (log.isDebugEnabled())
-                        log.debug("Stale demand message [grp=" + 
grp.cacheOrGroupName()
+                        log.debug("Stale demand message [cache=" + 
grp.cacheOrGroupName()
                             + ", actualContext=" + sctx
                             + ", from=" + nodeId
                             + ", demandMsg=" + d + "]");
@@ -241,7 +243,7 @@ class GridDhtPartitionSupplier {
             // Demand request should not contain empty partitions if no supply 
context is associated with it.
             if (sctx == null && (d.partitions() == null || 
d.partitions().isEmpty())) {
                 if (log.isDebugEnabled())
-                    log.debug("Empty demand message [grp=" + 
grp.cacheOrGroupName()
+                    log.debug("Empty demand message [cache=" + 
grp.cacheOrGroupName()
                         + ", from=" + nodeId
                         + ", topicId=" + topicId
                         + ", demandMsg=" + d + "]");
@@ -272,8 +274,6 @@ class GridDhtPartitionSupplier {
                     d.topologyVersion(),
                     grp.deploymentEnabled());
 
-            IgniteRebalanceIterator iter;
-
             Set<Integer> remainingParts;
 
             if (sctx == null || sctx.iterator == null) {
@@ -452,13 +452,56 @@ class GridDhtPartitionSupplier {
                     ", topology=" + demTop + ", rebalanceId=" + 
d.rebalanceId() +
                     ", topicId=" + topicId + "]");
         }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send partition supply message to node: " + 
nodeId, e);
-        }
-        catch (IgniteSpiException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send message to node (current node is 
stopping?) [node=" + node.id() +
-                    ", msg=" + e.getMessage() + ']');
+        catch (Throwable t) {
+            if (grp.shared().kernalContext().isStopping())
+                return;
+
+            // Sending supply messages with error requires new protocol.
+            boolean sendErrMsg = 
node.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0;
+
+            if (t instanceof IgniteSpiException) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to node (current node is 
stopping?) [node=" + node.id() +
+                        ", msg=" + t.getMessage() + ']');
+
+                sendErrMsg = false;
+            }
+            else
+                U.error(log, "Failed to continue supplying process for " +
+                    "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+                    + ", topicId=" + contextId.get2() + ", topVer=" + 
contextId.get3() + "]", t);
+
+            try {
+                if (sctx != null)
+                    clearContext(sctx, log);
+                else if (iter != null)
+                    iter.close();
+            }
+            catch (Throwable t1) {
+                U.error(log, "Failed to cleanup supplying context " +
+                        "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+                        + ", topicId=" + contextId.get2() + ", topVer=" + 
contextId.get3() + "]", t1);
+            }
+
+            if (!sendErrMsg)
+                return;
+
+            try {
+                GridDhtPartitionSupplyMessageV2 errMsg = new 
GridDhtPartitionSupplyMessageV2(
+                    d.rebalanceId(),
+                    grp.groupId(),
+                    d.topologyVersion(),
+                    grp.deploymentEnabled(),
+                    t
+                );
+
+                reply(node, d, errMsg, contextId);
+            }
+            catch (Throwable t1) {
+                U.error(log, "Failed to send supply error message for " +
+                    "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+                        + ", topicId=" + contextId.get2() + ", topVer=" + 
contextId.get3() + "]", t1);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 4ecffc4..284700a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -89,10 +89,12 @@ public class GridDhtPartitionSupplyMessage extends 
GridCacheGroupIdMessage imple
      * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
-    GridDhtPartitionSupplyMessage(long rebalanceId,
+    GridDhtPartitionSupplyMessage(
+        long rebalanceId,
         int grpId,
         AffinityTopologyVersion topVer,
-        boolean addDepInfo) {
+        boolean addDepInfo
+    ) {
         this.grpId = grpId;
         this.rebalanceId = rebalanceId;
         this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
new file mode 100644
index 0000000..a775766
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Supply message with supplier error transfer support.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends 
GridDhtPartitionSupplyMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Product version since which this message type is available. */
+    public static final IgniteProductVersion AVAILABLE_SINCE = 
IgniteProductVersion.fromString("2.7.0");
+
+    /** Supplying process error. */
+    @GridDirectTransient
+    private Throwable err;
+
+    /** Supplying process error bytes. */
+    private byte[] errBytes;
+
+    /**
+     * Default constructor.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+    }
+
+    /**
+     * @param rebalanceId Rebalance id.
+     * @param grpId Group id.
+     * @param topVer Topology version.
+     * @param addDepInfo Deployment info flag.
+     * @param err Supply process error.
+     */
+    public GridDhtPartitionSupplyMessageV2(
+        long rebalanceId,
+        int grpId,
+        AffinityTopologyVersion topVer,
+        boolean addDepInfo,
+        Throwable err
+    ) {
+        super(rebalanceId, grpId, topVer, addDepInfo);
+
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (err != null && errBytes == null)
+            errBytes = U.marshal(ctx, err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (errBytes != null && err == null)
+            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 12:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 12:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Throwable error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 120;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 13;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 4c45352..199efcb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -880,6 +880,10 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** Flag indicates that partition belongs to current {@link #next} is 
finished and no longer needs to rebalance. */
         private boolean reachedPartitionEnd;
 
+        /** Flag indicating that update counters for the requested partitions have 
been reached.
+         *  It means that no further iteration is needed. */
+        private boolean doneAllPartitions;
+
         /**
          * @param grp Cache context.
          * @param walIt WAL iterator.
@@ -953,6 +957,9 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                 doneParts.add(next.partitionId());
 
                 reachedPartitionEnd = false;
+
+                if (doneParts.size() == partMap.size())
+                    doneAllPartitions = true;
             }
 
             advance();
@@ -1011,6 +1018,9 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         private void advance() {
             next = null;
 
+            if (doneAllPartitions)
+                return;
+
             while (true) {
                 if (entryIt != null) {
                     while (entryIt.hasNext()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a232b88f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index d4f6f0c..57565bf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,18 +42,31 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCachePreloader;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 
 /**
@@ -62,6 +79,9 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
     /** Partitions count. */
     private static final int PARTS_CNT = 32;
 
+    /** Block message predicate to set to Communication SPI in node 
configuration. */
+    private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make 
all rebalance wal-based
@@ -92,6 +112,12 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
 
         cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi());
 
+        if (blockMessagePredicate != null) {
+            TestRecordingCommunicationSpi spi = 
(TestRecordingCommunicationSpi) cfg.getCommunicationSpi();
+
+            spi.blockMessages(blockMessagePredicate);
+        }
+
         return cfg;
     }
 
@@ -227,6 +253,8 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
                 cache.put(k, new IndexedObject(k - 1));
         }
 
+        forceCheckpoint();
+
         stopAllGrids();
 
         IgniteEx ig0 = (IgniteEx) startGrids(2);
@@ -240,6 +268,8 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
         for (int k = 0; k < entryCnt; k++)
             cache.put(k, new IndexedObject(k));
 
+        forceCheckpoint();
+
         // This node should rebalance data from other nodes and shouldn't have 
WAL history.
         Ignite ignite = startGrid(2);
 
@@ -258,6 +288,8 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
                 cache.remove(k);
         }
 
+        forceCheckpoint();
+
         // Stop grids which have actual WAL history.
         stopGrid(0);
 
@@ -309,6 +341,8 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
                 cache.put(k, new IndexedObject(k - 1));
         }
 
+        forceCheckpoint();
+
         stopAllGrids();
 
         // Rewrite data with globally disabled WAL.
@@ -325,6 +359,8 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
         for (int k = 0; k < entryCnt; k++)
             cache.put(k, new IndexedObject(k));
 
+        forceCheckpoint();
+
         crd.cluster().enableWal(CACHE_NAME);
 
         // This node shouldn't rebalance data using WAL, because it was 
disabled on other nodes.
@@ -365,6 +401,100 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Tests that cache rebalance is cancelled if the supplier node gets an exception 
during iteration over WAL.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRebalanceCancelOnSupplyError() throws Exception {
+        // Prepare some data.
+        IgniteEx crd = (IgniteEx) startGrids(3);
+
+        crd.cluster().active(true);
+
+        final int entryCnt = PARTS_CNT * 10;
+
+        {
+            IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                cache.put(k, new IndexedObject(k - 1));
+        }
+
+        forceCheckpoint();
+
+        stopAllGrids();
+
+        // Rewrite data to trigger further rebalance.
+        IgniteEx supplierNode = (IgniteEx) startGrid(0);
+
+        supplierNode.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = supplierNode.cache(CACHE_NAME);
+
+        for (int k = 0; k < entryCnt; k++)
+            cache.put(k, new IndexedObject(k));
+
+        forceCheckpoint();
+
+        final int groupId = 
supplierNode.cachex(CACHE_NAME).context().groupId();
+
+        // Delay rebalance process for specified group.
+        blockMessagePredicate = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage)
+                return ((GridDhtPartitionDemandMessage) msg).groupId() == 
groupId;
+
+            return false;
+        };
+
+        IgniteEx demanderNode = startGrid(2);
+
+        AffinityTopologyVersion curTopVer = 
demanderNode.context().discovery().topologyVersionEx();
+
+        // Wait for rebalance process start on demander node.
+        final GridCachePreloader preloader = 
demanderNode.cachex(CACHE_NAME).context().group().preloader();
+
+        GridTestUtils.waitForCondition(() ->
+            ((GridDhtPartitionDemander.RebalanceFuture) 
preloader.rebalanceFuture()).topologyVersion().equals(curTopVer),
+            getTestTimeout()
+        );
+
+        // Inject I/O factory which can throw exception during WAL read on 
supplier node.
+        FailingIOFactory ioFactory = new FailingIOFactory(new 
RandomAccessFileIOFactory());
+
+        ((FileWriteAheadLogManager) 
supplierNode.cachex(CACHE_NAME).context().shared().wal()).setFileIOFactory(ioFactory);
+
+        ioFactory.throwExceptionOnWalRead();
+
+        // Resume rebalance process.
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) 
demanderNode.configuration().getCommunicationSpi();
+
+        spi.stopBlock();
+
+        // Wait until rebalance fails and is cancelled.
+        Boolean result = preloader.rebalanceFuture().get();
+
+        Assert.assertEquals("Rebalance should be cancelled on demander node: " 
+ preloader.rebalanceFuture(), false, result);
+
+        // Stop blocking messages and stop failing WAL reads.
+        blockMessagePredicate = null;
+
+        ioFactory.reset();
+
+        // Start last grid and wait for rebalance.
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // Check data consistency.
+        for (Ignite ig : G.allGrids()) {
+            IgniteCache<Object, Object> cache1 = ig.cache(CACHE_NAME);
+
+            for (int k = 0; k < entryCnt; k++)
+                assertEquals(new IndexedObject(k), cache1.get(k));
+        }
+    }
+
+    /**
      *
      */
     private static class IndexedObject {
@@ -409,7 +539,7 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
     /**
      * Wrapper of communication spi to detect on what topology versions WAL 
rebalance has happened.
      */
-    public static class WalRebalanceCheckingCommunicationSpi extends 
TcpCommunicationSpi {
+    public static class WalRebalanceCheckingCommunicationSpi extends 
TestRecordingCommunicationSpi {
         /** (Group ID, Set of topology versions). */
         private static final Map<Integer, Set<Long>> topVers = new HashMap<>();
 
@@ -464,4 +594,55 @@ public class IgniteWalRebalanceTest extends 
GridCommonAbstractTest {
             super.sendMessage(node, msg, ackC);
         }
     }
+
+    /**
+     * File I/O factory whose WAL file I/Os throw on read if created while read failure is enabled.
+     */
+    static class FailingIOFactory implements FileIOFactory {
+        /** Fail read operations. */
+        private volatile boolean failRead;
+
+        /** Delegate. */
+        private final FileIOFactory delegate;
+
+        /**
+         * @param delegate Delegate.
+         */
+        FailingIOFactory(FileIOFactory delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, WRITE, READ);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws 
IOException {
+            FileIO delegateIO = delegate.create(file, modes);
+
+            if (file.getName().endsWith(".wal") && failRead)
+                return new FileIODecorator(delegateIO) {
+                    @Override public int read(ByteBuffer destBuf) throws 
IOException {
+                        throw new IgniteException("Test exception.");
+                    }
+                };
+
+            return delegateIO;
+        }
+
+        /**
+         * Enables read failure: WAL file I/Os created from now on throw on read.
+         */
+        public void throwExceptionOnWalRead() {
+            failRead = true;
+        }
+
+        /**
+         * Disables read failure: file I/Os created from now on are plain delegates.
+         */
+        public void reset() {
+            failRead = false;
+        }
+    }
 }
\ No newline at end of file

Reply via email to