This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 10cc703063 IGNITE-18073 Update the API for a full rebalance of 
MvPartitionStorage and indexes (#1471)
10cc703063 is described below

commit 10cc703063d18cfe6fa820aa22ac0ea4e39c9a76
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Dec 27 15:51:14 2022 +0300

    IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and 
indexes (#1471)
---
 modules/storage-api/build.gradle                   |   1 +
 .../internal/storage/MvPartitionStorage.java       |   9 +
 .../internal/storage/StorageClosedException.java   |   2 +-
 ...ception.java => StorageRebalanceException.java} |  10 +-
 .../internal/storage/engine/MvTableStorage.java    |  93 +++--
 .../storage/AbstractMvTableStorageTest.java        | 442 ++++++++++++++++-----
 .../internal/storage/BaseMvStoragesTest.java       |  23 +-
 .../storage/impl/TestMvPartitionStorage.java       | 159 ++++++--
 .../internal/storage/impl/TestMvTableStorage.java  | 127 ++++--
 .../storage/index/impl/TestHashIndexStorage.java   |  83 +++-
 .../storage/index/impl/TestSortedIndexStorage.java |  80 +++-
 .../PersistentPageMemoryTableStorage.java          |   6 +-
 .../pagememory/VolatilePageMemoryTableStorage.java |   6 +-
 .../PersistentPageMemoryMvTableStorageTest.java    |  32 +-
 .../VolatilePageMemoryMvTableStorageTest.java      |  28 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |   6 +-
 .../storage/rocksdb/RocksDbMvTableStorageTest.java |  28 +-
 .../internal/tx/storage/state/TxStateStorage.java  |   4 +-
 18 files changed, 864 insertions(+), 275 deletions(-)

diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index 8cb950c654..45e4ddf38b 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -58,6 +58,7 @@ dependencies {
     testFixturesImplementation libs.junit5.impl
     testFixturesImplementation libs.junit5.params
     testFixturesImplementation libs.auto.service.annotations
+    testFixturesImplementation libs.mockito.core
 }
 
 description = 'ignite-storage-api'
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 59cca1ba64..1898308bcc 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -39,6 +39,13 @@ import org.jetbrains.annotations.Nullable;
  * {@link RowId#compareTo} comparison order.
  */
 public interface MvPartitionStorage extends ManuallyCloseable {
+    /**
+     * Value of the {@link #lastAppliedIndex()} and {@link #lastAppliedTerm()} during rebalance of the multi-version partition storage.
+     *
+     * <p>Makes it possible to determine on a node restart that rebalance has not been completed and that the storage should be cleared before using it.
+     */
+    long REBALANCE_IN_PROGRESS = -1;
+
     /**
      * Closure for executing write operations on the storage.
      *
@@ -233,6 +240,8 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
 
     /**
      * Closes the storage.
+     *
+     * <p>REQUIRED: Background tasks for the partition, such as rebalancing, must be completed by the time the method is called.
      */
     @Override
     void close();
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
index dfd45814c4..6abccd774c 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.storage;
 
 /**
- *  Exception that will be thrown when the storage is closed.
+ * Exception that will be thrown when the storage is closed.
  */
 public class StorageClosedException extends StorageException {
     /**
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
similarity index 78%
copy from 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
copy to 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
index dfd45814c4..6a99a8003a 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
@@ -18,15 +18,15 @@
 package org.apache.ignite.internal.storage;
 
 /**
- *  Exception that will be thrown when the storage is closed.
+ * Exception that will be thrown when the storage is in the process of 
rebalance.
  */
-public class StorageClosedException extends StorageException {
+public class StorageRebalanceException extends StorageException {
     /**
      * Constructor.
      *
      * @param message Error message.
      */
-    public StorageClosedException(String message) {
+    public StorageRebalanceException(String message) {
         super(message);
     }
 
@@ -36,7 +36,7 @@ public class StorageClosedException extends StorageException {
      * @param message Error message.
      * @param cause The cause.
      */
-    public StorageClosedException(String message, Throwable cause) {
+    public StorageRebalanceException(String message, Throwable cause) {
         super(message, cause);
     }
 
@@ -45,7 +45,7 @@ public class StorageClosedException extends StorageException {
      *
      * @param cause The cause.
      */
-    public StorageClosedException(Throwable cause) {
+    public StorageRebalanceException(Throwable cause) {
         super(cause);
     }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 8afef64ac9..a2a964ecf1 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -24,14 +24,20 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -60,6 +66,8 @@ public interface MvTableStorage extends ManuallyCloseable {
     /**
      * Destroys a partition and all associated indices.
      *
+     * <p>REQUIRED: Background tasks for the partition, such as rebalancing, must be completed by the time the method is called.
+     *
      * @param partitionId Partition ID.
      * @return Future that will complete when the destroy of the partition is 
completed.
      * @throws IllegalArgumentException If Partition ID is out of bounds.
@@ -165,43 +173,76 @@ public interface MvTableStorage extends ManuallyCloseable 
{
     CompletableFuture<Void> destroy();
 
     /**
-     * Prepares the partition storage for rebalancing: makes a backup of the 
current partition storage and creates a new storage.
-     *
-     * <p>This method must be called before every full rebalance of the 
partition storage, so that in case of errors or cancellation of the
-     * full rebalance, we can restore the partition storage from the backup.
-     *
-     * <p>Full rebalance will be completed when one of the methods is called:
-     * <ol>
-     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full 
rebalance cancellation or failure, so that we can
-     *     restore the partition storage from a backup;</li>
-     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a 
successful full rebalance, to remove the backup of the
-     *     partition storage.</li>
-     * </ol>
+     * Prepares a partition for rebalance.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition 
storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link 
MvPartitionStorage#lastAppliedTerm()} to
+     *     {@link MvPartitionStorage#REBALANCE_IN_PROGRESS};</li>
+     *     <li>Stops the cursors of a multi-version partition storage and its 
indexes, subsequent calls to {@link Cursor#hasNext()} and
+     *     {@link Cursor#next()} will throw {@link 
StorageRebalanceException};</li>
+     *     <li>For a multi-version partition storage and its indexes, methods 
for reading and writing data will throw
+     *     {@link StorageRebalanceException} except:<ul>
+     *         <li>{@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, 
UUID, int)};</li>
+     *         <li>{@link MvPartitionStorage#commitWrite(RowId, 
HybridTimestamp)};</li>
+     *         <li>{@link MvPartitionStorage#addWriteCommitted(RowId, 
BinaryRow, HybridTimestamp)};</li>
+     *         <li>{@link MvPartitionStorage#lastAppliedIndex()};</li>
+     *         <li>{@link MvPartitionStorage#lastAppliedTerm()};</li>
+     *         <li>{@link MvPartitionStorage#persistedIndex()};</li>
+     *         <li>{@link HashIndexStorage#put(IndexRow)};</li>
+     *         <li>{@link SortedIndexStorage#put(IndexRow)};</li>
+     *     </ul></li>
+     * </ul>
+     *
+     * <p>This method must be called before every rebalance of a multi-version 
partition storage and its indexes and ends with a call
+     * to one of the methods:
+     * <ul>
+     *     <li>{@link #abortRebalancePartition(int)} - in case of errors or cancellation of rebalance;</li>
+     *     <li>{@link #finishRebalancePartition(int, long, long)} - in case of 
successful completion of rebalance.</li>
+     * </ul>
+     *
+     * <p>If the {@link MvPartitionStorage#lastAppliedIndex()} is {@link MvPartitionStorage#REBALANCE_IN_PROGRESS} after a node restart,
+     * then the multi-version partition storage and its indexes need to be cleared before they start.
+     *
+     * <p>If the partition started to be destroyed or closed, then there will 
be an error when trying to start rebalancing.
      *
      * @param partitionId Partition ID.
-     * @return Future, if completed without errors, then {@link 
#getMvPartition} will return a new (empty) partition storage.
+     * @return Future of the start rebalance for a multi-version partition 
storage and its indexes.
+     * @throws IllegalArgumentException If Partition ID is out of bounds.
+     * @throws StorageRebalanceException If there is an error when starting 
rebalance.
      */
-    CompletableFuture<Void> startRebalanceMvPartition(int partitionId);
+    CompletableFuture<Void> startRebalancePartition(int partitionId);
 
     /**
-     * Aborts rebalancing of the partition storage if it was started: restores 
the partition storage from a backup and deletes the new
-     * storage.
+     * Aborts rebalance for a partition.
+     * <ul>
+     *     <li>Cleans up the {@link MvPartitionStorage multi-version partition 
storage} and its associated indexes ({@link HashIndexStorage}
+     *     and {@link SortedIndexStorage});</li>
+     *     <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link 
MvPartitionStorage#lastAppliedTerm()} to {@code 0};</li>
+     *     <li>For a multi-version partition storage and its indexes, methods 
for writing and reading will be available.</li>
+     * </ul>
      *
-     * <p>If a full rebalance has not been {@link 
#startRebalanceMvPartition(int) started}, then nothing will happen.
+     * <p>If rebalance has not started, then nothing will happen.
      *
-     * @param partitionId Partition ID.
-     * @return Future, upon completion of which {@link #getMvPartition} will 
return the partition storage restored from the backup.
+     * @return Future of the abort rebalance for a multi-version partition 
storage and its indexes.
+     * @throws IllegalArgumentException If Partition ID is out of bounds.
      */
-    CompletableFuture<Void> abortRebalanceMvPartition(int partitionId);
+    CompletableFuture<Void> abortRebalancePartition(int partitionId);
 
     /**
-     * Finishes a successful partition storage rebalance if it has been 
started: deletes the backup of the partition storage and saves a new
-     * storage.
+     * Completes rebalance for a partition.
+     * <ul>
+     *     <li>Updates {@link MvPartitionStorage#lastAppliedIndex()} and 
{@link MvPartitionStorage#lastAppliedTerm()};</li>
+     *     <li>For a multi-version partition storage and its indexes, methods 
for writing and reading will be available.</li>
+     * </ul>
      *
-     * <p>If a full rebalance has not been {@link 
#startRebalanceMvPartition(int) started}, then nothing will happen.
+     * <p>If rebalance has not started, then {@link StorageRebalanceException} 
will be thrown.
      *
-     * @param partitionId Partition ID.
-     * @return Future, if it fails, will abort the partition storage rebalance.
+     * @param lastAppliedIndex Last applied index.
+     * @param lastAppliedTerm Last applied term.
+     * @return Future of the finish rebalance for a multi-version partition 
storage and its indexes.
+     * @throws IllegalArgumentException If Partition ID is out of bounds.
+     * @throws StorageRebalanceException If there is an error when completing 
rebalance.
      */
-    CompletableFuture<Void> finishRebalanceMvPartition(int partitionId);
+    CompletableFuture<Void> finishRebalancePartition(int partitionId, long 
lastAppliedIndex, long lastAppliedTerm);
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index a7789feb33..9c7789baa1 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -17,11 +17,17 @@
 
 package org.apache.ignite.internal.storage;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -29,24 +35,19 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -59,12 +60,14 @@ import 
org.apache.ignite.internal.schema.testutils.definition.ColumnType;
 import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
 import 
org.apache.ignite.internal.schema.testutils.definition.index.IndexDefinition;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteTuple3;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
@@ -125,12 +128,12 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
      * Tests that partition data does not overlap.
      */
     @Test
-    void testPartitionIndependence() {
+    void testPartitionIndependence() throws Exception {
         MvPartitionStorage partitionStorage0 = 
tableStorage.getOrCreateMvPartition(PARTITION_ID_0);
         // Using a shifted ID value to test a multibyte scenario.
         MvPartitionStorage partitionStorage1 = 
tableStorage.getOrCreateMvPartition(PARTITION_ID_1);
 
-        var testData0 = binaryRow(new TestKey(1, "1"), new TestValue(10, 
"10"));
+        var testData0 = binaryRow(new TestKey(1, "0"), new TestValue(10, 
"10"));
 
         UUID txId = UUID.randomUUID();
 
@@ -151,6 +154,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertThat(unwrap(partitionStorage1.read(rowId1, 
HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData1))));
 
         
assertThat(drainToList(partitionStorage0.scan(HybridTimestamp.MAX_VALUE)), 
contains(unwrap(testData0)));
+
         
assertThat(drainToList(partitionStorage1.scan(HybridTimestamp.MAX_VALUE)), 
contains(unwrap(testData1)));
     }
 
@@ -217,7 +221,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     }
 
     @Test
-    public void testHashIndexIndependence() throws Exception {
+    public void testHashIndexIndependence() {
         MvPartitionStorage partitionStorage1 = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
         assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, 
hashIdx.id()), is(notNullValue()));
@@ -313,84 +317,9 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         );
     }
 
-    @Test
-    public void testStartRebalanceMvPartition() throws Exception {
-        MvPartitionStorage partitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
-        partitionStorage.runConsistently(() -> {
-            partitionStorage.addWriteCommitted(
-                    new RowId(PARTITION_ID),
-                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
-                    clock.now()
-            );
-
-            partitionStorage.lastApplied(100, 10);
-
-            partitionStorage.committedGroupConfiguration(new 
RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
-
-            return null;
-        });
-
-        partitionStorage.flush().get(1, TimeUnit.SECONDS);
-
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        MvPartitionStorage newPartitionStorage0 = 
tableStorage.getMvPartition(PARTITION_ID);
-
-        assertNotNull(newPartitionStorage0);
-        assertNotSame(partitionStorage, newPartitionStorage0);
-
-        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
-        assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
-        assertNull(newPartitionStorage0.committedGroupConfiguration());
-        assertEquals(0L, newPartitionStorage0.persistedIndex());
-        assertEquals(0, newPartitionStorage0.rowsCount());
-
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        MvPartitionStorage newPartitionStorage1 = 
tableStorage.getMvPartition(PARTITION_ID);
-
-        assertSame(newPartitionStorage0, newPartitionStorage1);
-    }
-
-    @Test
-    public void testAbortRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> 
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-
-        MvPartitionStorage partitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        assertSame(partitionStorage, 
tableStorage.getMvPartition(PARTITION_ID));
-
-        assertDoesNotThrow(() -> 
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-    }
-
-    @Test
-    public void testFinishRebalanceMvPartition() throws Exception {
-        assertDoesNotThrow(() -> 
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-
-        tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
-        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        MvPartitionStorage newPartitionStorage = 
tableStorage.getMvPartition(PARTITION_ID);
-
-        tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
-
-        assertSame(newPartitionStorage, 
tableStorage.getMvPartition(PARTITION_ID));
-
-        assertDoesNotThrow(() -> 
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-    }
-
     @Test
     public void testDestroyPartition() throws Exception {
-        assertThrows(
-                IllegalArgumentException.class,
-                () -> 
tableStorage.destroyPartition(tableStorage.configuration().partitions().value())
-        );
+        assertThrows(IllegalArgumentException.class, () -> 
tableStorage.destroyPartition(getPartitionIdOutOfRange()));
 
         MvPartitionStorage mvPartitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
         HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
@@ -400,14 +329,15 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
         BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, 
"1"));
 
-        IndexRow indexRow = indexRow(binaryRow, rowId);
+        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
binaryRow, rowId);
+        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
 
         mvPartitionStorage.runConsistently(() -> {
             mvPartitionStorage.addWriteCommitted(rowId, binaryRow, 
clock.now());
 
-            hashIndexStorage.put(indexRow);
+            hashIndexStorage.put(hashIndexRow);
 
-            sortedIndexStorage.put(indexRow);
+            sortedIndexStorage.put(sortedIndexRow);
 
             return null;
         });
@@ -415,12 +345,12 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         Cursor<ReadResult> scanVersionsCursor = 
mvPartitionStorage.scanVersions(rowId);
         PartitionTimestampCursor scanTimestampCursor = 
mvPartitionStorage.scan(clock.now());
 
-        Cursor<RowId> getFromHashIndexCursor = 
hashIndexStorage.get(indexRow.indexColumns());
+        Cursor<RowId> getFromHashIndexCursor = 
hashIndexStorage.get(hashIndexRow.indexColumns());
 
-        Cursor<RowId> getFromSortedIndexCursor = 
sortedIndexStorage.get(indexRow.indexColumns());
+        Cursor<RowId> getFromSortedIndexCursor = 
sortedIndexStorage.get(hashIndexRow.indexColumns());
         Cursor<IndexRow> scanFromSortedIndexCursor = 
sortedIndexStorage.scan(null, null, 0);
 
-        tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
         // Let's check that we won't get destroyed storages.
         assertNull(tableStorage.getMvPartition(PARTITION_ID));
@@ -440,7 +370,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         assertThrows(StorageClosedException.class, () -> 
getAll(scanFromSortedIndexCursor));
 
         // Let's check that nothing will happen if we try to destroy a 
non-existing partition.
-        assertDoesNotThrow(() -> 
tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+        assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
     }
 
     @Test
@@ -457,20 +387,124 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
             return null;
         });
 
-        tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+        tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
 
         MvPartitionStorage newMvPartitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
 
         assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
     }
 
+    @Test
+    public void testSuccessRebalance() throws Exception {
+        MvPartitionStorage mvPartitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        // Error because rebalance has not yet started for the partition.
+        assertThrows(StorageRebalanceException.class, () -> 
tableStorage.finishRebalancePartition(PARTITION_ID, 100, 500));
+
+        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart = List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        );
+
+        startRebalanceWithChecks(
+                PARTITION_ID,
+                mvPartitionStorage,
+                hashIndexStorage,
+                sortedIndexStorage,
+                rowsBeforeRebalanceStart
+        );
+
+        // Let's fill the storages with fresh data on rebalance.
+        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance 
= List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+        );
+
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
+
+        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+        // Let's finish rebalancing.
+
+        // Partition is out of configuration range.
+        assertThrows(IllegalArgumentException.class, () -> 
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500));
+
+        // Partition does not exist.
+        assertThrows(StorageRebalanceException.class, () -> 
tableStorage.finishRebalancePartition(1, 100, 500));
+
+        assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 
20), willCompleteSuccessfully());
+
+        // Let's check the storages after successfully finishing rebalance.
+        checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsBeforeRebalanceStart);
+        checkForPresenceRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsOnRebalance);
+
+        checkLastApplied(mvPartitionStorage, 10, 10, 20);
+    }
+
+    @Test
+    public void testFailRebalance() throws Exception {
+        MvPartitionStorage mvPartitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+        HashIndexStorage hashIndexStorage = 
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+        SortedIndexStorage sortedIndexStorage = 
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+        // Nothing will happen because rebalancing has not started.
+        tableStorage.abortRebalancePartition(PARTITION_ID).get(1, SECONDS);
+
+        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart = List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+        );
+
+        startRebalanceWithChecks(
+                PARTITION_ID,
+                mvPartitionStorage,
+                hashIndexStorage,
+                sortedIndexStorage,
+                rowsBeforeRebalanceStart
+        );
+
+        // Let's fill the storages with fresh data on rebalance.
+        List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance 
= List.of(
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
+                new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new 
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+        );
+
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsOnRebalance);
+
+        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+        // Let's abort rebalancing.
+
+        // Partition is out of configuration range.
+        assertThrows(IllegalArgumentException.class, () -> 
tableStorage.abortRebalancePartition(getPartitionIdOutOfRange()));
+
+        assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+        // Let's check the storages after abort rebalance.
+        checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsBeforeRebalanceStart);
+        checkForMissingRows(mvPartitionStorage, hashIndexStorage, 
sortedIndexStorage, rowsOnRebalance);
+
+        checkLastApplied(mvPartitionStorage, 0, 0, 0);
+    }
+
+    @Test
+    public void testStartRebalanceForClosedPartition() {
+        MvPartitionStorage mvPartitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+        mvPartitionStorage.close();
+
+        assertThrows(StorageRebalanceException.class, () -> 
tableStorage.startRebalancePartition(PARTITION_ID));
+    }
+
     private static void createTestIndexes(TablesConfiguration tablesConfig) {
         List<IndexDefinition> indexDefinitions = List.of(
                 SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
-                        .addIndexColumn("COLUMN0").done()
+                        .addIndexColumn("strKey").done()
                         .build(),
                 SchemaBuilders.hashIndex(HASH_INDEX_NAME)
-                        .withColumns("COLUMN0")
+                        .withColumns("strKey")
                         .build()
         );
 
@@ -488,10 +522,12 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     private static void createTestTable(TableConfiguration tableConfig) {
         TableDefinition tableDefinition = 
SchemaBuilders.tableBuilder("PUBLIC", "foo")
                 .columns(
-                        SchemaBuilders.column("ID", ColumnType.INT32).build(),
-                        SchemaBuilders.column("COLUMN0", 
ColumnType.INT32).build()
+                        SchemaBuilders.column("intKey", 
ColumnType.INT32).build(),
+                        SchemaBuilders.column("strKey", 
ColumnType.string()).build(),
+                        SchemaBuilders.column("intVal", 
ColumnType.INT32).build(),
+                        SchemaBuilders.column("strVal", 
ColumnType.string()).build()
                 )
-                .withPrimaryKey("ID")
+                .withPrimaryKey("intKey")
                 .build();
 
         CompletableFuture<Void> createTableFuture = tableConfig.change(
@@ -503,11 +539,11 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
     private static <T> List<T> getAll(Cursor<T> cursor) {
         try (cursor) {
-            return cursor.stream().collect(Collectors.toList());
+            return cursor.stream().collect(toList());
         }
     }
 
-    private void checkStorageDestroyed(MvPartitionStorage storage) {
+    private void checkStorageDestroyed(MvPartitionStorage storage) throws 
Exception {
         int partId = PARTITION_ID;
 
         assertThrows(StorageClosedException.class, () -> 
storage.runConsistently(() -> null));
@@ -544,21 +580,207 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
     private void checkStorageDestroyed(SortedIndexStorage storage) {
         checkStorageDestroyed((IndexStorage) storage);
 
-        BinaryTuple indexKey = indexKey(binaryRow(new TestKey(0, "0"), new 
TestValue(1, "1")));
-
-        assertThrows(
-                StorageClosedException.class,
-                () -> 
storage.scan(BinaryTuplePrefix.fromBinaryTuple(indexKey), 
BinaryTuplePrefix.fromBinaryTuple(indexKey), 0)
-        );
+        assertThrows(StorageClosedException.class, () -> storage.scan(null, 
null, 0));
     }
 
     private void checkStorageDestroyed(IndexStorage storage) {
-        IndexRow indexRow = indexRow(binaryRow(new TestKey(0, "0"), new 
TestValue(1, "1")), new RowId(PARTITION_ID));
+        assertThrows(StorageClosedException.class, () -> 
storage.get(mock(BinaryTuple.class)));
+
+        assertThrows(StorageClosedException.class, () -> 
storage.put(mock(IndexRow.class)));
+
+        assertThrows(StorageClosedException.class, () -> 
storage.remove(mock(IndexRow.class)));
+    }
+
+    private int getPartitionIdOutOfRange() {
+        return tableStorage.configuration().partitions().value();
+    }
+
+    private void startRebalanceWithChecks(
+            int partitionId,
+            MvPartitionStorage mvPartitionStorage,
+            HashIndexStorage hashIndexStorage,
+            SortedIndexStorage sortedIndexStorage,
+            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> 
rowsBeforeRebalanceStart
+    ) {
+        assertThat(rowsBeforeRebalanceStart, hasSize(greaterThanOrEqualTo(2)));
 
-        assertThrows(StorageClosedException.class, () -> 
storage.get(indexRow.indexColumns()));
+        fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, 
rowsBeforeRebalanceStart);
+
+        // Let's open the cursors before start rebalance.
+        IgniteTuple3<RowId, BinaryRow, HybridTimestamp> rowForCursors = 
rowsBeforeRebalanceStart.get(0);
+
+        Cursor<?> mvPartitionStorageScanVersionsCursor = 
mvPartitionStorage.scanVersions(rowForCursors.get1());
+        Cursor<?> mvPartitionStorageScanCursor = 
mvPartitionStorage.scan(rowForCursors.get3());
+
+        IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), 
rowForCursors.get2(), rowForCursors.get1());
+        IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), rowForCursors.get2(), 
rowForCursors.get1());
+
+        Cursor<?> hashIndexStorageGetCursor = 
hashIndexStorage.get(hashIndexRow.indexColumns());
+
+        Cursor<?> sortedIndexStorageGetCursor = 
sortedIndexStorage.get(sortedIndexRow.indexColumns());
+        Cursor<?> sortedIndexStorageScanCursor = sortedIndexStorage.scan(null, 
null, 0);
+
+        // Partition is out of configuration range.
+        assertThrows(IllegalArgumentException.class, () -> 
tableStorage.startRebalancePartition(getPartitionIdOutOfRange()));
+
+        // Partition does not exist.
+        assertThrows(StorageRebalanceException.class, () -> 
tableStorage.startRebalancePartition(partitionId + 1));
+
+        // Let's start rebalancing of the partition.
+        assertThat(tableStorage.startRebalancePartition(partitionId), 
willCompleteSuccessfully());
+
+        // Once again, rebalancing of the partition cannot be started.
+        assertThrows(StorageRebalanceException.class, () -> 
tableStorage.startRebalancePartition(partitionId));
+
+        checkMvPartitionStorageMethodsAfterStartRebalance(mvPartitionStorage);
+        checkHashIndexStorageMethodsAfterStartRebalance(hashIndexStorage);
+        checkSortedIndexStorageMethodsAfterStartRebalance(sortedIndexStorage);
+
+        checkCursorAfterStartRebalance(mvPartitionStorageScanVersionsCursor);
+        checkCursorAfterStartRebalance(mvPartitionStorageScanCursor);
+
+        checkCursorAfterStartRebalance(hashIndexStorageGetCursor);
+
+        checkCursorAfterStartRebalance(sortedIndexStorageGetCursor);
+        checkCursorAfterStartRebalance(sortedIndexStorageScanCursor);
+    }
+
+    private void 
checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
+        checkLastApplied(storage, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+        assertDoesNotThrow(() -> storage.committedGroupConfiguration());
+
+        storage.runConsistently(() -> {
+            assertThrows(StorageRebalanceException.class, () -> 
storage.lastApplied(100, 500));
+
+            assertThrows(
+                    StorageRebalanceException.class,
+                    () -> 
storage.committedGroupConfiguration(mock(RaftGroupConfiguration.class))
+            );
 
-        assertThrows(StorageClosedException.class, () -> 
storage.put(indexRow));
+            RowId rowId = new RowId(PARTITION_ID);
+
+            assertThrows(StorageRebalanceException.class, () -> 
storage.read(rowId, clock.now()));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.abortWrite(rowId));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.scanVersions(rowId));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.scan(clock.now()));
+            assertThrows(StorageRebalanceException.class, () -> 
storage.closestRowId(rowId));
+            assertThrows(StorageRebalanceException.class, storage::rowsCount);
+
+            // TODO: IGNITE-18020 Add check
+            // TODO: IGNITE-18023 Add check
+            if (storage instanceof TestMvPartitionStorage) {
+                assertThrows(StorageRebalanceException.class, () -> 
storage.pollForVacuum(clock.now()));
+            }
+
+            return null;
+        });
+    }
+
+    private static void 
checkHashIndexStorageMethodsAfterStartRebalance(HashIndexStorage storage) {
+        assertDoesNotThrow(storage::indexDescriptor);
+
+        assertThrows(StorageRebalanceException.class, () -> 
storage.get(mock(BinaryTuple.class)));
+        assertThrows(StorageRebalanceException.class, () -> 
storage.remove(mock(IndexRow.class)));
+    }
+
+    private static void 
checkSortedIndexStorageMethodsAfterStartRebalance(SortedIndexStorage storage) {
+        assertDoesNotThrow(storage::indexDescriptor);
+
+        assertThrows(StorageRebalanceException.class, () -> 
storage.get(mock(BinaryTuple.class)));
+        assertThrows(StorageRebalanceException.class, () -> 
storage.remove(mock(IndexRow.class)));
+        assertThrows(StorageRebalanceException.class, () -> storage.scan(null, 
null, 0));
+    }
+
+    private static void checkCursorAfterStartRebalance(Cursor<?> cursor) {
+        assertDoesNotThrow(cursor::close);
+
+        assertThrows(StorageRebalanceException.class, cursor::hasNext);
+        assertThrows(StorageRebalanceException.class, cursor::next);
+    }
+
+    private void fillStorages(
+            MvPartitionStorage mvPartitionStorage,
+            HashIndexStorage hashIndexStorage,
+            SortedIndexStorage sortedIndexStorage,
+            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+    ) {
+        for (int i = 0; i < rows.size(); i++) {
+            int finalI = i;
+
+            IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row = rows.get(i);
+
+            RowId rowId = row.get1();
+            BinaryRow binaryRow = row.get2();
+            HybridTimestamp timestamp = row.get3();
+
+            IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
+            IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
+
+            mvPartitionStorage.runConsistently(() -> {
+                // If even.
+                if ((finalI & 1) == 0) {
+                    mvPartitionStorage.addWrite(rowId, binaryRow, 
UUID.randomUUID(), UUID.randomUUID(), rowId.partitionId());
+
+                    mvPartitionStorage.commitWrite(rowId, timestamp);
+                } else {
+                    mvPartitionStorage.addWriteCommitted(rowId, binaryRow, 
timestamp);
+                }
+
+                hashIndexStorage.put(hashIndexRow);
+
+                sortedIndexStorage.put(sortedIndexRow);
+
+                return null;
+            });
+        }
+    }
+
+    private void checkForMissingRows(
+            MvPartitionStorage mvPartitionStorage,
+            HashIndexStorage hashIndexStorage,
+            SortedIndexStorage sortedIndexStorage,
+            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+    ) {
+        for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
+            assertThat(getAll(mvPartitionStorage.scanVersions(row.get1())), 
is(empty()));
+
+            IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
+            IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+
+            
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
is(empty()));
+            
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
is(empty()));
+        }
+    }
+
+    private void checkForPresenceRows(
+            MvPartitionStorage mvPartitionStorage,
+            HashIndexStorage hashIndexStorage,
+            SortedIndexStorage sortedIndexStorage,
+            List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+    ) {
+        for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
+            assertThat(
+                    
getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
+                    containsInAnyOrder(row.get2())
+            );
+
+            IndexRow hashIndexRow = 
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
+            IndexRow sortedIndexRow = 
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+
+            
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())), 
contains(row.get1()));
+            
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())), 
contains(row.get1()));
+        }
+    }
 
