This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a6b8a6deaf IGNITE-18021 Creating an API for full rebalance without 
losing user data in MvPartitionStorage on receiver (#1286)
a6b8a6deaf is described below

commit a6b8a6deafead6282a5d41ae2ad2baf770d2034c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 2 11:02:11 2022 +0300

    IGNITE-18021 Creating an API for full rebalance without losing user data in 
MvPartitionStorage on receiver (#1286)
---
 .../internal/storage/engine/MvTableStorage.java    | 41 ++++++++++++
 .../storage/AbstractMvTableStorageTest.java        | 78 ++++++++++++++++++++++
 .../internal/storage/impl/TestMvTableStorage.java  | 45 +++++++++++--
 .../PersistentPageMemoryTableStorage.java          | 19 ++++++
 .../pagememory/VolatilePageMemoryTableStorage.java | 19 ++++++
 .../PersistentPageMemoryMvTableStorageTest.java    | 18 +++++
 .../VolatilePageMemoryMvTableStorageTest.java      | 18 +++++
 .../storage/rocksdb/RocksDbTableStorage.java       | 18 +++++
 .../storage/rocksdb/RocksDbMvTableStorageTest.java | 19 ++++++
 9 files changed, 269 insertions(+), 6 deletions(-)

diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index 386c7db7b9..fad6d93356 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -162,4 +162,45 @@ public interface MvTableStorage {
      * @throws StorageException If an error has occurred during the 
destruction of the storage.
      */
     void destroy() throws StorageException;
+
+    /**
+     * Prepares the partition storage for rebalancing: makes a backup of the 
current partition storage and creates a new storage.
+     *
+     * <p>This method must be called before every full rebalance of the 
partition storage, so that in case of errors or cancellation of the
+     * full rebalance, we can restore the partition storage from the backup.
+     *
+     * <p>Full rebalance will be completed when one of the methods is called:
+     * <ol>
+     *     <li>{@link #abortRebalanceMvPartition(int)} - in case of a full 
rebalance cancellation or failure, so that we can
+     *     restore the partition storage from a backup;</li>
+     *     <li>{@link #finishRebalanceMvPartition(int)} - in case of a 
successful full rebalance, to remove the backup of the
+     *     partition storage.</li>
+     * </ol>
+     *
+     * @param partitionId Partition ID.
+     * @return Future, if completed without errors, then {@link 
#getMvPartition} will return a new (empty) partition storage.
+     */
+    CompletableFuture<Void> startRebalanceMvPartition(int partitionId);
+
+    /**
+     * Aborts rebalancing of the partition storage if it was started: restores 
the partition storage from a backup and deletes the new
+     * storage.
+     *
+     * <p>If a full rebalance has not been {@link 
#startRebalanceMvPartition(int) started}, then nothing will happen.
+     *
+     * @param partitionId Partition ID.
+     * @return Future, upon completion of which {@link #getMvPartition} will 
return the partition storage restored from the backup.
+     */
+    CompletableFuture<Void> abortRebalanceMvPartition(int partitionId);
+
+    /**
+     * Finishes a successful partition storage rebalance if it has been 
started: deletes the backup of the partition storage and saves a new
+     * storage.
+     *
+     * <p>If a full rebalance has not been {@link 
#startRebalanceMvPartition(int) started}, then nothing will happen.
+     *
+     * @param partitionId Partition ID.
+     * @return Future, if it fails, will abort the partition storage rebalance.
+     */
+    CompletableFuture<Void> finishRebalanceMvPartition(int partitionId);
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 13f251f9be..201a081ba9 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -26,15 +26,23 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -77,6 +85,8 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
 
     private TableIndexView hashIdx;
 
+    private final HybridClock clock = new HybridClockImpl();
+
     /**
      * Initializes the internal structures needed for tests.
      *
@@ -299,6 +309,74 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         );
     }
 
+    @Test
+    public void testStartRebalanceMvPartition() throws Exception {
+        MvPartitionStorage partitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+        partitionStorage.runConsistently(() -> {
+            partitionStorage.addWriteCommitted(
+                    new RowId(PARTITION_ID),
+                    binaryRow(new TestKey(0, "0"), new TestValue(1, "1")),
+                    clock.now()
+            );
+
+            partitionStorage.lastAppliedIndex(100);
+
+            return null;
+        });
+
+        partitionStorage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        MvPartitionStorage newPartitionStorage0 = 
tableStorage.getMvPartition(PARTITION_ID);
+
+        assertNotNull(newPartitionStorage0);
+        assertNotSame(partitionStorage, newPartitionStorage0);
+
+        assertEquals(0L, newPartitionStorage0.lastAppliedIndex());
+        assertEquals(0L, newPartitionStorage0.persistedIndex());
+        assertEquals(0, newPartitionStorage0.rowsCount());
+
+        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        MvPartitionStorage newPartitionStorage1 = 
tableStorage.getMvPartition(PARTITION_ID);
+
+        assertSame(newPartitionStorage0, newPartitionStorage1);
+    }
+
+    @Test
+    public void testAbortRebalanceMvPartition() throws Exception {
+        assertDoesNotThrow(() -> 
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        MvPartitionStorage partitionStorage = 
tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        assertSame(partitionStorage, 
tableStorage.getMvPartition(PARTITION_ID));
+
+        assertDoesNotThrow(() -> 
tableStorage.abortRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testFinishRebalanceMvPartition() throws Exception {
+        assertDoesNotThrow(() -> 
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        tableStorage.getOrCreateMvPartition(PARTITION_ID);
+
+        tableStorage.startRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        MvPartitionStorage newPartitionStorage = 
tableStorage.getMvPartition(PARTITION_ID);
+
+        tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, 
TimeUnit.SECONDS);
+
+        assertSame(newPartitionStorage, 
tableStorage.getMvPartition(PARTITION_ID));
+
+        assertDoesNotThrow(() -> 
tableStorage.finishRebalanceMvPartition(PARTITION_ID).get(1, TimeUnit.SECONDS));
+    }
+
     private static void createTestIndexes(TablesConfiguration tablesConfig) {
         List<IndexDefinition> indexDefinitions = List.of(
                 SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 9293496fad..68858f8513 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.storage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -40,6 +42,8 @@ import org.jetbrains.annotations.Nullable;
 public class TestMvTableStorage implements MvTableStorage {
     private final Map<Integer, MvPartitionStorage> partitions = new 
ConcurrentHashMap<>();
 
+    private final Map<Integer, MvPartitionStorage> backupPartitions = new 
ConcurrentHashMap<>();
+
     private final Map<UUID, SortedIndices> sortedIndicesById = new 
ConcurrentHashMap<>();
 
     private final Map<UUID, HashIndices> hashIndicesById = new 
ConcurrentHashMap<>();
@@ -101,12 +105,10 @@ public class TestMvTableStorage implements MvTableStorage 
{
 
     @Override
     public void destroyPartition(int partitionId) throws StorageException {
-        Integer boxedPartitionId = partitionId;
-
-        partitions.remove(boxedPartitionId);
+        partitions.remove(partitionId);
 
-        sortedIndicesById.values().forEach(indices -> 
indices.storageByPartitionId.remove(boxedPartitionId));
-        hashIndicesById.values().forEach(indices -> 
indices.storageByPartitionId.remove(boxedPartitionId));
+        sortedIndicesById.values().forEach(indices -> 
indices.storageByPartitionId.remove(partitionId));
+        hashIndicesById.values().forEach(indices -> 
indices.storageByPartitionId.remove(partitionId));
     }
 
     @Override
@@ -147,7 +149,7 @@ public class TestMvTableStorage implements MvTableStorage {
             
hashIndex.storageByPartitionId.values().forEach(HashIndexStorage::destroy);
         }
 
-        return CompletableFuture.completedFuture(null);
+        return completedFuture(null);
     }
 
     @Override
@@ -176,4 +178,35 @@ public class TestMvTableStorage implements MvTableStorage {
     @Override
     public void destroy() throws StorageException {
     }
+
+    @Override
+    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+        MvPartitionStorage oldPartitionStorage = partitions.get(partitionId);
+
+        assert oldPartitionStorage != null : "Partition does not exist: " + 
partitionId;
+
+        if (backupPartitions.putIfAbsent(partitionId, oldPartitionStorage) == 
null) {
+            partitions.put(partitionId, new 
TestMvPartitionStorage(partitionId));
+        }
+
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+        MvPartitionStorage oldPartitionStorage = 
backupPartitions.remove(partitionId);
+
+        if (oldPartitionStorage != null) {
+            partitions.put(partitionId, oldPartitionStorage);
+        }
+
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+        backupPartitions.remove(partitionId);
+
+        return completedFuture(null);
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index dc9259e995..6587bd5d3a 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
@@ -399,4 +400,22 @@ public class PersistentPageMemoryTableStorage extends 
AbstractPageMemoryTableSto
             );
         }
     }
+
+    @Override
+    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18029 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18029 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+        // TODO: IGNITE-18029 Implement
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index f20f4ccd57..b690561e4e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory;
 
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -138,4 +139,22 @@ public class VolatilePageMemoryTableStorage extends 
AbstractPageMemoryTableStora
             );
         }
     }
+
+    @Override
+    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18028 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18028 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+        // TODO: IGNITE-18028 Implement
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 123c66ccea..7add533606 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -83,4 +83,22 @@ public class PersistentPageMemoryMvTableStorageTest extends 
AbstractMvTableStora
     public void testDestroyIndex() {
         super.testDestroyIndex();
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029";)
+    @Override
+    public void testStartRebalanceMvPartition() throws Exception {
+        super.testStartRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029";)
+    @Override
+    public void testAbortRebalanceMvPartition() throws Exception {
+        super.testAbortRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18029";)
+    @Override
+    public void testFinishRebalanceMvPartition() throws Exception {
+        super.testFinishRebalanceMvPartition();
+    }
 }
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
index 7021466b9b..dc442816ab 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
+++ 
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryMvTableStorageTest.java
@@ -77,4 +77,22 @@ public class VolatilePageMemoryMvTableStorageTest extends 
AbstractMvTableStorage
     public void testDestroyIndex() {
         super.testDestroyIndex();
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028";)
+    @Override
+    public void testStartRebalanceMvPartition() throws Exception {
+        super.testStartRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028";)
+    @Override
+    public void testAbortRebalanceMvPartition() throws Exception {
+        super.testAbortRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18028";)
+    @Override
+    public void testFinishRebalanceMvPartition() throws Exception {
+        super.testFinishRebalanceMvPartition();
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 1ba1f0763c..79ba41a3a7 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -559,4 +559,22 @@ public class RocksDbTableStorage implements MvTableStorage 
{
 
         return new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), options);
     }
+
+    @Override
+    public CompletableFuture<Void> startRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18027 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> abortRebalanceMvPartition(int partitionId) {
+        // TODO: IGNITE-18027 Implement
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> finishRebalanceMvPartition(int partitionId) 
{
+        // TODO: IGNITE-18027 Implement
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index b6a54467de..dd1ee17912 100644
--- 
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++ 
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -150,4 +151,22 @@ public class RocksDbMvTableStorageTest extends 
AbstractMvTableStorageTest {
     void storageAdvertisesItIsPersistent() {
         assertThat(tableStorage.isVolatile(), is(false));
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027";)
+    @Override
+    public void testStartRebalanceMvPartition() throws Exception {
+        super.testStartRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027";)
+    @Override
+    public void testAbortRebalanceMvPartition() throws Exception {
+        super.testAbortRebalanceMvPartition();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18027";)
+    @Override
+    public void testFinishRebalanceMvPartition() throws Exception {
+        super.testFinishRebalanceMvPartition();
+    }
 }

Reply via email to