This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a227876c0c IGNITE-20187 Pending assignments recovery on Table Manager start (#2517)
a227876c0c is described below

commit a227876c0ce4ebf56417b3b031fb3140713bd20e
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Sep 11 13:12:04 2023 +0300

    IGNITE-20187 Pending assignments recovery on Table Manager start (#2517)
---
 .../org/apache/ignite/lang/util/StringUtils.java   |  25 ++
 .../apache/ignite/lang/util/StringUtilsTest.java}  |  41 ++--
 .../internal/metastorage/MetaStorageManager.java   |  13 ++
 .../metastorage/impl/MetaStorageManagerImpl.java   |   8 +
 .../placementdriver/AssignmentsTracker.java        |   7 +-
 .../ItRebalanceDistributedTest.java                |   6 +-
 .../storage/DistributedConfigurationStorage.java   |  24 +-
 .../storage/LocalConfigurationStorage.java         |  10 +-
 .../DistributedConfigurationStorageTest.java       |   7 +-
 .../sql/engine/exec/MockedStructuresTest.java      |  49 ++--
 .../internal/table/distributed/TableManager.java   | 253 ++++++++++++---------
 .../table/distributed/TableManagerTest.java        |  15 +-
 12 files changed, 254 insertions(+), 204 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java 
b/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
index 9eb1e12a4e..30c4127180 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.lang.util;
 
+import java.util.Objects;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -45,4 +46,28 @@ public final class StringUtils {
     public static boolean nullOrBlank(@Nullable String s) {
         return s == null || s.isBlank();
     }
+
+    /**
+     * Increments the numeric value of the last character of the given string 
by 1.
+     *
+     * <p>This method is useful for using APIs that accept string ranges where 
the upper bound is not included.
+     *
+     * @param s Original string.
+     * @return New string with the last character incremented.
+     */
+    public static String incrementLastChar(String s) {
+        Objects.requireNonNull(s);
+
+        char[] chars = s.toCharArray();
+
+        char lastChar = chars[chars.length - 1];
+
+        if (lastChar == Character.MAX_VALUE) {
+            throw new IllegalArgumentException("Cannot increment the last character as it is equal to MAX_VALUE");
+        }
+
+        chars[chars.length - 1] = (char) (lastChar + 1);
+
+        return String.valueOf(chars);
+    }
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java 
b/modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
similarity index 50%
copy from modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
copy to 
modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
index 9eb1e12a4e..07180cc435 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
+++ b/modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
@@ -17,32 +17,29 @@
 
 package org.apache.ignite.lang.util;
 
-import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-/**
- * Class containing useful methods for working with strings.
- */
-public final class StringUtils {
-    private StringUtils() {
+import org.junit.jupiter.api.Test;
+
+class StringUtilsTest {
+    @Test
+    void testIncrementLastChar() {
+        assertThat(incrementLastChar("foo_a"), is("foo_b"));
+        assertThat(incrementLastChar("x"), is("y"));
     }
 
-    /**
-     * Tests if given string is {@code null} or empty.
-     *
-     * @param s String to test.
-     * @return Whether or not the given string is {@code null} or empty.
-     */
-    public static boolean nullOrEmpty(@Nullable String s) {
-        return s == null || s.isEmpty();
+    @Test
+    void testIncrementLastCharUtf16() {
+        assertThat(incrementLastChar("foo_ы"), is("foo_ь"));
+        assertThat(incrementLastChar("ы"), is("ь"));
     }
 
-    /**
-     * Tests if given string is {@code null} or {@link String#isBlank}.
-     *
-     * @param s String to test.
-     * @return Whether or not the given string is {@code null} or blank.
-     */
-    public static boolean nullOrBlank(@Nullable String s) {
-        return s == null || s.isBlank();
+    @Test
+    void testIncrementLastCharError() {
+        assertThrows(NullPointerException.class, () -> 
incrementLastChar(null));
+        assertThrows(IllegalArgumentException.class, () -> 
incrementLastChar("foo_" + Character.MAX_VALUE));
     }
 }
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 54453fbff0..37cd61af79 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -107,6 +107,19 @@ public interface MetaStorageManager extends 
IgniteComponent {
      */
     Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long 
revUpperBound);
 
+    /**
+     * Returns cursor by entries which correspond to the given key prefix and 
bounded by revision number. The entries in the cursor
+     * are obtained from the local storage.
+     *
+     * <p>This method doesn't wait for the storage's revision to become 
greater or equal to the revUpperBound parameter, so it is
+     * up to user to wait for the appropriate time to call this method.
+     *
+     * @param keyPrefix Key prefix.
+     * @param revUpperBound Upper bound of revision.
+     * @return Cursor by entries which correspond to the given key prefix.
+     */
+    Cursor<Entry> prefixLocally(ByteArray keyPrefix, long revUpperBound);
+
     /**
      * Looks up a timestamp by a revision. This should only be invoked if it 
is guaranteed that the
      * revision is available in the local storage. This method always operates 
locally.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index b3c41997cf..74a817a730 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -531,6 +531,14 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
     }
 
+    @Override
+    public Cursor<Entry> prefixLocally(ByteArray keyPrefix, long 
revUpperBound) {
+        byte[] rangeStart = keyPrefix.bytes();
+        byte[] rangeEnd = storage.nextKey(rangeStart);
+
+        return storage.range(rangeStart, rangeEnd, revUpperBound);
+    }
+
     @Override
     public HybridTimestamp timestampByRevision(long revision) {
         if (!busyLock.enterBusy()) {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index c8a3542421..9bf4778909 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -101,12 +102,6 @@ public class AssignmentsTracker {
         LOG.info("Assignment cache initialized for placement driver [groupAssignments={}]", groupAssignments);
     }
 
-    private static String incrementLastChar(String str) {
-        char lastChar = str.charAt(str.length() - 1);
-
-        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
-    }
-
     /**
      * Stops the tracker.
      */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
similarity index 99%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index e6223062e3..1be39e9a0d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.configuration.storage;
+package org.apache.ignite.internal.rebalance;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -98,6 +98,8 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import 
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
@@ -495,7 +497,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20187")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210")
     void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) 
throws Exception {
         Node node = getNode(0);
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index c9d95e84fb..4cc5fca656 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -75,13 +75,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
      */
     private static final ByteArray DST_KEYS_START_RANGE = new 
ByteArray(DISTRIBUTED_PREFIX);
 
-    /**
-     * This key is expected to be the last key in lexicographical order of 
distributed configuration keys. It is possible because keys are
-     * in lexicographical order in meta storage and adding {@code (char)('.' + 
1)} to the end will produce all keys with prefix
-     * {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX}
-     */
-    private static final ByteArray DST_KEYS_END_RANGE = new 
ByteArray(incrementLastChar(DISTRIBUTED_PREFIX));
-
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
@@ -129,13 +122,11 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Map<String, ? extends Serializable>> 
readAllLatest(String prefix) {
-        var rangeStart = new ByteArray(DISTRIBUTED_PREFIX + prefix);
-
-        var rangeEnd = new ByteArray(incrementLastChar(DISTRIBUTED_PREFIX + 
prefix));
+        var prefixBytes = new ByteArray(DISTRIBUTED_PREFIX + prefix);
 
         var resultFuture = new CompletableFuture<Map<String, ? extends 
Serializable>>();
 
-        metaStorageMgr.range(rangeStart, rangeEnd).subscribe(new 
Subscriber<>() {
+        metaStorageMgr.prefix(prefixBytes).subscribe(new Subscriber<>() {
             private final Map<String, Serializable> data = new HashMap<>();
 
             @Override
@@ -206,7 +197,7 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
         byte[] masterKey = MASTER_KEY.bytes();
         boolean sawMasterKey = false;
 
-        try (Cursor<Entry> cursor = 
metaStorageMgr.getLocally(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE, 
cfgRevision)) {
+        try (Cursor<Entry> cursor = 
metaStorageMgr.prefixLocally(DST_KEYS_START_RANGE, cfgRevision)) {
             for (Entry entry : cursor) {
                 if (entry.tombstone()) {
                     continue;
@@ -354,15 +345,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
         return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
     }
 
-    /**
-     * Increments the last character of the given string.
-     */
-    private static String incrementLastChar(String str) {
-        char lastChar = str.charAt(str.length() - 1);
-
-        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
-    }
-
     private <T> CompletableFuture<T> registerFuture(CompletableFuture<T> 
future) {
         futureTracker.registerFuture(future);
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index c026a6aa04..8be08bb433 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.configuration.storage;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.fromBytes;
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.toBytes;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -223,15 +224,6 @@ public class LocalConfigurationStorage implements 
ConfigurationStorage {
                 .thenApply(entry -> entry == null ? 0 : (Long) 
fromBytes(entry.value()));
     }
 
-    /**
-     * Increments the last character of the given string.
-     */
-    private static String incrementLastChar(String str) {
-        char lastChar = str.charAt(str.length() - 1);
-
-        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
-    }
-
     private <T> CompletableFuture<T> registerFuture(CompletableFuture<T> 
future) {
         futureTracker.registerFuture(future);
 
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index 36d40052d8..e00efb982d 100644
--- 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -86,11 +86,10 @@ public class DistributedConfigurationStorageTest extends 
ConfigurationStorageTes
             return CompletableFuture.completedFuture(invokeResult);
         });
 
-        when(mock.range(any(), any())).thenAnswer(invocation -> {
-            ByteArray keyFrom = invocation.getArgument(0);
-            ByteArray keyTo = invocation.getArgument(1);
+        when(mock.prefix(any())).thenAnswer(invocation -> {
+            ByteArray prefix = invocation.getArgument(0);
 
-            return fromCursor(metaStorage.range(keyFrom.bytes(), keyTo == null 
? null : keyTo.bytes()));
+            return fromCursor(metaStorage.range(prefix.bytes(), 
metaStorage.nextKey(prefix.bytes())));
         });
 
         return mock;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index ab63d6d848..961bacd999 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
@@ -118,6 +119,7 @@ import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
@@ -150,7 +152,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     private TopologyService ts;
 
     /** Cluster service. */
-    @Mock(lenient = true)
+    @Mock
     private ClusterService cs;
 
     /** Raft manager. */
@@ -158,23 +160,23 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     private RaftManager rm;
 
     /** TX manager. */
-    @Mock(lenient = true)
+    @Mock
     private TxManager tm;
 
     /** Ignite transactions. */
-    @Mock(lenient = true)
+    @Mock
     private IgniteTransactions transactions;
 
     /** Meta storage manager. */
     @Mock
-    MetaStorageManager msm;
+    private MetaStorageManager msm;
 
     /** Replica manager. */
     @Mock
-    ReplicaManager replicaManager;
+    private ReplicaManager replicaManager;
 
     @Mock
-    HybridClock clock;
+    private HybridClock clock;
 
     @Mock
     private VaultManager vaultManager;
@@ -203,13 +205,13 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private GcConfiguration gcConfig;
 
-    TableManager tblManager;
+    private TableManager tblManager;
 
-    IndexManager idxManager;
+    private IndexManager idxManager;
 
-    ClusterManagementGroupManager cmgMgr;
+    private ClusterManagementGroupManager cmgMgr;
 
-    SqlQueryProcessor queryProc;
+    private SqlQueryProcessor queryProc;
 
     @InjectConfiguration
     private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
@@ -233,11 +235,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     private MetricManager metricManager;
 
     /** Returns current method name. */
-    private static String getCurrentMethodName() {
-        return StackWalker.getInstance()
-                .walk(s -> s.skip(1).findFirst())
-                .get()
-                .getMethodName();
+    private static String getCurrentMethodName(TestInfo testInfo) {
+        return testInfo.getTestMethod().orElseThrow().getName();
     }
 
     /** Stop configuration manager. */
@@ -370,14 +369,18 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         //noinspection unchecked
         when(msm.invoke(any(), any(Collection.class), 
any(Collection.class))).thenReturn(completedFuture(null));
+
+        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+
+        when(msm.prefixLocally(any(), 
anyLong())).thenReturn(CursorUtils.emptyCursor());
     }
 
     /**
      * Tests create a table through public API.
      */
     @Test
-    public void testCreateTable() {
-        String curMethodName = getCurrentMethodName();
+    public void testCreateTable(TestInfo testInfo) {
+        String curMethodName = getCurrentMethodName(testInfo);
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varbinary(255)) "
                 + "with primary_zone='%s'", curMethodName, ZONE_NAME);
@@ -422,8 +425,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
      * Tests create a table with distribution zone through public API.
      */
     @Test
-    public void testCreateTableWithDistributionZone() {
-        String tableName = getCurrentMethodName().toUpperCase();
+    public void testCreateTableWithDistributionZone(TestInfo testInfo) {
+        String tableName = getCurrentMethodName(testInfo).toUpperCase();
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varbinary(255)) ",
                  tableName);
@@ -464,8 +467,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
      * Tests create and drop table through public API.
      */
     @Test
-    public void testDropTable() {
-        String curMethodName = getCurrentMethodName();
+    public void testDropTable(TestInfo testInfo) {
+        String curMethodName = getCurrentMethodName(testInfo);
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varchar(255))", curMethodName);
 
@@ -488,8 +491,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     }
 
     @Test
-    void createTableWithTableOptions() {
-        String method = getCurrentMethodName();
+    void createTableWithTableOptions(TestInfo testInfo) {
+        String method = getCurrentMethodName(testInfo);
 
         assertDoesNotThrow(() -> readFirst(sql(
                 String.format(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 11a8c5c9ba..e1d03f8ca9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -43,6 +43,7 @@ import static 
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNum
 import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.io.IOException;
@@ -180,6 +181,7 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -286,14 +288,17 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
     /**
      * Versioned store for local partition set by table id.
-     * Completed strictly after {@link #tablesByIdVv} and strictly before 
{@link #assignmentsUpdatedVv}.
+     *
+     * <p>Completed strictly after {@link #tablesByIdVv} and strictly before 
{@link #assignmentsUpdatedVv}.
      */
     private final IncrementalVersionedValue<Map<Integer, PartitionSet>> 
localPartsByTableIdVv;
 
     /**
      * Versioned store for tracking RAFT groups initialization and starting 
completion.
-     * Only explicitly updated in {@link #createTablePartitionsLocally(long, 
CompletableFuture, int, TableImpl)}.
-     * Completed strictly after {@link #localPartsByTableIdVv}.
+     *
+     * <p>Only explicitly updated in {@link 
#createTablePartitionsLocally(long, CompletableFuture, int, TableImpl)}.
+     *
+     * <p>Completed strictly after {@link #localPartsByTableIdVv}.
      */
     private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
 
@@ -525,6 +530,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
         lowWatermark.start();
 
+        try {
+            metaStorageMgr.recoveryFinishedFuture()
+                    .thenComposeAsync(this::performRebalanceOnRecovery, 
ioExecutor)
+                    .get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
@@ -560,6 +577,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         addMessageHandler(clusterService.messagingService());
     }
 
+    private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
+        var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+
+        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
revision)) {
+            CompletableFuture<?>[] futures = cursor.stream()
+                    .map(this::handleChangePendingAssignmentEvent)
+                    .toArray(CompletableFuture[]::new);
+
+            return allOf(futures);
+        }
+    }
+
     /**
      * Adds a table manager message handler.
      *
@@ -1896,7 +1925,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
     /**
      * Returns the tables by ID future for the given causality token.
-     * The future will only be completed when corresponding assignments update 
completes.
+     *
+     * <p>The future will only be completed when corresponding assignments 
update completes.
      *
      * @param causalityToken Causality token.
      * @return The future with tables map.
@@ -2021,7 +2051,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * @param name Table name.
      * @return Future representing pending completion of the {@code 
TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+    private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
@@ -2048,7 +2078,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      *         otherwise.
      * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean 
checkConfiguration) {
+    private CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean 
checkConfiguration) {
         CompletableFuture<Boolean> tblCfgFut = checkConfiguration
                 ? supplyAsync(() -> inBusyLock(busyLock, () -> 
isTableConfigured(id)), ioExecutor)
                 : completedFuture(true);
@@ -2160,7 +2190,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }
 
                 try {
-                    return handleChangePendingAssignmentEvent(evt);
+                    assert evt.single();
+
+                    return 
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -2173,24 +2205,21 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         };
     }
 
-    private CompletableFuture<Void> 
handleChangePendingAssignmentEvent(WatchEvent evt) {
-        assert evt.single();
-
-        Entry pendingAssignmentsWatchEntry = evt.entryEvent().newEntry();
-
-        if (pendingAssignmentsWatchEntry.value() == null) {
+    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry 
pendingAssignmentsEntry) {
+        if (pendingAssignmentsEntry.value() == null) {
             return completedFuture(null);
         }
 
-        int partId = 
extractPartitionNumber(pendingAssignmentsWatchEntry.key());
-        int tblId = extractTableId(pendingAssignmentsWatchEntry.key(), 
PENDING_ASSIGNMENTS_PREFIX);
+        int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
+        int tblId = extractTableId(pendingAssignmentsEntry.key(), 
PENDING_ASSIGNMENTS_PREFIX);
+        long revision = pendingAssignmentsEntry.revision();
 
         var replicaGrpId = new TablePartitionId(tblId, partId);
 
         // Stable assignments from the meta store, which revision is bounded 
by the current pending event.
-        CompletableFuture<Entry> stableAssignmentsFuture = 
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), evt.revision());
+        CompletableFuture<Entry> stableAssignmentsFuture = 
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), revision);
 
-        return tablesById(evt.revision())
+        return tablesById(revision)
                 .thenCombineAsync(stableAssignmentsFuture, (tables, 
stableAssignmentsEntry) -> {
                     if (!busyLock.enterBusy()) {
                         return CompletableFuture.<Void>failedFuture(new 
NodeStoppingException());
@@ -2198,10 +2227,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                     try {
                         return handleChangePendingAssignmentEvent(
-                                evt.revision(),
                                 replicaGrpId,
                                 tables.get(tblId),
-                                pendingAssignmentsWatchEntry,
+                                pendingAssignmentsEntry,
                                 stableAssignmentsEntry
                         );
                     } finally {
@@ -2212,140 +2240,145 @@ public class TableManager extends 
Producer<TableEvent, TableEventParameters> imp
     }
 
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
-            long causalityToken,
             TablePartitionId replicaGrpId,
             TableImpl tbl,
             Entry pendingAssignmentsEntry,
             Entry stableAssignmentsEntry
     ) {
-        // Assignments of the pending rebalance that we received through the 
Meta storage watch mechanism.
-        Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
-
-        PeersAndLearners pendingConfiguration = 
configurationFromAssignments(pendingAssignments);
+        ClusterNode localMember = localNode();
 
-        int tableId = tbl.tableId();
         int partId = replicaGrpId.partitionId();
 
-        byte[] stableAssignmentsBytes = stableAssignmentsEntry.value();
+        if (LOG.isInfoEnabled()) {
+            var stringKey = new String(pendingAssignmentsEntry.key(), 
StandardCharsets.UTF_8);
 
-        Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsBytes);
+            LOG.info("Received update on pending assignments. Check if new 
raft group should be started"
+                            + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
+                    stringKey, partId, tbl.name(), localMember.address());
+        }
 
-        PeersAndLearners stableConfiguration = 
configurationFromAssignments(stableAssignments);
+        Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
+
+        Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsEntry.value());
 
         placementDriver.updateAssignment(
                 replicaGrpId,
-                
stableConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
+                
stableAssignments.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toList())
         );
 
-        ClusterNode localMember = localNode();
-
         // Start a new Raft node and Replica if this node has appeared in the 
new assignments.
         boolean shouldStartLocalServices = pendingAssignments.stream()
                 .filter(assignment -> 
localMember.name().equals(assignment.consistentId()))
                 .anyMatch(assignment -> 
!stableAssignments.contains(assignment));
 
-        PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker =
-                new PendingComparableValuesTracker<>(new HybridTimestamp(1, 
0));
-        PendingComparableValuesTracker<Long, Void> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+        CompletableFuture<Void> localServicesStartFuture;
 
-        InternalTable internalTable = tbl.internalTable();
+        if (shouldStartLocalServices) {
+            localServicesStartFuture = 
localPartsByTableIdVv.get(pendingAssignmentsEntry.revision())
+                    .thenComposeAsync(oldMap -> {
+                        int tableId = tbl.tableId();
 
-        LOG.info("Received update on pending assignments. Check if new raft 
group should be started"
-                        + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
-                new String(pendingAssignmentsEntry.key(), 
StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
+                        PartitionSet partitionSet = oldMap.get(tableId).copy();
 
-        CompletableFuture<Void> localServicesStartFuture;
+                        return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
+                            var newMap = new HashMap<>(oldMap);
 
-        if (shouldStartLocalServices) {
-            localServicesStartFuture = 
localPartsByTableIdVv.get(causalityToken).thenComposeAsync(oldMap -> {
-                PartitionSet partitionSet = oldMap.get(tableId).copy();
+                            newMap.put(tableId, partitionSet);
 
-                return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
-                    var newMap = new HashMap<>(oldMap);
+                            return newMap;
+                        });
+                    }, ioExecutor)
+                    .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                        InternalTable internalTable = tbl.internalTable();
 
-                    newMap.put(tableId, partitionSet);
+                        var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
+                        var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
 
-                    return newMap;
-                });
-            }).thenComposeAsync(unused -> {
-                PartitionStorages partitionStorages = 
getPartitionStorages(tbl, partId);
+                        PartitionStorages partitionStorages = 
getPartitionStorages(tbl, partId);
 
-                MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
-                TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
+                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
+                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
 
-                PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
 
-                PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                        partId,
-                        partitionDataStorage,
-                        tbl,
-                        safeTimeTracker
-                );
+                        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                                partId,
+                                partitionDataStorage,
+                                tbl,
+                                safeTimeTracker
+                        );
 
-                return runAsync(() -> inBusyLock(busyLock, () -> {
-                    try {
-                        Peer serverPeer = 
pendingConfiguration.peer(localMember.name());
-
-                        RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, 
serverPeer);
-
-                        if (!((Loza) raftMgr).isStarted(raftNodeId)) {
-                            startPartitionRaftGroupNode(
-                                    replicaGrpId,
-                                    pendingConfiguration,
-                                    stableConfiguration,
-                                    safeTimeTracker,
-                                    storageIndexTracker,
-                                    internalTable,
-                                    txStatePartitionStorage,
-                                    partitionDataStorage,
-                                    partitionUpdateHandlers
-                            );
-                        }
+                        PeersAndLearners pendingConfiguration = 
configurationFromAssignments(pendingAssignments);
 
-                        if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
-                            startReplicaWithNewListener(
-                                    replicaGrpId,
-                                    tbl,
-                                    safeTimeTracker,
-                                    storageIndexTracker,
-                                    mvPartitionStorage,
-                                    txStatePartitionStorage,
-                                    partitionUpdateHandlers,
-                                    (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId)
-                            );
+                        try {
+                            Peer serverPeer = 
pendingConfiguration.peer(localMember.name());
+
+                            RaftNodeId raftNodeId = new 
RaftNodeId(replicaGrpId, serverPeer);
+
+                            if (!((Loza) raftMgr).isStarted(raftNodeId)) {
+                                PeersAndLearners stableConfiguration = 
configurationFromAssignments(stableAssignments);
+
+                                startPartitionRaftGroupNode(
+                                        replicaGrpId,
+                                        pendingConfiguration,
+                                        stableConfiguration,
+                                        safeTimeTracker,
+                                        storageIndexTracker,
+                                        internalTable,
+                                        txStatePartitionStorage,
+                                        partitionDataStorage,
+                                        partitionUpdateHandlers
+                                );
+                            }
+
+                            if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
+                                startReplicaWithNewListener(
+                                        replicaGrpId,
+                                        tbl,
+                                        safeTimeTracker,
+                                        storageIndexTracker,
+                                        mvPartitionStorage,
+                                        txStatePartitionStorage,
+                                        partitionUpdateHandlers,
+                                        (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId)
+                                );
+                            }
+                        } catch (NodeStoppingException ignored) {
+                            // No-op.
                         }
-                    } catch (NodeStoppingException ignored) {
-                        // No-op.
-                    }
-                }), ioExecutor);
-            });
+                    }), ioExecutor);
         } else {
             localServicesStartFuture = completedFuture(null);
         }
 
         return localServicesStartFuture
-                .thenCompose(v -> 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)))
-                .thenCompose(latestPendingAssignmentsEntry -> {
-                    // Do not change peers of the raft group if this is a 
stale event.
-                    // Note that we start raft node before for the sake of the 
consistency in a starting and stopping raft nodes.
-                    if (pendingAssignmentsEntry.revision() < 
latestPendingAssignmentsEntry.revision()) {
-                        return completedFuture(null);
-                    }
-
-                    RaftGroupService partGrpSvc = 
internalTable.partitionRaftGroupService(partId);
+                .thenCompose(v -> {
+                    RaftGroupService partGrpSvc = 
tbl.internalTable().partitionRaftGroupService(partId);
 
                     return partGrpSvc.refreshAndGetLeaderWithTerm()
                             .thenCompose(leaderWithTerm -> {
-                                // run update of raft configuration if this 
node is a leader
-                                if (isLocalPeer(leaderWithTerm.leader())) {
-                                    LOG.info("Current node={} is the leader of 
partition raft group={}. "
-                                                    + "Initiate rebalance 
process for partition={}, table={}",
-                                            localMember.address(), 
replicaGrpId, partId, tbl.name());
-
-                                    return 
partGrpSvc.changePeersAsync(pendingConfiguration, leaderWithTerm.term());
-                                } else {
+                                if (!isLocalPeer(leaderWithTerm.leader())) {
                                     return completedFuture(null);
                                 }
+
+                                // run update of raft configuration if this 
node is a leader
+                                LOG.info("Current node={} is the leader of 
partition raft group={}. "
+                                                + "Initiate rebalance process 
for partition={}, table={}",
+                                        leaderWithTerm.leader(), replicaGrpId, 
partId, tbl.name());
+
+                                return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+                                        
.thenCompose(latestPendingAssignmentsEntry -> {
+                                            // Do not change peers of the raft 
group if this is a stale event.
+                                            // Note that we start raft node 
before for the sake of the consistency in a starting and
+                                            // stopping raft nodes.
+                                            if 
(pendingAssignmentsEntry.revision() < latestPendingAssignmentsEntry.revision()) 
{
+                                                return completedFuture(null);
+                                            }
+
+                                            PeersAndLearners newConfiguration 
= configurationFromAssignments(pendingAssignments);
+
+                                            return 
partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
+                                        });
                             });
                 });
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index d1d45f11c8..091ac009ca 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -119,6 +119,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
@@ -198,7 +199,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
     /** Meta storage manager. */
     @Mock
-    MetaStorageManager msm;
+    private MetaStorageManager msm;
 
     /** Mock cluster service. */
     @Mock
@@ -292,6 +293,8 @@ public class TableManagerTest extends IgniteAbstractTest {
         when(replicaMgr.stopReplica(any())).thenReturn(completedFuture(true));
 
         tblManagerFut = new CompletableFuture<>();
+
+        mockMetastore();
     }
 
     @AfterEach
@@ -315,8 +318,6 @@ public class TableManagerTest extends IgniteAbstractTest {
     public void testPreconfiguredTable() throws Exception {
         when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock -> 
completedFuture(mock(TopologyAwareRaftGroupService.class)));
 
-        mockMetastore();
-
         TableManager tableManager = createTableManager(tblManagerFut);
 
         tblManagerFut.complete(tableManager);
@@ -638,8 +639,6 @@ public class TableManagerTest extends IgniteAbstractTest {
         
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
         when(txStateStorage.clear()).thenReturn(completedFuture(null));
 
-        mockMetastore();
-
         // For some reason, "when(something).thenReturn" does not work on 
spies, but this notation works.
         createTableManager(tblManagerFut, (mvTableStorage) -> {
             
doReturn(completedFuture(mvPartitionStorage)).when(mvTableStorage).createMvPartition(anyInt());
@@ -690,6 +689,10 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         when(msm.invoke(any(), any(Operation.class), 
any(Operation.class))).thenReturn(completedFuture(null));
         when(msm.invoke(any(), any(List.class), 
any(List.class))).thenReturn(completedFuture(null));
+
+        when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+
+        when(msm.prefixLocally(any(), 
anyLong())).thenReturn(CursorUtils.emptyCursor());
     }
 
     /**
@@ -738,8 +741,6 @@ public class TableManagerTest extends IgniteAbstractTest {
                     .thenReturn(assignment);
         }
 
-        mockMetastore();
-
         TableManager tableManager = createTableManager(tblManagerFut);
 
         int tablesBeforeCreation = tableManager.tables().size();


Reply via email to