-        assertThrows(StorageClosedException.class, () -> 
storage.remove(indexRow));
+    private static void checkLastApplied(
+            MvPartitionStorage storage,
+            long expLastAppliedIndex,
+            long expPersistentIndex,
+            long expLastAppliedTerm
+    ) {
+        assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
+        assertEquals(expPersistentIndex, storage.persistedIndex());
+        assertEquals(expLastAppliedTerm, storage.lastAppliedTerm());
     }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 7389cfd183..2f5fb471b5 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.util.List;
 import java.util.Locale;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryConverter;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -39,6 +41,7 @@ import 
org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
 import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexDescriptor;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -115,12 +118,22 @@ public abstract class BaseMvStoragesTest {
         }
     }
 
-    protected static IndexRow indexRow(BinaryRow binaryRow, RowId rowId) {
-        return new IndexRowImpl(kvBinaryConverter.toTuple(binaryRow), rowId);
-    }
+    protected static IndexRow indexRow(IndexDescriptor indexDescriptor, 
BinaryRow binaryRow, RowId rowId) {
+        int[] columnIndexes = indexDescriptor.columns().stream()
+                .mapToInt(indexColumnDescriptor -> {
+                    Column column = 
schemaDescriptor.column(indexColumnDescriptor.name());
+
+                    assertNotNull(column, indexColumnDescriptor.name());
+
+                    return column.schemaIndex();
+                })
+                .toArray();
+
+        BinaryTupleSchema binaryTupleSchema = 
BinaryTupleSchema.createSchema(schemaDescriptor, columnIndexes);
+
+        BinaryConverter binaryTupleConverter = new 
BinaryConverter(schemaDescriptor, binaryTupleSchema, false);
 
-    protected static BinaryTuple indexKey(BinaryRow binaryKey) {
-        return kBinaryConverter.toTuple(binaryKey);
+        return new IndexRowImpl(binaryTupleConverter.toTuple(binaryRow), 
rowId);
     }
 
     protected static TestKey key(BinaryRow binaryRow) {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 02e5251418..b379217843 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -22,13 +22,11 @@ import static java.util.Comparator.comparing;
 import java.util.Iterator;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
@@ -39,6 +37,7 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
@@ -65,6 +64,8 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     private volatile boolean closed;
 
+    private volatile boolean rebalance;
+
     public TestMvPartitionStorage(int partitionId) {
         this.partitionId = partitionId;
     }
@@ -106,35 +107,35 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
-        checkClosed();
+        checkStorageClosed();
 
         return closure.execute();
     }
 
     @Override
     public CompletableFuture<Void> flush() {
-        checkClosed();
+        checkStorageClosed();
 
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long lastAppliedIndex() {
-        checkClosed();
+        checkStorageClosed();
 
         return lastAppliedIndex;
     }
 
     @Override
     public long lastAppliedTerm() {
-        checkClosed();
+        checkStorageClosed();
 
         return lastAppliedTerm;
     }
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) 
throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
@@ -142,7 +143,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public long persistedIndex() {
-        checkClosed();
+        checkStorageClosed();
 
         return lastAppliedIndex;
     }
@@ -150,13 +151,15 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     @Override
     @Nullable
     public RaftGroupConfiguration committedGroupConfiguration() {
-        checkClosed();
+        checkStorageClosed();
 
         return groupConfig;
     }
 
     @Override
     public void committedGroupConfiguration(RaftGroupConfiguration config) {
+        checkStorageClosedOrInProcessOfRebalance();
+
         this.groupConfig = config;
     }
 
@@ -168,7 +171,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
             UUID commitTableId,
             int commitPartitionId
     ) throws TxIdMismatchException {
-        checkClosed();
+        checkStorageClosed();
 
         BinaryRow[] res = {null};
 
@@ -191,7 +194,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public synchronized @Nullable BinaryRow abortWrite(RowId rowId) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         BinaryRow[] res = {null};
 
@@ -212,7 +215,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public synchronized void commitWrite(RowId rowId, HybridTimestamp 
timestamp) {
-        checkClosed();
+        checkStorageClosed();
 
         map.compute(rowId, (ignored, versionChain) -> {
             assert versionChain != null;
@@ -231,7 +234,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
             @Nullable BinaryRow row,
             HybridTimestamp commitTimestamp
     ) throws StorageException {
-        checkClosed();
+        checkStorageClosed();
 
         map.compute(rowId, (ignored, versionChain) -> {
             if (versionChain != null && versionChain.isWriteIntent()) {
@@ -269,7 +272,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         if (rowId.partitionId() != partitionId) {
             throw new IllegalArgumentException(
@@ -390,19 +393,14 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
-        return Cursor.fromBareIterator(
-                Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
-                        .peek(versionChain -> checkClosed())
-                        .map((VersionChain versionChain) -> 
versionChainToReadResult(versionChain, false))
-                        .iterator()
-        );
+        return new ScanVersionsCursor(rowId);
     }
 
     @Override
     public PartitionTimestampCursor scan(HybridTimestamp timestamp) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         Iterator<VersionChain> iterator = map.values().iterator();
 
@@ -436,7 +434,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
             @Override
             public boolean hasNext() {
-                checkClosed();
+                checkStorageClosedOrInProcessOfRebalance();
 
                 if (currentReadResult != null) {
                     return true;
@@ -478,13 +476,15 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public @Nullable RowId closestRowId(RowId lowerBound) throws 
StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         return map.ceilingKey(lowerBound);
     }
 
     @Override
     public synchronized @Nullable BinaryRowAndRowId 
pollForVacuum(HybridTimestamp lowWatermark) {
+        checkStorageClosedOrInProcessOfRebalance();
+
         Iterator<VersionChain> it = gcQueue.iterator();
 
         if (!it.hasNext()) {
@@ -530,14 +530,18 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public long rowsCount() {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         return map.size();
     }
 
     @Override
     public void close() {
+        assert !rebalance;
+
         closed = true;
+
+        clear();
     }
 
     public void destroy() {
@@ -551,9 +555,112 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
         gcQueue.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageInProcessOfRebalance() {
+        if (rebalance) {
+            throw new StorageRebalanceException("Storage in the process of 
rebalancing");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+        checkStorageInProcessOfRebalance();
+    }
+
+    void startRebalance() {
+        checkStorageClosed();
+
+        rebalance = true;
+
+        clear();
+
+        lastAppliedIndex = REBALANCE_IN_PROGRESS;
+        lastAppliedTerm = REBALANCE_IN_PROGRESS;
+    }
+
+    void abortRebalance() {
+        checkStorageClosed();
+
+        if (!rebalance) {
+            return;
+        }
+
+        rebalance = false;
+
+        clear();
+
+        lastAppliedIndex = 0;
+        lastAppliedTerm = 0;
+    }
+
+    void finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+        checkStorageClosed();
+
+        assert rebalance;
+
+        rebalance = false;
+
+        this.lastAppliedIndex = lastAppliedIndex;
+        this.lastAppliedTerm = lastAppliedTerm;
+    }
+
+    boolean closed() {
+        return closed;
+    }
+
+    private class ScanVersionsCursor implements Cursor<ReadResult> {
+        private final RowId rowId;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private VersionChain versionChain;
+
+        private ScanVersionsCursor(RowId rowId) {
+            this.rowId = rowId;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            advanceIfNeeded();
+
+            return hasNext;
+        }
+
+        @Override
+        public ReadResult next() {
+            advanceIfNeeded();
+
+            if (!hasNext) {
+                throw new NoSuchElementException();
+            }
+
+            hasNext = null;
+
+            return versionChainToReadResult(versionChain, false);
+        }
+
+        private void advanceIfNeeded() {
+            checkStorageClosedOrInProcessOfRebalance();
+
+            if (hasNext != null) {
+                return;
+            }
+
+            versionChain = versionChain == null ? map.get(rowId) : 
versionChain.next;
+
+            hasNext = versionChain != null;
+        }
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index dc53ea17bb..21c84b1051 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.storage.impl;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -41,9 +44,7 @@ import org.jetbrains.annotations.Nullable;
  * Test table storage implementation.
  */
 public class TestMvTableStorage implements MvTableStorage {
-    private final Map<Integer, MvPartitionStorage> partitions = new 
ConcurrentHashMap<>();
-
-    private final Map<Integer, MvPartitionStorage> backupPartitions = new 
ConcurrentHashMap<>();
+    private final Map<Integer, TestMvPartitionStorage> partitions = new 
ConcurrentHashMap<>();
 
     private final Map<UUID, SortedIndices> sortedIndicesById = new 
ConcurrentHashMap<>();
 
@@ -51,6 +52,8 @@ public class TestMvTableStorage implements MvTableStorage {
 
     private final Map<Integer, CompletableFuture<Void>> 
destroyFutureByPartitionId = new ConcurrentHashMap<>();
 
+    private final Map<Integer, CompletableFuture<Void>> 
rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
+
     private final TableConfiguration tableCfg;
 
     private final TablesConfiguration tablesCfg;
@@ -61,13 +64,13 @@ public class TestMvTableStorage implements MvTableStorage {
     private static class SortedIndices {
         private final SortedIndexDescriptor descriptor;
 
-        final Map<Integer, SortedIndexStorage> storageByPartitionId = new 
ConcurrentHashMap<>();
+        final Map<Integer, TestSortedIndexStorage> storageByPartitionId = new 
ConcurrentHashMap<>();
 
         SortedIndices(SortedIndexDescriptor descriptor) {
             this.descriptor = descriptor;
         }
 
-        SortedIndexStorage getOrCreateStorage(Integer partitionId) {
+        TestSortedIndexStorage getOrCreateStorage(Integer partitionId) {
             return storageByPartitionId.computeIfAbsent(partitionId, id -> new 
TestSortedIndexStorage(descriptor));
         }
     }
@@ -78,13 +81,13 @@ public class TestMvTableStorage implements MvTableStorage {
     private static class HashIndices {
         private final HashIndexDescriptor descriptor;
 
-        final Map<Integer, HashIndexStorage> storageByPartitionId = new 
ConcurrentHashMap<>();
+        final Map<Integer, TestHashIndexStorage> storageByPartitionId = new 
ConcurrentHashMap<>();
 
         HashIndices(HashIndexDescriptor descriptor) {
             this.descriptor = descriptor;
         }
 
-        HashIndexStorage getOrCreateStorage(Integer partitionId) {
+        TestHashIndexStorage getOrCreateStorage(Integer partitionId) {
             return storageByPartitionId.computeIfAbsent(partitionId, id -> new 
TestHashIndexStorage(descriptor));
         }
     }
@@ -110,6 +113,8 @@ public class TestMvTableStorage implements MvTableStorage {
     public CompletableFuture<Void> destroyPartition(int partitionId) {
         checkPartitionId(partitionId);
 
+        assert !rebalanceFutureByPartitionId.containsKey(partitionId);
+
         CompletableFuture<Void> destroyPartitionFuture = new 
CompletableFuture<>();
 
         CompletableFuture<Void> previousDestroyPartitionFuture = 
destroyFutureByPartitionId.putIfAbsent(
@@ -157,7 +162,7 @@ public class TestMvTableStorage implements MvTableStorage {
     @Override
     public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID 
indexId) {
         if (!partitions.containsKey(partitionId)) {
-            throw new StorageException("Partition ID " + partitionId + " does 
not exist");
+            throw new 
StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
         }
 
         SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
@@ -171,7 +176,7 @@ public class TestMvTableStorage implements MvTableStorage {
     @Override
     public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID 
indexId) {
         if (!partitions.containsKey(partitionId)) {
-            throw new StorageException("Partition ID " + partitionId + " does 
not exist");
+            throw new 
StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
         }
 
         HashIndices sortedIndices = hashIndicesById.computeIfAbsent(
@@ -229,34 +234,94 @@ public class TestMvTableStorage implements MvTableStorage 
{
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
+        checkPartitionId(partitionId);
 
-        assert oldPartitionStorage != null : "Partition does not exist: " + 
partitionId;
+        TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
 
-        if (backupPartitions.putIfAbsent(partitionId, oldPartitionStorage) == 
null) {
-            partitions.put(partitionId, new 
TestMvPartitionStorage(partitionId));
+        if (partitionStorage == null) {
+            throw new 
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
         }
 
-        return completedFuture(null);
+        if (destroyFutureByPartitionId.containsKey(partitionId)) {
+            throw new StorageRebalanceException("Partition in the process of 
destruction: " + partitionId);
+        }
+
+        if (partitionStorage.closed()) {
+            throw new StorageRebalanceException("Partition closed: " + 
partitionId);
+        }
+
+        CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+        if (rebalanceFutureByPartitionId.putIfAbsent(partitionId, 
rebalanceFuture) != null) {
+            throw new StorageRebalanceException("Rebalance for the partition 
is already in progress: " + partitionId);
+        }
+
+        try {
+            partitionStorage.startRebalance();
+
+            
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::startRebalance);
+
+            
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::startRebalance);
+
+            rebalanceFuture.complete(null);
+        } catch (Throwable t) {
+            rebalanceFuture.completeExceptionally(t);
+        }
+
+        return rebalanceFuture;
     }
 
     @Override
-    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
-        MvPartitionStorage oldPartitionStorage = 
backupPartitions.remove(partitionId);
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
+        checkPartitionId(partitionId);
+
+        CompletableFuture<Void> rebalanceFuture = 
rebalanceFutureByPartitionId.remove(partitionId);
 
-        if (oldPartitionStorage != null) {
-            partitions.put(partitionId, oldPartitionStorage);
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
         }
 
-        return completedFuture(null);
+        TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
+
+        if (partitionStorage == null) {
+            throw new 
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    partitionStorage.abortRebalance();
+
+                    
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::abortRebalance);
+
+                    
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::abortRebalance);
+                });
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
-        backupPartitions.remove(partitionId);
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
+        checkPartitionId(partitionId);
 
-        return completedFuture(null);
+        CompletableFuture<Void> rebalanceFuture = 
rebalanceFutureByPartitionId.remove(partitionId);
+
+        if (rebalanceFuture == null) {
+            throw new StorageRebalanceException("Rebalance for the partition 
did not start: " + partitionId);
+        }
+
+        TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
+
+        if (partitionStorage == null) {
+            throw new 
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    partitionStorage.finishRebalance(lastAppliedIndex, 
lastAppliedTerm);
+
+                    
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::finishRebalance);
+
+                    
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::finishRebalance);
+                });
     }
 
     private void checkPartitionId(int partitionId) {
@@ -271,4 +336,20 @@ public class TestMvTableStorage implements MvTableStorage {
             ));
         }
     }
+
+    private static String createPartitionDoesNotExistsErrorMessage(int 
partitionId) {
+        return "Partition ID " + partitionId + " does not exist";
+    }
+
+    private Stream<TestHashIndexStorage> testHashIndexStorageStream(Integer 
partitionId) {
+        return hashIndicesById.values().stream()
+                .map(hashIndices -> 
hashIndices.storageByPartitionId.get(partitionId))
+                .filter(Objects::nonNull);
+    }
+
+    private Stream<TestSortedIndexStorage> 
testSortedIndexStorageStream(Integer partitionId) {
+        return sortedIndicesById.values().stream()
+                .map(hashIndices -> 
hashIndices.storageByPartitionId.get(partitionId))
+                .filter(Objects::nonNull);
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index a15deb0656..4bc9f6b2d9 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
 import org.apache.ignite.internal.storage.index.HashIndexStorage;
 import org.apache.ignite.internal.storage.index.IndexRow;
@@ -43,6 +44,8 @@ public class TestHashIndexStorage implements HashIndexStorage 
{
 
     private volatile boolean closed;
 
+    private volatile boolean rebalance;
+
     /**
      * Constructor.
      */
@@ -57,18 +60,35 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
+
+        Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), 
Set.of()).iterator();
 
-        Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), 
Set.of()).stream()
-                .peek(rowId -> checkClosed())
-                .iterator();
+        return new Cursor<>() {
+            @Override
+            public void close() {
+                // No-op.
+            }
 
-        return Cursor.fromBareIterator(iterator);
+            @Override
+            public boolean hasNext() {
+                checkStorageClosedOrInProcessOfRebalance();
+
+                return iterator.hasNext();
+            }
+
+            @Override
+            public RowId next() {
+                checkStorageClosedOrInProcessOfRebalance();
+
+                return iterator.next();
+            }
+        };
     }
 
     @Override
     public void put(IndexRow row) {
-        checkClosed();
+        checkStorageClosed();
 
         index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
             if (v == null) {
@@ -88,7 +108,7 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
 
     @Override
     public void remove(IndexRow row) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
             if (v.contains(row.rowId())) {
@@ -111,7 +131,7 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
     public void destroy() {
         closed = true;
 
-        index.clear();
+        clear();
     }
 
     /**
@@ -121,9 +141,54 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
         index.clear();
     }
 
-    private void checkClosed() {
+    private void checkStorageClosed() {
         if (closed) {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (rebalance) {
+            throw new StorageRebalanceException("Storage in the process of 
rebalancing");
+        }
+    }
+
+    /**
+     * Starts rebalancing of the storage.
+     */
+    public void startRebalance() {
+        checkStorageClosed();
+
+        rebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts rebalance of the storage.
+     */
+    public void abortRebalance() {
+        checkStorageClosed();
+
+        if (!rebalance) {
+            return;
+        }
+
+        rebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes rebalance of the storage.
+     */
+    public void finishRebalance() {
+        checkStorageClosed();
+
+        assert rebalance;
+
+        rebalance = false;
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 8286ecdc59..fa7f567c7c 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -58,6 +59,8 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
     private volatile boolean closed;
 
+    private volatile boolean rebalance;
+
     /**
      * Constructor.
      */
@@ -73,7 +76,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
     @Override
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), 
emptyNavigableMap()).keySet().iterator();
 
@@ -85,14 +88,14 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
             @Override
             public boolean hasNext() {
-                checkClosed();
+                checkStorageClosedOrInProcessOfRebalance();
 
                 return iterator.hasNext();
             }
 
             @Override
             public RowId next() {
-                checkClosed();
+                checkStorageClosedOrInProcessOfRebalance();
 
                 return iterator.next();
             }
@@ -101,7 +104,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
     @Override
     public void put(IndexRow row) {
-        checkClosed();
+        checkStorageClosed();
 
         index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
             NavigableMap<RowId, Object> rowIds = v == null ? new 
ConcurrentSkipListMap<>() : v;
@@ -114,7 +117,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
     @Override
     public void remove(IndexRow row) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
             v.remove(row.rowId());
@@ -129,7 +132,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
             @Nullable BinaryTuplePrefix upperBound,
             int flags
     ) {
-        checkClosed();
+        checkStorageClosedOrInProcessOfRebalance();
 
         boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
         boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
@@ -175,7 +178,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
     public void destroy() {
         closed = true;
 
-        index.clear();
+        clear();
     }
 
     /**
@@ -185,12 +188,6 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
         index.clear();
     }
 
-    private void checkClosed() {
-        if (closed) {
-            throw new StorageClosedException("Storage is already closed");
-        }
-    }
-
     private class ScanCursor implements PeekCursor<IndexRow> {
         private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> 
indexMap;
 
@@ -214,7 +211,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
         @Override
         public boolean hasNext() {
-            checkClosed();
+            checkStorageClosedOrInProcessOfRebalance();
 
             advanceIfNeeded();
 
@@ -223,7 +220,7 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
         @Override
         public IndexRow next() {
-            checkClosed();
+            checkStorageClosedOrInProcessOfRebalance();
 
             advanceIfNeeded();
 
@@ -240,6 +237,8 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
         @Override
         public @Nullable IndexRow peek() {
+            checkStorageClosedOrInProcessOfRebalance();
+
             if (hasNext != null) {
                 if (hasNext) {
                     return new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
@@ -315,4 +314,55 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
             return rowIdEntry == null ? null : rowIdEntry.getKey();
         }
     }
+
+    private void checkStorageClosed() {
+        if (closed) {
+            throw new StorageClosedException("Storage is already closed");
+        }
+    }
+
+    private void checkStorageClosedOrInProcessOfRebalance() {
+        checkStorageClosed();
+
+        if (rebalance) {
+            throw new StorageRebalanceException("Storage in the process of 
rebalancing");
+        }
+    }
+
+    /**
+     * Starts rebalancing for the storage.
+     */
+    public void startRebalance() {
+        checkStorageClosed();
+
+        rebalance = true;
+
+        clear();
+    }
+
+    /**
+     * Aborts rebalance of the storage.
+     */
+    public void abortRebalance() {
+        checkStorageClosed();
+
+        if (!rebalance) {
+            return;
+        }
+
+        rebalance = false;
+
+        clear();
+    }
+
+    /**
+     * Completes rebalance of the storage.
+     */
+    public void finishRebalance() {
+        checkStorageClosed();
+
+        assert rebalance;
+
+        rebalance = false;
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 179efbe20a..5209d25ef0 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -439,19 +439,19 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
         // TODO: IGNITE-18029 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
         // TODO: IGNITE-18029 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
         // TODO: IGNITE-18029 Implement
         throw new UnsupportedOperationException();
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index f3130b32f9..c4755903d8 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -147,19 +147,19 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
         // TODO: IGNITE-18028 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
         // TODO: IGNITE-18028 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
         // TODO: IGNITE-18028 Implement
         throw new UnsupportedOperationException();
     }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 04e6b403ad..c65cec3e05 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -79,33 +79,33 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
         );
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
+    @Test
     @Override
-    public void testStartRebalanceMvPartition() throws Exception {
-        super.testStartRebalanceMvPartition();
+    public void testDestroyPartition() throws Exception {
+        super.testDestroyPartition();
+
+        // Let's make sure that the checkpoint doesn't fail.
+        engine.checkpointManager()
+                .forceCheckpoint("after-test-destroy-partition")
+                .futureFor(CheckpointState.FINISHED)
+                .get(1, TimeUnit.SECONDS);
     }
 
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
     @Override
-    public void testAbortRebalanceMvPartition() throws Exception {
-        super.testAbortRebalanceMvPartition();
+    public void testSuccessRebalance() throws Exception {
+        super.testSuccessRebalance();
     }
 
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
     @Override
-    public void testFinishRebalanceMvPartition() throws Exception {
-        super.testFinishRebalanceMvPartition();
+    public void testFailRebalance() throws Exception {
+        super.testFailRebalance();
     }
 
-    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
     @Override
-    public void testDestroyPartition() throws Exception {
-        super.testDestroyPartition();
-
-        // Let's make sure that the checkpoint doesn't fail.
-        engine.checkpointManager()
-                .forceCheckpoint("after-test-destroy-partition")
-                .futureFor(CheckpointState.FINISHED)
-                .get(1, TimeUnit.SECONDS);
+    public void testStartRebalanceForClosedPartition() {
+        super.testStartRebalanceForClosedPartition();
     }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index ddd58c47cd..1d37235ef3 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -71,33 +71,33 @@ public class VolatilePageMemoryMvTableStorageTest extends 
AbstractMvTableStorage
         );
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
     @Override
-    public void testStartRebalanceMvPartition() throws Exception {
-        super.testStartRebalanceMvPartition();
+    public void testDestroyPartition() throws Exception {
+        super.testDestroyPartition();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
     @Override
-    public void testAbortRebalanceMvPartition() throws Exception {
-        super.testAbortRebalanceMvPartition();
+    public void testReCreatePartition() throws Exception {
+        super.testReCreatePartition();
     }
 
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
     @Override
-    public void testFinishRebalanceMvPartition() throws Exception {
-        super.testFinishRebalanceMvPartition();
+    public void testSuccessRebalance() throws Exception {
+        super.testSuccessRebalance();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
     @Override
-    public void testDestroyPartition() throws Exception {
-        super.testDestroyPartition();
+    public void testFailRebalance() throws Exception {
+        super.testFailRebalance();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
     @Override
-    public void testReCreatePartition() throws Exception {
-        super.testReCreatePartition();
+    public void testStartRebalanceForClosedPartition() {
+        super.testStartRebalanceForClosedPartition();
     }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index c2081c379e..f0c1c948bc 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -600,19 +600,19 @@ public class RocksDbTableStorage implements 
MvTableStorage {
     }
 
     @Override
-    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> startRebalancePartition(int partitionId) {
         // TODO: IGNITE-18027 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+    public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
         // TODO: IGNITE-18027 Implement
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+    public CompletableFuture<Void> finishRebalancePartition(int partitionId, 
long lastAppliedIndex, long lastAppliedTerm) {
         // TODO: IGNITE-18027 Implement
         throw new UnsupportedOperationException();
     }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 0272c65f79..fe2ca44c67 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -153,33 +153,33 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
         assertThat(tableStorage.isVolatile(), is(false));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
     @Override
-    public void testStartRebalanceMvPartition() throws Exception {
-        super.testStartRebalanceMvPartition();
+    public void testDestroyPartition() throws Exception {
+        super.testDestroyPartition();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
     @Override
-    public void testAbortRebalanceMvPartition() throws Exception {
-        super.testAbortRebalanceMvPartition();
+    public void testReCreatePartition() throws Exception {
+        super.testReCreatePartition();
     }
 
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
     @Override
-    public void testFinishRebalanceMvPartition() throws Exception {
-        super.testFinishRebalanceMvPartition();
+    public void testSuccessRebalance() throws Exception {
+        super.testSuccessRebalance();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
     @Override
-    public void testDestroyPartition() throws Exception {
-        super.testDestroyPartition();
+    public void testFailRebalance() throws Exception {
+        super.testFailRebalance();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
     @Override
-    public void testReCreatePartition() throws Exception {
-        super.testReCreatePartition();
+    public void testStartRebalanceForClosedPartition() {
+        super.testStartRebalanceForClosedPartition();
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index d4b9a7161b..e5e03e5300 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -170,8 +170,8 @@ public interface TxStateStorage extends ManuallyCloseable {
      *
      * <p>After calling this method, methods for writing and reading will be 
available.
      *
-     * <p>If a full rebalance has not started, then an IgniteInternalException 
with {@link Transactions#TX_STATE_STORAGE_FULL_REBALANCE_ERR}
-     * will be thrown
+     * <p>If a full rebalance has not started, then an {@link 
IgniteInternalException} with
+     * {@link Transactions#TX_STATE_STORAGE_FULL_REBALANCE_ERR} will be thrown.
      *
      * @param lastAppliedIndex Last applied index.
      * @param lastAppliedTerm Last applied term.


Reply via email to