This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a6b8a6deaf IGNITE-18021 Creating an API for full rebalance without
losing user data in MvPartitionStorage on receiver (#1286)
a6b8a6deaf is described below
commit a6b8a6deafead6282a5d41ae2ad2baf770d2034c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 2 11:02:11 2022 +0300
IGNITE-18021 Creating an API for full rebalance without losing user data in
MvPartitionStorage on receiver (#1286)
---
.../internal/storage/engine/MvTableStorage.java | 41 ++++++++++++
.../storage/AbstractMvTableStorageTest.java | 78 ++++++++++++++++++++++
.../internal/storage/impl/TestMvTableStorage.java | 45 +++++++++++--
.../PersistentPageMemoryTableStorage.java | 19 ++++++
.../pagememory/VolatilePageMemoryTableStorage.java | 19 ++++++
.../PersistentPageMemoryMvTableStorageTest.java | 18 +++++
.../VolatilePageMemoryMvTableStorageTest.java | 18 +++++
.../storage/rocksdb/RocksDbTableStorage.java | 18 +++++
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 19 ++++++
9 files changed, 269 insertions(+), 6 deletions(-)
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 386c7db7b9..fad6d93356 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -162,4 +162,45 @@ public interface MvTableStorage {
* @throws StorageException If an error has occurred during the
destruction of the storage.
*/
void destroy() throws StorageException;
+
+ /**
+ * Prepares the partition storage for rebalancing: makes a backup of the
current partition storage and creates a new storage.
+ *
+ * <p>This method must be called before every full rebalance of the
partition storage, so that in case of errors or cancellation of the
+ * full rebalance, we can restore the partition storage from the backup.
+ *
+ * <p>The full rebalance is completed by calling one of the following methods:
+ * <ol>
+ * <li>{@link #abortRebalanceMvPartition(int)} - in case of a full
rebalance cancellation or failure, so that we can
+ * restore the partition storage from a backup;</li>
+ * <li>{@link #finishRebalanceMvPartition(int)} - in case of a
successful full rebalance, to remove the backup of the
+ * partition storage.</li>
+ * </ol>
+ *
+ * @param partitionId Partition ID.
+ * @return Future that, once completed without errors, guarantees that {@link
#getMvPartition} will return a new (empty) partition storage.
+ */
+ CompletableFuture<Void> startRebalanceMvPartition(int partitionId);
+
+ /**
+ * Aborts rebalancing of the partition storage if it was started: restores
the partition storage from a backup and deletes the new
+ * storage.
+ *
+ * <p>If a full rebalance has not been {@link
#startRebalanceMvPartition(int) started}, then nothing will happen.
+ *
+ * @param partitionId Partition ID.
+ * @return Future, upon completion of which {@link #getMvPartition} will
return the partition storage restored from the backup.
+ */
+ CompletableFuture<Void> abortRebalanceMvPartition(int partitionId);
+
+ /**
+ * Finishes a successful partition storage rebalance if it has been
started: deletes the backup of the partition storage and saves a new
+ * storage.
+ *
+ * <p>If a full rebalance has not been {@link
#startRebalanceMvPartition(int) started}, then nothing will happen.
+ *
+ * @param partitionId Partition ID.
+ * @return Future; if it fails, the partition storage rebalance will be aborted.
+ */
+ CompletableFuture<Void> finishRebalanceMvPartition(int partitionId);
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 13f251f9be..201a081ba9 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -26,15 +26,23 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -77,6 +85,8 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private TableIndexView hashIdx;
+ private final HybridClock clock = new HybridClockImpl();
+
/**
* Initializes the internal structures needed for tests.
*
@@ -299,6 +309,74 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
);
}
+ @Test
+ public void testStartRebalanceMvPartition() throws Exception {
+ MvPartitionStorage partitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ partitionStorage.runConsistently(() -> {
+ partitionStorage.addWriteCommitted(
+ new RowId(PARTITION_ID),
+ binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
+ clock.now()
+ );
+
+ partitionStorage.lastAppliedIndex(100);
+
+ return null;
+ });
+
+ partitionStorage.flush().get(1, TimeUnit.SECONDS);
+
+ tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ MvPartitionStorage newPartitionStorage0 =
tableStorage.getMvPartition(PARTITION_ID);
+
+ assertNotNull(newPartitionStorage0);
+ assertNotSame(partitionStorage, newPartitionStorage0);
+
+ assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
+ assertEquals(0L, newPartitionStorage0.persistedIndex());
+ assertEquals(0, newPartitionStorage0.rowsCount());
+
+ tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ MvPartitionStorage newPartitionStorage1 =
tableStorage.getMvPartition(PARTITION_ID);
+
+ assertSame(newPartitionStorage0, newPartitionStorage1);
+ }
+
+ @Test
+ public void testAbortRebalanceMvPartition() throws Exception {
+ assertDoesNotThrow(() ->
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+ MvPartitionStorage partitionStorage =
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ assertSame(partitionStorage,
tableStorage.getMvPartition(PARTITION_ID));
+
+ assertDoesNotThrow(() ->
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testFinishRebalanceMvPartition() throws Exception {
+ assertDoesNotThrow(() ->
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+ tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+ tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ MvPartitionStorage newPartitionStorage =
tableStorage.getMvPartition(PARTITION_ID);
+
+ tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1,
TimeUnit.SECONDS);
+
+ assertSame(newPartitionStorage,
tableStorage.getMvPartition(PARTITION_ID));
+
+ assertDoesNotThrow(() ->
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+ }
+
private static void createTestIndexes(TablesConfiguration tablesConfig) {
List<IndexDefinition> indexDefinitions = List.of(
SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 9293496fad..68858f8513 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -40,6 +42,8 @@ import org.jetbrains.annotations.Nullable;
public class TestMvTableStorage implements MvTableStorage {
private final Map<Integer, MvPartitionStorage> partitions = new
ConcurrentHashMap<>();
+ private final Map<Integer, MvPartitionStorage> backupPartitions = new
ConcurrentHashMap<>();
+
private final Map<UUID, SortedIndices> sortedIndicesById = new
ConcurrentHashMap<>();
private final Map<UUID, HashIndices> hashIndicesById = new
ConcurrentHashMap<>();
@@ -101,12 +105,10 @@ public class TestMvTableStorage implements MvTableStorage
{
@Override
public void destroyPartition(int partitionId) throws StorageException {
- Integer boxedPartitionId = partitionId;
-
- partitions.remove(boxedPartitionId);
+ partitions.remove(partitionId);
- sortedIndicesById.values().forEach(indices ->
indices.storageByPartitionId.remove(boxedPartitionId));
- hashIndicesById.values().forEach(indices ->
indices.storageByPartitionId.remove(boxedPartitionId));
+ sortedIndicesById.values().forEach(indices ->
indices.storageByPartitionId.remove(partitionId));
+ hashIndicesById.values().forEach(indices ->
indices.storageByPartitionId.remove(partitionId));
}
@Override
@@ -147,7 +149,7 @@ public class TestMvTableStorage implements MvTableStorage {
hashIndex.storageByPartitionId.values().forEach(HashIndexStorage::destroy);
}
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
@Override
@@ -176,4 +178,35 @@ public class TestMvTableStorage implements MvTableStorage {
@Override
public void destroy() throws StorageException {
}
+
+ @Override
+ public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+
+ assert oldPartitionStorage != null : "Partition does not exist: " +
partitionId;
+
+ if (backupPartitions.putIfAbsent(partitionId, oldPartitionStorage) ==
null) {
+ partitions.put(partitionId, new
TestMvPartitionStorage(partitionId));
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ MvPartitionStorage oldPartitionStorage =
backupPartitions.remove(partitionId);
+
+ if (oldPartitionStorage != null) {
+ partitions.put(partitionId, oldPartitionStorage);
+ }
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId)
{
+ backupPartitions.remove(partitionId);
+
+ return completedFuture(null);
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index dc9259e995..6587bd5d3a 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
@@ -399,4 +400,22 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
);
}
}
+
+ @Override
+ public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18029 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18029 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId)
{
+ // TODO: IGNITE-18029 Implement
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index f20f4ccd57..b690561e4e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -138,4 +139,22 @@ public class VolatilePageMemoryTableStorage extends
AbstractPageMemoryTableStora
);
}
}
+
+ @Override
+ public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18028 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18028 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId)
{
+ // TODO: IGNITE-18028 Implement
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 123c66ccea..7add533606 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -83,4 +83,22 @@ public class PersistentPageMemoryMvTableStorageTest extends
AbstractMvTableStora
public void testDestroyIndex() {
super.testDestroyIndex();
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
+ @Override
+ public void testStartRebalanceMvPartition() throws Exception {
+ super.testStartRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
+ @Override
+ public void testAbortRebalanceMvPartition() throws Exception {
+ super.testAbortRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029")
+ @Override
+ public void testFinishRebalanceMvPartition() throws Exception {
+ super.testFinishRebalanceMvPartition();
+ }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index 7021466b9b..dc442816ab 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -77,4 +77,22 @@ public class VolatilePageMemoryMvTableStorageTest extends
AbstractMvTableStorage
public void testDestroyIndex() {
super.testDestroyIndex();
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+ @Override
+ public void testStartRebalanceMvPartition() throws Exception {
+ super.testStartRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+ @Override
+ public void testAbortRebalanceMvPartition() throws Exception {
+ super.testAbortRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028")
+ @Override
+ public void testFinishRebalanceMvPartition() throws Exception {
+ super.testFinishRebalanceMvPartition();
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 1ba1f0763c..79ba41a3a7 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -559,4 +559,22 @@ public class RocksDbTableStorage implements MvTableStorage
{
return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), options);
}
+
+ @Override
+ public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18027 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+ // TODO: IGNITE-18027 Implement
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId)
{
+ // TODO: IGNITE-18027 Implement
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index b6a54467de..dd1ee17912 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -150,4 +151,22 @@ public class RocksDbMvTableStorageTest extends
AbstractMvTableStorageTest {
void storageAdvertisesItIsPersistent() {
assertThat(tableStorage.isVolatile(), is(false));
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+ @Override
+ public void testStartRebalanceMvPartition() throws Exception {
+ super.testStartRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+ @Override
+ public void testAbortRebalanceMvPartition() throws Exception {
+ super.testAbortRebalanceMvPartition();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027")
+ @Override
+ public void testFinishRebalanceMvPartition() throws Exception {
+ super.testFinishRebalanceMvPartition();
+ }
}