This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 10cc703063 IGNITE-18073 Update the API for a full rebalance of
MvPartitionStorage and indexes (#1471)
10cc703063 is described below
commit 10cc703063d18cfe6fa820aa22ac0ea4e39c9a76
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Dec 27 15:51:14 2022 +0300
IGNITE-18073 Update the API for a full rebalance of MvPartitionStorage and
indexes (#1471)
---
modules/storage-api/build.gradle | 1 +
.../internal/storage/MvPartitionStorage.java | 9 +
.../internal/storage/StorageClosedException.java | 2 +-
...ception.java => StorageRebalanceException.java} | 10 +-
.../internal/storage/engine/MvTableStorage.java | 93 +++--
.../storage/AbstractMvTableStorageTest.java | 442 ++++++++++++++++-----
.../internal/storage/BaseMvStoragesTest.java | 23 +-
.../storage/impl/TestMvPartitionStorage.java | 159 ++++++--
.../internal/storage/impl/TestMvTableStorage.java | 127 ++++--
.../storage/index/impl/TestHashIndexStorage.java | 83 +++-
.../storage/index/impl/TestSortedIndexStorage.java | 80 +++-
.../PersistentPageMemoryTableStorage.java | 6 +-
.../pagememory/VolatilePageMemoryTableStorage.java | 6 +-
.../PersistentPageMemoryMvTableStorageTest.java | 32 +-
.../VolatilePageMemoryMvTableStorageTest.java | 28 +-
.../storage/rocksdb/RocksDbTableStorage.java | 6 +-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 28 +-
.../internal/tx/storage/state/TxStateStorage.java | 4 +-
18 files changed, 864 insertions(+), 275 deletions(-)
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index 8cb950c654..45e4ddf38b 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -58,6 +58,7 @@ dependencies {
testFixturesImplementation libs.junit5.impl
testFixturesImplementation libs.junit5.params
testFixturesImplementation libs.auto.service.annotations
+ testFixturesImplementation libs.mockito.core
}
description = 'ignite-storage-api'
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 59cca1ba64..1898308bcc 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -39,6 +39,13 @@ import org.jetbrains.annotations.Nullable;
* {@link RowId#compareTo} comparison order.
*/
public interface MvPartitionStorage extends ManuallyCloseable {
+ /**
+ * Value of the {@link #lastAppliedIndex()} and {@link #lastAppliedTerm()}
during rebalance of the multi-version partition storage.
+ *
+ * <p>Allows to determine on a node restart that rebalance has not been
completed and storage should be cleared before using it.
+ */
+ long REBALANCE_IN_PROGRESS = -1;
+
/**
* Closure for executing write operations on the storage.
*
@@ -233,6 +240,8 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
/**
* Closes the storage.
+ *
+ * <p>REQUIRED: Background tasks for the partition, such as rebalancing,
must be completed by the time the method is called.
*/
@Override
void close();
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
index dfd45814c4..6abccd774c 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.storage;
/**
- * Exception that will be thrown when the storage is closed.
+ * Exception that will be thrown when the storage is closed.
*/
public class StorageClosedException extends StorageException {
/**
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
similarity index 78%
copy from
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
copy to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
index dfd45814c4..6a99a8003a 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageRebalanceException.java
@@ -18,15 +18,15 @@
package org.apache.ignite.internal.storage;
/**
- * Exception that will be thrown when the storage is closed.
+ * Exception that will be thrown when the storage is in the process of
rebalance.
*/
-public class StorageClosedException extends StorageException {
+public class StorageRebalanceException extends StorageException {
/**
* Constructor.
*
* @param message Error message.
*/
- public StorageClosedException(String message) {
+ public StorageRebalanceException(String message) {
super(message);
}
@@ -36,7 +36,7 @@ public class StorageClosedException extends StorageException {
* @param message Error message.
* @param cause The cause.
*/
- public StorageClosedException(String message, Throwable cause) {
+ public StorageRebalanceException(String message, Throwable cause) {
super(message, cause);
}
@@ -45,7 +45,7 @@ public class StorageClosedException extends StorageException {
*
* @param cause The cause.
*/
- public StorageClosedException(Throwable cause) {
+ public StorageRebalanceException(Throwable cause) {
super(cause);
}
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 8afef64ac9..a2a964ecf1 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -24,14 +24,20 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -60,6 +66,8 @@ public interface MvTableStorage extends ManuallyCloseable {
/**
* Destroys a partition and all associated indices.
*
+ * <p>REQUIRED: Background tasks for the partition, such as rebalancing,
must be completed by the time the method is called.
+ *
* @param partitionId Partition ID.
* @return Future that will complete when the destroy of the partition is
completed.
* @throws IllegalArgumentException If Partition ID is out of bounds.
@@ -165,43 +173,76 @@ public interface MvTableStorage extends ManuallyCloseable
{
CompletableFuture<Void> destroy();
/**
- * Prepares the partition storage for rebalancing: makes a backup of the
current partition storage and creates a new storage.
- *
- * <p>This method must be called before every full rebalance of the
partition storage, so that in case of errors or cancellation of the
- * full rebalance, we can restore the partition storage from the backup.
- *
- * <p>Full rebalance will be completed when one of the methods is called:
- * <ol>
- * <li>{@link #abortRebalanceMvPartition(int)} - in case of a full
rebalance cancellation or failure, so that we can
- * restore the partition storage from a backup;</li>
- * <li>{@link #finishRebalanceMvPartition(int)} - in case of a
successful full rebalance, to remove the backup of the
- * partition storage.</li>
- * </ol>
+ * Prepares a partition for rebalance.
+ * <ul>
+ * <li>Cleans up the {@link MvPartitionStorage multi-version partition
storage} and its associated indexes ({@link HashIndexStorage}
+ * and {@link SortedIndexStorage});</li>
+ * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link
MvPartitionStorage#lastAppliedTerm()} to
+ * {@link MvPartitionStorage#REBALANCE_IN_PROGRESS};</li>
+ * <li>Stops the cursors of a multi-version partition storage and its
indexes, subsequent calls to {@link Cursor#hasNext()} and
+ * {@link Cursor#next()} will throw {@link
StorageRebalanceException};</li>
+ * <li>For a multi-version partition storage and its indexes, methods
for reading and writing data will throw
+ * {@link StorageRebalanceException} except:<ul>
+ * <li>{@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID,
UUID, int)};</li>
+ * <li>{@link MvPartitionStorage#commitWrite(RowId,
HybridTimestamp)};</li>
+ * <li>{@link MvPartitionStorage#addWriteCommitted(RowId,
BinaryRow, HybridTimestamp)};</li>
+ * <li>{@link MvPartitionStorage#lastAppliedIndex()};</li>
+ * <li>{@link MvPartitionStorage#lastAppliedTerm()};</li>
+ * <li>{@link MvPartitionStorage#persistedIndex()};</li>
+ * <li>{@link HashIndexStorage#put(IndexRow)};</li>
+ * <li>{@link SortedIndexStorage#put(IndexRow)};</li>
+ * </ul></li>
+ * </ul>
+ *
+ * <p>This method must be called before every rebalance of a multi-version
partition storage and its indexes and ends with a call
+ * to one of the methods:
+ * <ul>
+ * <li>{@link #abortRebalancePartition(int)} - in case of errors
or cancellation of rebalance;</li>
+ * <li>{@link #finishRebalancePartition(int, long, long)} - in case of
successful completion of rebalance.</li>
+ * </ul>
+ *
+ * <p>If the {@link MvPartitionStorage#lastAppliedIndex()} is {@link
MvPartitionStorage#REBALANCE_IN_PROGRESS} after a node restart,
+ * then a multi-version partition storage and its indexes need to be
cleared before they start.
+ *
+ * <p>If the partition started to be destroyed or closed, then there will
be an error when trying to start rebalancing.
*
* @param partitionId Partition ID.
- * @return Future, if completed without errors, then {@link
#getMvPartition} will return a new (empty) partition storage.
+ * @return Future of the start rebalance for a multi-version partition
storage and its indexes.
+ * @throws IllegalArgumentException If Partition ID is out of bounds.
+ * @throws StorageRebalanceException If there is an error when starting
rebalance.
*/
- CompletableFuture<Void> startRebalanceMvPartition(int partitionId);
+ CompletableFuture<Void> startRebalancePartition(int partitionId);
/**
- * Aborts rebalancing of the partition storage if it was started: restores
the partition storage from a backup and deletes the new
- * storage.
+ * Aborts rebalance for a partition.
+ * <ul>
+ * <li>Cleans up the {@link MvPartitionStorage multi-version partition
storage} and its associated indexes ({@link HashIndexStorage}
+ * and {@link SortedIndexStorage});</li>
+ * <li>Sets {@link MvPartitionStorage#lastAppliedIndex()} and {@link
MvPartitionStorage#lastAppliedTerm()} to {@code 0};</li>
+ * <li>For a multi-version partition storage and its indexes, methods
for writing and reading will be available.</li>
+ * </ul>
*
- * <p>If a full rebalance has not been {@link
#startRebalanceMvPartition(int) started}, then nothing will happen.
+ * <p>If rebalance has not started, then nothing will happen.
*
- * @param partitionId Partition ID.
- * @return Future, upon completion of which {@link #getMvPartition} will
return the partition storage restored from the backup.
+ * @return Future of the abort rebalance for a multi-version partition
storage and its indexes.
+ * @throws IllegalArgumentException If Partition ID is out of bounds.
*/
- CompletableFuture<Void> abortRebalanceMvPartition(int partitionId);
+ CompletableFuture<Void> abortRebalancePartition(int partitionId);
/**
- * Finishes a successful partition storage rebalance if it has been
started: deletes the backup of the partition storage and saves a new
- * storage.
+ * Completes rebalance for a partition.
+ * <ul>
+ * <li>Updates {@link MvPartitionStorage#lastAppliedIndex()} and
{@link MvPartitionStorage#lastAppliedTerm()};</li>
+ * <li>For a multi-version partition storage and its indexes, methods
for writing and reading will be available.</li>
+ * </ul>
*
- * <p>If a full rebalance has not been {@link
#startRebalanceMvPartition(int) started}, then nothing will happen.
+ * <p>If rebalance has not started, then {@link StorageRebalanceException}
will be thrown.
*
- * @param partitionId Partition ID.
- * @return Future, if it fails, will abort the partition storage rebalance.
+ * @param lastAppliedIndex Last applied index.
+ * @param lastAppliedTerm Last applied term.
+ * @return Future of the finish rebalance for a multi-version partition
storage and its indexes.
+ * @throws IllegalArgumentException If Partition ID is out of bounds.
+ * @throws StorageRebalanceException If there is an error when completing
rebalance.
*/
- CompletableFuture<Void> finishRebalanceMvPartition(int partitionId);
+ CompletableFuture<Void> finishRebalancePartition(int partitionId, long
lastAppliedIndex, long lastAppliedTerm);
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index a7789feb33..9c7789baa1 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -17,11 +17,17 @@
package org.apache.ignite.internal.storage;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -29,24 +35,19 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -59,12 +60,14 @@ import
org.apache.ignite.internal.schema.testutils.definition.ColumnType;
import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import
org.apache.ignite.internal.schema.testutils.definition.index.IndexDefinition;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteTuple3;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -125,12 +128,12 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
* Tests that partition data does not overlap.
*/
@Test
- void testPartitionIndependence() {
+ void testPartitionIndependence() throws Exception {
MvPartitionStorage partitionStorage0 =
tableStorage.getOrCreateMvPartition(PARTITION_ID_0);
// Using a shifted ID value to test a multibyte scenario.
MvPartitionStorage partitionStorage1 =
tableStorage.getOrCreateMvPartition(PARTITION_ID_1);
- var testData0 = binaryRow(new TestKey(1, "1"), new TestValue(10,
"10"));
+ var testData0 = binaryRow(new TestKey(1, "0"), new TestValue(10,
"10"));
UUID txId = UUID.randomUUID();
@@ -151,6 +154,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThat(unwrap(partitionStorage1.read(rowId1,
HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData1))));
assertThat(drainToList(partitionStorage0.scan(HybridTimestamp.MAX_VALUE)),
contains(unwrap(testData0)));
+
assertThat(drainToList(partitionStorage1.scan(HybridTimestamp.MAX_VALUE)),
contains(unwrap(testData1)));
}
@@ -217,7 +221,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
}
@Test
- public void testHashIndexIndependence() throws Exception {
+ public void testHashIndexIndependence() {
MvPartitionStorage partitionStorage1 =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID,
hashIdx.id()), is(notNullValue()));
@@ -313,84 +317,9 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
);
}
- @Test
- public void testStartRebalanceMvPartition() throws Exception {
- MvPartitionStorage partitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
- partitionStorage.runConsistently(() -> {
- partitionStorage.addWriteCommitted(
- new RowId(PARTITION_ID),
- binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
- clock.now()
- );
-
- partitionStorage.lastApplied(100, 10);
-
- partitionStorage.committedGroupConfiguration(new
RaftGroupConfiguration(List.of("peer"), List.of("learner"), null, null));
-
- return null;
- });
-
- partitionStorage.flush().get(1, TimeUnit.SECONDS);
-
- tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- MvPartitionStorage newPartitionStorage0 =
tableStorage.getMvPartition(PARTITION_ID);
-
- assertNotNull(newPartitionStorage0);
- assertNotSame(partitionStorage, newPartitionStorage0);
-
- assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
- assertEquals(0L, newPartitionStorage0.lastAppliedTerm());
- assertNull(newPartitionStorage0.committedGroupConfiguration());
- assertEquals(0L, newPartitionStorage0.persistedIndex());
- assertEquals(0, newPartitionStorage0.rowsCount());
-
- tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- MvPartitionStorage newPartitionStorage1 =
tableStorage.getMvPartition(PARTITION_ID);
-
- assertSame(newPartitionStorage0, newPartitionStorage1);
- }
-
- @Test
- public void testAbortRebalanceMvPartition() throws Exception {
- assertDoesNotThrow(() ->
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-
- MvPartitionStorage partitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
- tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- assertSame(partitionStorage,
tableStorage.getMvPartition(PARTITION_ID));
-
- assertDoesNotThrow(() ->
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
- }
-
- @Test
- public void testFinishRebalanceMvPartition() throws Exception {
- assertDoesNotThrow(() ->
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
-
- tableStorage.getOrCreateMvPartition(PARTITION_ID);
-
- tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- MvPartitionStorage newPartitionStorage =
tableStorage.getMvPartition(PARTITION_ID);
-
- tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
-
- assertSame(newPartitionStorage,
tableStorage.getMvPartition(PARTITION_ID));
-
- assertDoesNotThrow(() ->
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
- }
-
@Test
public void testDestroyPartition() throws Exception {
- assertThrows(
- IllegalArgumentException.class,
- () ->
tableStorage.destroyPartition(tableStorage.configuration().partitions().value())
- );
+ assertThrows(IllegalArgumentException.class, () ->
tableStorage.destroyPartition(getPartitionIdOutOfRange()));
MvPartitionStorage mvPartitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
HashIndexStorage hashIndexStorage =
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
@@ -400,14 +329,15 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1,
"1"));
- IndexRow indexRow = indexRow(binaryRow, rowId);
+ IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(),
binaryRow, rowId);
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
mvPartitionStorage.runConsistently(() -> {
mvPartitionStorage.addWriteCommitted(rowId, binaryRow,
clock.now());
- hashIndexStorage.put(indexRow);
+ hashIndexStorage.put(hashIndexRow);
- sortedIndexStorage.put(indexRow);
+ sortedIndexStorage.put(sortedIndexRow);
return null;
});
@@ -415,12 +345,12 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
Cursor<ReadResult> scanVersionsCursor =
mvPartitionStorage.scanVersions(rowId);
PartitionTimestampCursor scanTimestampCursor =
mvPartitionStorage.scan(clock.now());
- Cursor<RowId> getFromHashIndexCursor =
hashIndexStorage.get(indexRow.indexColumns());
+ Cursor<RowId> getFromHashIndexCursor =
hashIndexStorage.get(hashIndexRow.indexColumns());
- Cursor<RowId> getFromSortedIndexCursor =
sortedIndexStorage.get(indexRow.indexColumns());
+ Cursor<RowId> getFromSortedIndexCursor =
sortedIndexStorage.get(hashIndexRow.indexColumns());
Cursor<IndexRow> scanFromSortedIndexCursor =
sortedIndexStorage.scan(null, null, 0);
- tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+ tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
// Let's check that we won't get destroyed storages.
assertNull(tableStorage.getMvPartition(PARTITION_ID));
@@ -440,7 +370,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThrows(StorageClosedException.class, () ->
getAll(scanFromSortedIndexCursor));
// Let's check that nothing will happen if we try to destroy a
non-existing partition.
- assertDoesNotThrow(() ->
tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+ assertThat(tableStorage.destroyPartition(PARTITION_ID),
willCompleteSuccessfully());
}
@Test
@@ -457,20 +387,124 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
return null;
});
- tableStorage.destroyPartition(PARTITION_ID).get(1, TimeUnit.SECONDS);
+ tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);
MvPartitionStorage newMvPartitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
assertThat(getAll(newMvPartitionStorage.scanVersions(rowId)), empty());
}
+ @Test
+ public void testSuccessRebalance() throws Exception {
+ MvPartitionStorage mvPartitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ HashIndexStorage hashIndexStorage =
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+ SortedIndexStorage sortedIndexStorage =
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+ // Error because rebalance has not yet started for the partition.
+ assertThrows(StorageRebalanceException.class, () ->
tableStorage.finishRebalancePartition(PARTITION_ID, 100, 500));
+
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>>
rowsBeforeRebalanceStart = List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+ );
+
+ startRebalanceWithChecks(
+ PARTITION_ID,
+ mvPartitionStorage,
+ hashIndexStorage,
+ sortedIndexStorage,
+ rowsBeforeRebalanceStart
+ );
+
+ // Let's fill the storages with fresh data on rebalance.
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance
= List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+ );
+
+ fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage,
rowsOnRebalance);
+
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+ // Let's finish rebalancing.
+
+ // Partition is out of configuration range.
+ assertThrows(IllegalArgumentException.class, () ->
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500));
+
+ // Partition does not exist.
+ assertThrows(StorageRebalanceException.class, () ->
tableStorage.finishRebalancePartition(1, 100, 500));
+
+ assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10,
20), willCompleteSuccessfully());
+
+ // Let's check the storages after a successful rebalance finish.
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsBeforeRebalanceStart);
+ checkForPresenceRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsOnRebalance);
+
+ checkLastApplied(mvPartitionStorage, 10, 10, 20);
+ }
+
+ @Test
+ public void testFailRebalance() throws Exception {
+ MvPartitionStorage mvPartitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+ HashIndexStorage hashIndexStorage =
tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx.id());
+ SortedIndexStorage sortedIndexStorage =
tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx.id());
+
+ // Nothing will happen because rebalancing has not started.
+ tableStorage.abortRebalancePartition(PARTITION_ID).get(1, SECONDS);
+
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>>
rowsBeforeRebalanceStart = List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(0, "0"), new TestValue(0, "0")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(1, "1"), new TestValue(1, "1")), clock.now())
+ );
+
+ startRebalanceWithChecks(
+ PARTITION_ID,
+ mvPartitionStorage,
+ hashIndexStorage,
+ sortedIndexStorage,
+ rowsBeforeRebalanceStart
+ );
+
+ // Let's fill the storages with fresh data on rebalance.
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rowsOnRebalance
= List.of(
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(2, "2"), new TestValue(2, "2")), clock.now()),
+ new IgniteTuple3<>(new RowId(PARTITION_ID), binaryRow(new
TestKey(3, "3"), new TestValue(3, "3")), clock.now())
+ );
+
+ fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage,
rowsOnRebalance);
+
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+ // Let's abort rebalancing.
+
+ // Partition is out of configuration range.
+ assertThrows(IllegalArgumentException.class, () ->
tableStorage.abortRebalancePartition(getPartitionIdOutOfRange()));
+
+ assertThat(tableStorage.abortRebalancePartition(PARTITION_ID),
willCompleteSuccessfully());
+
+ // Let's check the storages after abort rebalance.
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsBeforeRebalanceStart);
+ checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsOnRebalance);
+
+ checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ }
+
+ @Test
+ public void testStartRebalanceForClosedPartition() {
+ MvPartitionStorage mvPartitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ mvPartitionStorage.close();
+
+ assertThrows(StorageRebalanceException.class, () ->
tableStorage.startRebalancePartition(PARTITION_ID));
+ }
+
private static void createTestIndexes(TablesConfiguration tablesConfig) {
List<IndexDefinition> indexDefinitions = List.of(
SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
- .addIndexColumn("COLUMN0").done()
+ .addIndexColumn("strKey").done()
.build(),
SchemaBuilders.hashIndex(HASH_INDEX_NAME)
- .withColumns("COLUMN0")
+ .withColumns("strKey")
.build()
);
@@ -488,10 +522,12 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private static void createTestTable(TableConfiguration tableConfig) {
TableDefinition tableDefinition =
SchemaBuilders.tableBuilder("PUBLIC", "foo")
.columns(
- SchemaBuilders.column("ID", ColumnType.INT32).build(),
- SchemaBuilders.column("COLUMN0",
ColumnType.INT32).build()
+ SchemaBuilders.column("intKey",
ColumnType.INT32).build(),
+ SchemaBuilders.column("strKey",
ColumnType.string()).build(),
+ SchemaBuilders.column("intVal",
ColumnType.INT32).build(),
+ SchemaBuilders.column("strVal",
ColumnType.string()).build()
)
- .withPrimaryKey("ID")
+ .withPrimaryKey("intKey")
.build();
CompletableFuture<Void> createTableFuture = tableConfig.change(
@@ -503,11 +539,11 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private static <T> List<T> getAll(Cursor<T> cursor) {
try (cursor) {
- return cursor.stream().collect(Collectors.toList());
+ return cursor.stream().collect(toList());
}
}
- private void checkStorageDestroyed(MvPartitionStorage storage) {
+ private void checkStorageDestroyed(MvPartitionStorage storage) throws
Exception {
int partId = PARTITION_ID;
assertThrows(StorageClosedException.class, () ->
storage.runConsistently(() -> null));
@@ -544,21 +580,207 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private void checkStorageDestroyed(SortedIndexStorage storage) {
checkStorageDestroyed((IndexStorage) storage);
- BinaryTuple indexKey = indexKey(binaryRow(new TestKey(0, "0"), new
TestValue(1, "1")));
-
- assertThrows(
- StorageClosedException.class,
- () ->
storage.scan(BinaryTuplePrefix.fromBinaryTuple(indexKey),
BinaryTuplePrefix.fromBinaryTuple(indexKey), 0)
- );
+ assertThrows(StorageClosedException.class, () -> storage.scan(null,
null, 0));
}
private void checkStorageDestroyed(IndexStorage storage) {
- IndexRow indexRow = indexRow(binaryRow(new TestKey(0, "0"), new
TestValue(1, "1")), new RowId(PARTITION_ID));
+ assertThrows(StorageClosedException.class, () ->
storage.get(mock(BinaryTuple.class)));
+
+ assertThrows(StorageClosedException.class, () ->
storage.put(mock(IndexRow.class)));
+
+ assertThrows(StorageClosedException.class, () ->
storage.remove(mock(IndexRow.class)));
+ }
+
+ private int getPartitionIdOutOfRange() {
+ return tableStorage.configuration().partitions().value();
+ }
+
+ private void startRebalanceWithChecks(
+ int partitionId,
+ MvPartitionStorage mvPartitionStorage,
+ HashIndexStorage hashIndexStorage,
+ SortedIndexStorage sortedIndexStorage,
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>>
rowsBeforeRebalanceStart
+ ) {
+ assertThat(rowsBeforeRebalanceStart, hasSize(greaterThanOrEqualTo(2)));
- assertThrows(StorageClosedException.class, () ->
storage.get(indexRow.indexColumns()));
+ fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage,
rowsBeforeRebalanceStart);
+
+ // Let's open the cursors before start rebalance.
+ IgniteTuple3<RowId, BinaryRow, HybridTimestamp> rowForCursors =
rowsBeforeRebalanceStart.get(0);
+
+ Cursor<?> mvPartitionStorageScanVersionsCursor =
mvPartitionStorage.scanVersions(rowForCursors.get1());
+ Cursor<?> mvPartitionStorageScanCursor =
mvPartitionStorage.scan(rowForCursors.get3());
+
+ IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(),
rowForCursors.get2(), rowForCursors.get1());
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), rowForCursors.get2(),
rowForCursors.get1());
+
+ Cursor<?> hashIndexStorageGetCursor =
hashIndexStorage.get(hashIndexRow.indexColumns());
+
+ Cursor<?> sortedIndexStorageGetCursor =
sortedIndexStorage.get(sortedIndexRow.indexColumns());
+ Cursor<?> sortedIndexStorageScanCursor = sortedIndexStorage.scan(null,
null, 0);
+
+ // Partition is out of configuration range.
+ assertThrows(IllegalArgumentException.class, () ->
tableStorage.startRebalancePartition(getPartitionIdOutOfRange()));
+
+ // Partition does not exist.
+ assertThrows(StorageRebalanceException.class, () ->
tableStorage.startRebalancePartition(partitionId + 1));
+
+ // Let's start rebalancing of the partition.
+ assertThat(tableStorage.startRebalancePartition(partitionId),
willCompleteSuccessfully());
+
+ // Once again, rebalancing of the partition cannot be started.
+ assertThrows(StorageRebalanceException.class, () ->
tableStorage.startRebalancePartition(partitionId));
+
+ checkMvPartitionStorageMethodsAfterStartRebalance(mvPartitionStorage);
+ checkHashIndexStorageMethodsAfterStartRebalance(hashIndexStorage);
+ checkSortedIndexStorageMethodsAfterStartRebalance(sortedIndexStorage);
+
+ checkCursorAfterStartRebalance(mvPartitionStorageScanVersionsCursor);
+ checkCursorAfterStartRebalance(mvPartitionStorageScanCursor);
+
+ checkCursorAfterStartRebalance(hashIndexStorageGetCursor);
+
+ checkCursorAfterStartRebalance(sortedIndexStorageGetCursor);
+ checkCursorAfterStartRebalance(sortedIndexStorageScanCursor);
+ }
+
+ private void
checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
+ checkLastApplied(storage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+
+ assertDoesNotThrow(() -> storage.committedGroupConfiguration());
+
+ storage.runConsistently(() -> {
+ assertThrows(StorageRebalanceException.class, () ->
storage.lastApplied(100, 500));
+
+ assertThrows(
+ StorageRebalanceException.class,
+ () ->
storage.committedGroupConfiguration(mock(RaftGroupConfiguration.class))
+ );
- assertThrows(StorageClosedException.class, () ->
storage.put(indexRow));
+ RowId rowId = new RowId(PARTITION_ID);
+
+ assertThrows(StorageRebalanceException.class, () ->
storage.read(rowId, clock.now()));
+ assertThrows(StorageRebalanceException.class, () ->
storage.abortWrite(rowId));
+ assertThrows(StorageRebalanceException.class, () ->
storage.scanVersions(rowId));
+ assertThrows(StorageRebalanceException.class, () ->
storage.scan(clock.now()));
+ assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
+ assertThrows(StorageRebalanceException.class, storage::rowsCount);
+
+ // TODO: IGNITE-18020 Add check
+ // TODO: IGNITE-18023 Add check
+ if (storage instanceof TestMvPartitionStorage) {
+ assertThrows(StorageRebalanceException.class, () ->
storage.pollForVacuum(clock.now()));
+ }
+
+ return null;
+ });
+ }
+
+ private static void
checkHashIndexStorageMethodsAfterStartRebalance(HashIndexStorage storage) {
+ assertDoesNotThrow(storage::indexDescriptor);
+
+ assertThrows(StorageRebalanceException.class, () ->
storage.get(mock(BinaryTuple.class)));
+ assertThrows(StorageRebalanceException.class, () ->
storage.remove(mock(IndexRow.class)));
+ }
+
+ private static void
checkSortedIndexStorageMethodsAfterStartRebalance(SortedIndexStorage storage) {
+ assertDoesNotThrow(storage::indexDescriptor);
+
+ assertThrows(StorageRebalanceException.class, () ->
storage.get(mock(BinaryTuple.class)));
+ assertThrows(StorageRebalanceException.class, () ->
storage.remove(mock(IndexRow.class)));
+ assertThrows(StorageRebalanceException.class, () -> storage.scan(null,
null, 0));
+ }
+
+ private static void checkCursorAfterStartRebalance(Cursor<?> cursor) {
+ assertDoesNotThrow(cursor::close);
+
+ assertThrows(StorageRebalanceException.class, cursor::hasNext);
+ assertThrows(StorageRebalanceException.class, cursor::next);
+ }
+
+ private void fillStorages(
+ MvPartitionStorage mvPartitionStorage,
+ HashIndexStorage hashIndexStorage,
+ SortedIndexStorage sortedIndexStorage,
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+ ) {
+ for (int i = 0; i < rows.size(); i++) {
+ int finalI = i;
+
+ IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row = rows.get(i);
+
+ RowId rowId = row.get1();
+ BinaryRow binaryRow = row.get2();
+ HybridTimestamp timestamp = row.get3();
+
+ IndexRow hashIndexRow =
indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);
+
+ mvPartitionStorage.runConsistently(() -> {
+ // If even.
+ if ((finalI & 1) == 0) {
+ mvPartitionStorage.addWrite(rowId, binaryRow,
UUID.randomUUID(), UUID.randomUUID(), rowId.partitionId());
+
+ mvPartitionStorage.commitWrite(rowId, timestamp);
+ } else {
+ mvPartitionStorage.addWriteCommitted(rowId, binaryRow,
timestamp);
+ }
+
+ hashIndexStorage.put(hashIndexRow);
+
+ sortedIndexStorage.put(sortedIndexRow);
+
+ return null;
+ });
+ }
+ }
+
+ private void checkForMissingRows(
+ MvPartitionStorage mvPartitionStorage,
+ HashIndexStorage hashIndexStorage,
+ SortedIndexStorage sortedIndexStorage,
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+ ) {
+ for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
+ assertThat(getAll(mvPartitionStorage.scanVersions(row.get1())),
is(empty()));
+
+ IndexRow hashIndexRow =
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+
+
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())),
is(empty()));
+
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())),
is(empty()));
+ }
+ }
+
+ private void checkForPresenceRows(
+ MvPartitionStorage mvPartitionStorage,
+ HashIndexStorage hashIndexStorage,
+ SortedIndexStorage sortedIndexStorage,
+ List<IgniteTuple3<RowId, BinaryRow, HybridTimestamp>> rows
+ ) {
+ for (IgniteTuple3<RowId, BinaryRow, HybridTimestamp> row : rows) {
+ assertThat(
+
getAll(mvPartitionStorage.scanVersions(row.get1())).stream().map(ReadResult::binaryRow).collect(toList()),
+ containsInAnyOrder(row.get2())
+ );
+
+ IndexRow hashIndexRow =
indexRow(hashIndexStorage.indexDescriptor(), row.get2(), row.get1());
+ IndexRow sortedIndexRow =
indexRow(sortedIndexStorage.indexDescriptor(), row.get2(), row.get1());
+
+
assertThat(getAll(hashIndexStorage.get(hashIndexRow.indexColumns())),
contains(row.get1()));
+
assertThat(getAll(sortedIndexStorage.get(sortedIndexRow.indexColumns())),
contains(row.get1()));
+ }
+ }
- assertThrows(StorageClosedException.class, () ->
storage.remove(indexRow));
+ private static void checkLastApplied(
+ MvPartitionStorage storage,
+ long expLastAppliedIndex,
+ long expPersistentIndex,
+ long expLastAppliedTerm
+ ) {
+ assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
+ assertEquals(expPersistentIndex, storage.persistedIndex());
+ assertEquals(expLastAppliedTerm, storage.lastAppliedTerm());
}
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 7389cfd183..2f5fb471b5 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
import java.util.Locale;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryConverter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -39,6 +41,7 @@ import
org.apache.ignite.internal.schema.marshaller.MarshallerException;
import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexDescriptor;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -115,12 +118,22 @@ public abstract class BaseMvStoragesTest {
}
}
- protected static IndexRow indexRow(BinaryRow binaryRow, RowId rowId) {
- return new IndexRowImpl(kvBinaryConverter.toTuple(binaryRow), rowId);
- }
+ protected static IndexRow indexRow(IndexDescriptor indexDescriptor,
BinaryRow binaryRow, RowId rowId) {
+ int[] columnIndexes = indexDescriptor.columns().stream()
+ .mapToInt(indexColumnDescriptor -> {
+ Column column =
schemaDescriptor.column(indexColumnDescriptor.name());
+
+ assertNotNull(column, column.name());
+
+ return column.schemaIndex();
+ })
+ .toArray();
+
+ BinaryTupleSchema binaryTupleSchema =
BinaryTupleSchema.createSchema(schemaDescriptor, columnIndexes);
+
+ BinaryConverter binaryTupleConverter = new
BinaryConverter(schemaDescriptor, binaryTupleSchema, false);
- protected static BinaryTuple indexKey(BinaryRow binaryKey) {
- return kBinaryConverter.toTuple(binaryKey);
+ return new IndexRowImpl(binaryTupleConverter.toTuple(binaryRow),
rowId);
}
protected static TestKey key(BinaryRow binaryRow) {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 02e5251418..b379217843 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -22,13 +22,11 @@ import static java.util.Comparator.comparing;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
-import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
@@ -39,6 +37,7 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -65,6 +64,8 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
private volatile boolean closed;
+ private volatile boolean rebalance;
+
public TestMvPartitionStorage(int partitionId) {
this.partitionId = partitionId;
}
@@ -106,35 +107,35 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws
StorageException {
- checkClosed();
+ checkStorageClosed();
return closure.execute();
}
@Override
public CompletableFuture<Void> flush() {
- checkClosed();
+ checkStorageClosed();
return CompletableFuture.completedFuture(null);
}
@Override
public long lastAppliedIndex() {
- checkClosed();
+ checkStorageClosed();
return lastAppliedIndex;
}
@Override
public long lastAppliedTerm() {
- checkClosed();
+ checkStorageClosed();
return lastAppliedTerm;
}
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm)
throws StorageException {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
this.lastAppliedIndex = lastAppliedIndex;
this.lastAppliedTerm = lastAppliedTerm;
@@ -142,7 +143,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public long persistedIndex() {
- checkClosed();
+ checkStorageClosed();
return lastAppliedIndex;
}
@@ -150,13 +151,15 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
@Nullable
public RaftGroupConfiguration committedGroupConfiguration() {
- checkClosed();
+ checkStorageClosed();
return groupConfig;
}
@Override
public void committedGroupConfiguration(RaftGroupConfiguration config) {
+ checkStorageClosedOrInProcessOfRebalance();
+
this.groupConfig = config;
}
@@ -168,7 +171,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
UUID commitTableId,
int commitPartitionId
) throws TxIdMismatchException {
- checkClosed();
+ checkStorageClosed();
BinaryRow[] res = {null};
@@ -191,7 +194,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public synchronized @Nullable BinaryRow abortWrite(RowId rowId) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
BinaryRow[] res = {null};
@@ -212,7 +215,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public synchronized void commitWrite(RowId rowId, HybridTimestamp
timestamp) {
- checkClosed();
+ checkStorageClosed();
map.compute(rowId, (ignored, versionChain) -> {
assert versionChain != null;
@@ -231,7 +234,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Nullable BinaryRow row,
HybridTimestamp commitTimestamp
) throws StorageException {
- checkClosed();
+ checkStorageClosed();
map.compute(rowId, (ignored, versionChain) -> {
if (versionChain != null && versionChain.isWriteIntent()) {
@@ -269,7 +272,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
if (rowId.partitionId() != partitionId) {
throw new IllegalArgumentException(
@@ -390,19 +393,14 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
- return Cursor.fromBareIterator(
- Stream.iterate(map.get(rowId), Objects::nonNull, vc -> vc.next)
- .peek(versionChain -> checkClosed())
- .map((VersionChain versionChain) ->
versionChainToReadResult(versionChain, false))
- .iterator()
- );
+ return new ScanVersionsCursor(rowId);
}
@Override
public PartitionTimestampCursor scan(HybridTimestamp timestamp) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
Iterator<VersionChain> iterator = map.values().iterator();
@@ -436,7 +434,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public boolean hasNext() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
if (currentReadResult != null) {
return true;
@@ -478,13 +476,15 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public @Nullable RowId closestRowId(RowId lowerBound) throws
StorageException {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
return map.ceilingKey(lowerBound);
}
@Override
public synchronized @Nullable BinaryRowAndRowId
pollForVacuum(HybridTimestamp lowWatermark) {
+ checkStorageClosedOrInProcessOfRebalance();
+
Iterator<VersionChain> it = gcQueue.iterator();
if (!it.hasNext()) {
@@ -530,14 +530,18 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
@Override
public long rowsCount() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
return map.size();
}
@Override
public void close() {
+ assert !rebalance;
+
closed = true;
+
+ clear();
}
public void destroy() {
@@ -551,9 +555,112 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
gcQueue.clear();
}
- private void checkClosed() {
+ private void checkStorageClosed() {
if (closed) {
throw new StorageClosedException("Storage is already closed");
}
}
+
+ private void checkStorageInProcessOfRebalance() {
+ if (rebalance) {
+ throw new StorageRebalanceException("Storage in the process of
rebalancing");
+ }
+ }
+
+ private void checkStorageClosedOrInProcessOfRebalance() {
+ checkStorageClosed();
+ checkStorageInProcessOfRebalance();
+ }
+
+ void startRebalance() {
+ checkStorageClosed();
+
+ rebalance = true;
+
+ clear();
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ }
+
+ void abortRebalance() {
+ checkStorageClosed();
+
+ if (!rebalance) {
+ return;
+ }
+
+ rebalance = false;
+
+ clear();
+
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ }
+
+ void finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
+ checkStorageClosed();
+
+ assert rebalance;
+
+ rebalance = false;
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+ boolean closed() {
+ return closed;
+ }
+
+ private class ScanVersionsCursor implements Cursor<ReadResult> {
+ private final RowId rowId;
+
+ @Nullable
+ private Boolean hasNext;
+
+ @Nullable
+ private VersionChain versionChain;
+
+ private ScanVersionsCursor(RowId rowId) {
+ this.rowId = rowId;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+
+ @Override
+ public boolean hasNext() {
+ advanceIfNeeded();
+
+ return hasNext;
+ }
+
+ @Override
+ public ReadResult next() {
+ advanceIfNeeded();
+
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ hasNext = null;
+
+ return versionChainToReadResult(versionChain, false);
+ }
+
+ private void advanceIfNeeded() {
+ checkStorageClosedOrInProcessOfRebalance();
+
+ if (hasNext != null) {
+ return;
+ }
+
+ versionChain = versionChain == null ? map.get(rowId) :
versionChain.next;
+
+ hasNext = versionChain != null;
+ }
+ }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index dc53ea17bb..21c84b1051 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.storage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -41,9 +44,7 @@ import org.jetbrains.annotations.Nullable;
* Test table storage implementation.
*/
public class TestMvTableStorage implements MvTableStorage {
- private final Map<Integer, MvPartitionStorage> partitions = new
ConcurrentHashMap<>();
-
- private final Map<Integer, MvPartitionStorage> backupPartitions = new
ConcurrentHashMap<>();
+ private final Map<Integer, TestMvPartitionStorage> partitions = new
ConcurrentHashMap<>();
private final Map<UUID, SortedIndices> sortedIndicesById = new
ConcurrentHashMap<>();
@@ -51,6 +52,8 @@ public class TestMvTableStorage implements MvTableStorage {
private final Map<Integer, CompletableFuture<Void>>
destroyFutureByPartitionId = new ConcurrentHashMap<>();
+ private final Map<Integer, CompletableFuture<Void>>
rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
+
private final TableConfiguration tableCfg;
private final TablesConfiguration tablesCfg;
@@ -61,13 +64,13 @@ public class TestMvTableStorage implements MvTableStorage {
private static class SortedIndices {
private final SortedIndexDescriptor descriptor;
- final Map<Integer, SortedIndexStorage> storageByPartitionId = new
ConcurrentHashMap<>();
+ final Map<Integer, TestSortedIndexStorage> storageByPartitionId = new
ConcurrentHashMap<>();
SortedIndices(SortedIndexDescriptor descriptor) {
this.descriptor = descriptor;
}
- SortedIndexStorage getOrCreateStorage(Integer partitionId) {
+ TestSortedIndexStorage getOrCreateStorage(Integer partitionId) {
return storageByPartitionId.computeIfAbsent(partitionId, id -> new
TestSortedIndexStorage(descriptor));
}
}
@@ -78,13 +81,13 @@ public class TestMvTableStorage implements MvTableStorage {
private static class HashIndices {
private final HashIndexDescriptor descriptor;
- final Map<Integer, HashIndexStorage> storageByPartitionId = new
ConcurrentHashMap<>();
+ final Map<Integer, TestHashIndexStorage> storageByPartitionId = new
ConcurrentHashMap<>();
HashIndices(HashIndexDescriptor descriptor) {
this.descriptor = descriptor;
}
- HashIndexStorage getOrCreateStorage(Integer partitionId) {
+ TestHashIndexStorage getOrCreateStorage(Integer partitionId) {
return storageByPartitionId.computeIfAbsent(partitionId, id -> new
TestHashIndexStorage(descriptor));
}
}
@@ -110,6 +113,8 @@ public class TestMvTableStorage implements MvTableStorage {
public CompletableFuture<Void> destroyPartition(int partitionId) {
checkPartitionId(partitionId);
+ assert !rebalanceFutureByPartitionId.containsKey(partitionId);
+
CompletableFuture<Void> destroyPartitionFuture = new
CompletableFuture<>();
CompletableFuture<Void> previousDestroyPartitionFuture =
destroyFutureByPartitionId.putIfAbsent(
@@ -157,7 +162,7 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public SortedIndexStorage getOrCreateSortedIndex(int partitionId, UUID
indexId) {
if (!partitions.containsKey(partitionId)) {
- throw new StorageException("Partition ID " + partitionId + " does
not exist");
+ throw new
StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
}
SortedIndices sortedIndices = sortedIndicesById.computeIfAbsent(
@@ -171,7 +176,7 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public HashIndexStorage getOrCreateHashIndex(int partitionId, UUID
indexId) {
if (!partitions.containsKey(partitionId)) {
- throw new StorageException("Partition ID " + partitionId + " does
not exist");
+ throw new
StorageException(createPartitionDoesNotExistsErrorMessage(partitionId));
}
HashIndices sortedIndices = hashIndicesById.computeIfAbsent(
@@ -229,34 +234,94 @@ public class TestMvTableStorage implements MvTableStorage
{
}
@Override
- public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
- MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+ public CompletableFuture<Void> startRebalancePartition(int partitionId) {
+ checkPartitionId(partitionId);
- assert oldPartitionStorage != null : "Partition does not exist: " +
partitionId;
+ TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
- if (backupPartitions.putIfAbsent(partitionId, oldPartitionStorage) ==
null) {
- partitions.put(partitionId, new
TestMvPartitionStorage(partitionId));
+ if (partitionStorage == null) {
+ throw new
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
}
- return completedFuture(null);
+ if (destroyFutureByPartitionId.containsKey(partitionId)) {
+ throw new StorageRebalanceException("Partition in the process of
destruction: " + partitionId);
+ }
+
+ if (partitionStorage.closed()) {
+ throw new StorageRebalanceException("Partition closed: " +
partitionId);
+ }
+
+ CompletableFuture<Void> rebalanceFuture = new CompletableFuture<>();
+
+ if (rebalanceFutureByPartitionId.putIfAbsent(partitionId,
rebalanceFuture) != null) {
+ throw new StorageRebalanceException("Rebalance for the partition
is already in progress: " + partitionId);
+ }
+
+ try {
+ partitionStorage.startRebalance();
+
+
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::startRebalance);
+
+
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::startRebalance);
+
+ rebalanceFuture.complete(null);
+ } catch (Throwable t) {
+ rebalanceFuture.completeExceptionally(t);
+ }
+
+ return rebalanceFuture;
}
@Override
- public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
- MvPartitionStorage oldPartitionStorage =
backupPartitions.remove(partitionId);
+ public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
+ checkPartitionId(partitionId);
+
+ CompletableFuture<Void> rebalanceFuture =
rebalanceFutureByPartitionId.remove(partitionId);
- if (oldPartitionStorage != null) {
- partitions.put(partitionId, oldPartitionStorage);
+ if (rebalanceFuture == null) {
+ return completedFuture(null);
}
- return completedFuture(null);
+ TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
+
+ if (partitionStorage == null) {
+ throw new
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ partitionStorage.abortRebalance();
+
+
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::abortRebalance);
+
+
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::abortRebalance);
+ });
}
@Override
- public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId)
{
- backupPartitions.remove(partitionId);
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId,
long lastAppliedIndex, long lastAppliedTerm) {
+ checkPartitionId(partitionId);
- return completedFuture(null);
+ CompletableFuture<Void> rebalanceFuture =
rebalanceFutureByPartitionId.remove(partitionId);
+
+ if (rebalanceFuture == null) {
+ throw new StorageRebalanceException("Rebalance for the partition
did not start: " + partitionId);
+ }
+
+ TestMvPartitionStorage partitionStorage = partitions.get(partitionId);
+
+ if (partitionStorage == null) {
+ throw new
StorageRebalanceException(createPartitionDoesNotExistsErrorMessage(partitionId));
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ partitionStorage.finishRebalance(lastAppliedIndex,
lastAppliedTerm);
+
+
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::finishRebalance);
+
+
testSortedIndexStorageStream(partitionId).forEach(TestSortedIndexStorage::finishRebalance);
+ });
}
private void checkPartitionId(int partitionId) {
@@ -271,4 +336,20 @@ public class TestMvTableStorage implements MvTableStorage {
));
}
}
+
+ private static String createPartitionDoesNotExistsErrorMessage(int
partitionId) {
+ return "Partition ID " + partitionId + " does not exist";
+ }
+
+ private Stream<TestHashIndexStorage> testHashIndexStorageStream(Integer
partitionId) {
+ return hashIndicesById.values().stream()
+ .map(hashIndices ->
hashIndices.storageByPartitionId.get(partitionId))
+ .filter(Objects::nonNull);
+ }
+
+ private Stream<TestSortedIndexStorage>
testSortedIndexStorageStream(Integer partitionId) {
+ return sortedIndicesById.values().stream()
+ .map(hashIndices ->
hashIndices.storageByPartitionId.get(partitionId))
+ .filter(Objects::nonNull);
+ }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index a15deb0656..4bc9f6b2d9 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
@@ -43,6 +44,8 @@ public class TestHashIndexStorage implements HashIndexStorage
{
private volatile boolean closed;
+ private volatile boolean rebalance;
+
/**
* Constructor.
*/
@@ -57,18 +60,35 @@ public class TestHashIndexStorage implements
HashIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
+
+ Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(),
Set.of()).iterator();
- Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(),
Set.of()).stream()
- .peek(rowId -> checkClosed())
- .iterator();
+ return new Cursor<>() {
+ @Override
+ public void close() {
+ // No-op.
+ }
- return Cursor.fromBareIterator(iterator);
+ @Override
+ public boolean hasNext() {
+ checkStorageClosedOrInProcessOfRebalance();
+
+ return iterator.hasNext();
+ }
+
+ @Override
+ public RowId next() {
+ checkStorageClosedOrInProcessOfRebalance();
+
+ return iterator.next();
+ }
+ };
}
@Override
public void put(IndexRow row) {
- checkClosed();
+ checkStorageClosed();
index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
if (v == null) {
@@ -88,7 +108,7 @@ public class TestHashIndexStorage implements
HashIndexStorage {
@Override
public void remove(IndexRow row) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
if (v.contains(row.rowId())) {
@@ -111,7 +131,7 @@ public class TestHashIndexStorage implements HashIndexStorage {
public void destroy() {
closed = true;
- index.clear();
+ clear();
}
/**
@@ -121,9 +141,54 @@ public class TestHashIndexStorage implements HashIndexStorage {
index.clear();
}
- private void checkClosed() {
+ private void checkStorageClosed() {
if (closed) {
throw new StorageClosedException("Storage is already closed");
}
}
+
+ private void checkStorageClosedOrInProcessOfRebalance() {
+ checkStorageClosed();
+
+ if (rebalance) {
+ throw new StorageRebalanceException("Storage in the process of rebalancing");
+ }
+ }
+
+ /**
+ * Starts rebalancing of the storage.
+ */
+ public void startRebalance() {
+ checkStorageClosed();
+
+ rebalance = true;
+
+ clear();
+ }
+
+ /**
+ * Aborts rebalance of the storage.
+ */
+ public void abortRebalance() {
+ checkStorageClosed();
+
+ if (!rebalance) {
+ return;
+ }
+
+ rebalance = false;
+
+ clear();
+ }
+
+ /**
+ * Completes rebalance of the storage.
+ */
+ public void finishRebalance() {
+ checkStorageClosed();
+
+ assert rebalance;
+
+ rebalance = false;
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 8286ecdc59..fa7f567c7c 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -58,6 +59,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
private volatile boolean closed;
+ private volatile boolean rebalance;
+
/**
* Constructor.
*/
@@ -73,7 +76,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public Cursor<RowId> get(BinaryTuple key) throws StorageException {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), emptyNavigableMap()).keySet().iterator();
@@ -85,14 +88,14 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public boolean hasNext() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
return iterator.hasNext();
}
@Override
public RowId next() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
return iterator.next();
}
@@ -101,7 +104,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public void put(IndexRow row) {
- checkClosed();
+ checkStorageClosed();
index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
NavigableMap<RowId, Object> rowIds = v == null ? new ConcurrentSkipListMap<>() : v;
@@ -114,7 +117,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public void remove(IndexRow row) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
v.remove(row.rowId());
@@ -129,7 +132,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Nullable BinaryTuplePrefix upperBound,
int flags
) {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
@@ -175,7 +178,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
public void destroy() {
closed = true;
- index.clear();
+ clear();
}
/**
@@ -185,12 +188,6 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
index.clear();
}
- private void checkClosed() {
- if (closed) {
- throw new StorageClosedException("Storage is already closed");
- }
- }
-
private class ScanCursor implements PeekCursor<IndexRow> {
private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> indexMap;
@@ -214,7 +211,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public boolean hasNext() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
advanceIfNeeded();
@@ -223,7 +220,7 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public IndexRow next() {
- checkClosed();
+ checkStorageClosedOrInProcessOfRebalance();
advanceIfNeeded();
@@ -240,6 +237,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
@Override
public @Nullable IndexRow peek() {
+ checkStorageClosedOrInProcessOfRebalance();
+
if (hasNext != null) {
if (hasNext) {
return new IndexRowImpl(new BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
@@ -315,4 +314,55 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
return rowIdEntry == null ? null : rowIdEntry.getKey();
}
}
+
+ private void checkStorageClosed() {
+ if (closed) {
+ throw new StorageClosedException("Storage is already closed");
+ }
+ }
+
+ private void checkStorageClosedOrInProcessOfRebalance() {
+ checkStorageClosed();
+
+ if (rebalance) {
+ throw new StorageRebalanceException("Storage in the process of rebalancing");
+ }
+ }
+
+ /**
+ * Starts rebalancing for the storage.
+ */
+ public void startRebalance() {
+ checkStorageClosed();
+
+ rebalance = true;
+
+ clear();
+ }
+
+ /**
+ * Aborts rebalance of the storage.
+ */
+ public void abortRebalance() {
+ checkStorageClosed();
+
+ if (!rebalance) {
+ return;
+ }
+
+ rebalance = false;
+
+ clear();
+ }
+
+ /**
+ * Completes rebalance of the storage.
+ */
+ public void finishRebalance() {
+ checkStorageClosed();
+
+ assert rebalance;
+
+ rebalance = false;
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 179efbe20a..5209d25ef0 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -439,19 +439,19 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
}
@Override
- public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> startRebalancePartition(int partitionId) {
// TODO: IGNITE-18029 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
// TODO: IGNITE-18029 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
// TODO: IGNITE-18029 Implement
throw new UnsupportedOperationException();
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index f3130b32f9..c4755903d8 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -147,19 +147,19 @@ public class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStora
}
@Override
- public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> startRebalancePartition(int partitionId) {
// TODO: IGNITE-18028 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
// TODO: IGNITE-18028 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
// TODO: IGNITE-18028 Implement
throw new UnsupportedOperationException();
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 04e6b403ad..c65cec3e05 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -79,33 +79,33 @@ public class PersistentPageMemoryMvTableStorageTest extends AbstractMvTableStora
);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
+ @Test
@Override
- public void testStartRebalanceMvPartition() throws Exception {
- super.testStartRebalanceMvPartition();
+ public void testDestroyPartition() throws Exception {
+ super.testDestroyPartition();
+
+ // Let's make sure that the checkpoint doesn't fail.
+ engine.checkpointManager()
+ .forceCheckpoint("after-test-destroy-partition")
+ .futureFor(CheckpointState.FINISHED)
+ .get(1, TimeUnit.SECONDS);
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
@Override
- public void testAbortRebalanceMvPartition() throws Exception {
- super.testAbortRebalanceMvPartition();
+ public void testSuccessRebalance() throws Exception {
+ super.testSuccessRebalance();
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
@Override
- public void testFinishRebalanceMvPartition() throws Exception {
- super.testFinishRebalanceMvPartition();
+ public void testFailRebalance() throws Exception {
+ super.testFailRebalance();
}
- @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
@Override
- public void testDestroyPartition() throws Exception {
- super.testDestroyPartition();
-
- // Let's make sure that the checkpoint doesn't fail.
- engine.checkpointManager()
- .forceCheckpoint("after-test-destroy-partition")
- .futureFor(CheckpointState.FINISHED)
- .get(1, TimeUnit.SECONDS);
+ public void testStartRebalanceForClosedPartition() {
+ super.testStartRebalanceForClosedPartition();
}
}
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index ddd58c47cd..1d37235ef3 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -71,33 +71,33 @@ public class VolatilePageMemoryMvTableStorageTest extends AbstractMvTableStorage
);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
@Override
- public void testStartRebalanceMvPartition() throws Exception {
- super.testStartRebalanceMvPartition();
+ public void testDestroyPartition() throws Exception {
+ super.testDestroyPartition();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
@Override
- public void testAbortRebalanceMvPartition() throws Exception {
- super.testAbortRebalanceMvPartition();
+ public void testReCreatePartition() throws Exception {
+ super.testReCreatePartition();
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
@Override
- public void testFinishRebalanceMvPartition() throws Exception {
- super.testFinishRebalanceMvPartition();
+ public void testSuccessRebalance() throws Exception {
+ super.testSuccessRebalance();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
@Override
- public void testDestroyPartition() throws Exception {
- super.testDestroyPartition();
+ public void testFailRebalance() throws Exception {
+ super.testFailRebalance();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17833")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
@Override
- public void testReCreatePartition() throws Exception {
- super.testReCreatePartition();
+ public void testStartRebalanceForClosedPartition() {
+ super.testStartRebalanceForClosedPartition();
}
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index c2081c379e..f0c1c948bc 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -600,19 +600,19 @@ public class RocksDbTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> startRebalancePartition(int partitionId) {
// TODO: IGNITE-18027 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
// TODO: IGNITE-18027 Implement
throw new UnsupportedOperationException();
}
@Override
- public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId, long lastAppliedIndex, long lastAppliedTerm) {
// TODO: IGNITE-18027 Implement
throw new UnsupportedOperationException();
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 0272c65f79..fe2ca44c67 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -153,33 +153,33 @@ public class RocksDbMvTableStorageTest extends AbstractMvTableStorageTest {
assertThat(tableStorage.isVolatile(), is(false));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
@Override
- public void testStartRebalanceMvPartition() throws Exception {
- super.testStartRebalanceMvPartition();
+ public void testDestroyPartition() throws Exception {
+ super.testDestroyPartition();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
@Override
- public void testAbortRebalanceMvPartition() throws Exception {
- super.testAbortRebalanceMvPartition();
+ public void testReCreatePartition() throws Exception {
+ super.testReCreatePartition();
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
@Override
- public void testFinishRebalanceMvPartition() throws Exception {
- super.testFinishRebalanceMvPartition();
+ public void testSuccessRebalance() throws Exception {
+ super.testSuccessRebalance();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
@Override
- public void testDestroyPartition() throws Exception {
- super.testDestroyPartition();
+ public void testFailRebalance() throws Exception {
+ super.testFailRebalance();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18180")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
@Override
- public void testReCreatePartition() throws Exception {
- super.testReCreatePartition();
+ public void testStartRebalanceForClosedPartition() {
+ super.testStartRebalanceForClosedPartition();
}
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index d4b9a7161b..e5e03e5300 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -170,8 +170,8 @@ public interface TxStateStorage extends ManuallyCloseable {
*
* <p>After calling this method, methods for writing and reading will be available.
*
- * <p>If a full rebalance has not started, then an IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_FULL_REBALANCE_ERR}
- * will be thrown
+ * <p>If a full rebalance has not started, then an {@link IgniteInternalException} with
+ * {@link Transactions#TX_STATE_STORAGE_FULL_REBALANCE_ERR} will be thrown.
*
* @param lastAppliedIndex Last applied index.
* @param lastAppliedTerm Last applied term.