This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a227876c0c IGNITE-20187 Pending assignments recovery on Table Manager
start (#2517)
a227876c0c is described below
commit a227876c0ce4ebf56417b3b031fb3140713bd20e
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Sep 11 13:12:04 2023 +0300
IGNITE-20187 Pending assignments recovery on Table Manager start (#2517)
---
.../org/apache/ignite/lang/util/StringUtils.java | 25 ++
.../apache/ignite/lang/util/StringUtilsTest.java} | 41 ++--
.../internal/metastorage/MetaStorageManager.java | 13 ++
.../metastorage/impl/MetaStorageManagerImpl.java | 8 +
.../placementdriver/AssignmentsTracker.java | 7 +-
.../ItRebalanceDistributedTest.java | 6 +-
.../storage/DistributedConfigurationStorage.java | 24 +-
.../storage/LocalConfigurationStorage.java | 10 +-
.../DistributedConfigurationStorageTest.java | 7 +-
.../sql/engine/exec/MockedStructuresTest.java | 49 ++--
.../internal/table/distributed/TableManager.java | 253 ++++++++++++---------
.../table/distributed/TableManagerTest.java | 15 +-
12 files changed, 254 insertions(+), 204 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
b/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
index 9eb1e12a4e..30c4127180 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.lang.util;
+import java.util.Objects;
import org.jetbrains.annotations.Nullable;
/**
@@ -45,4 +46,28 @@ public final class StringUtils {
public static boolean nullOrBlank(@Nullable String s) {
return s == null || s.isBlank();
}
+
+ /**
+ * Increments the numeric value of the last character of the given string
by 1.
+ *
+ * <p>This method is useful for using APIs that accept string ranges where
the upper bound is not included.
+ *
+ * @param s Original string.
+ * @return New string with the last character incremented.
+ */
+ public static String incrementLastChar(String s) {
+ Objects.requireNonNull(s);
+
+ char[] chars = s.toCharArray();
+
+ char lastChar = chars[chars.length - 1];
+
+ if (lastChar == Character.MAX_VALUE) {
+ throw new IllegalArgumentException("Cannot increment the last
character as it is equal to MAX_VALUE");
+ }
+
+ chars[chars.length - 1] = (char) (lastChar + 1);
+
+ return String.valueOf(chars);
+ }
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
b/modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
similarity index 50%
copy from modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
copy to
modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
index 9eb1e12a4e..07180cc435 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/StringUtils.java
+++ b/modules/api/src/test/java/org/apache/ignite/lang/util/StringUtilsTest.java
@@ -17,32 +17,29 @@
package org.apache.ignite.lang.util;
-import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-/**
- * Class containing useful methods for working with strings.
- */
-public final class StringUtils {
- private StringUtils() {
+import org.junit.jupiter.api.Test;
+
+class StringUtilsTest {
+ @Test
+ void testIncrementLastChar() {
+ assertThat(incrementLastChar("foo_a"), is("foo_b"));
+ assertThat(incrementLastChar("x"), is("y"));
}
- /**
- * Tests if given string is {@code null} or empty.
- *
- * @param s String to test.
- * @return Whether or not the given string is {@code null} or empty.
- */
- public static boolean nullOrEmpty(@Nullable String s) {
- return s == null || s.isEmpty();
+ @Test
+ void testIncrementLastCharUtf16() {
+ assertThat(incrementLastChar("foo_ы"), is("foo_ь"));
+ assertThat(incrementLastChar("ы"), is("ь"));
}
- /**
- * Tests if given string is {@code null} or {@link String#isBlank}.
- *
- * @param s String to test.
- * @return Whether or not the given string is {@code null} or blank.
- */
- public static boolean nullOrBlank(@Nullable String s) {
- return s == null || s.isBlank();
+ @Test
+ void testIncrementLastCharError() {
+ assertThrows(NullPointerException.class, () ->
incrementLastChar(null));
+ assertThrows(IllegalArgumentException.class, () ->
incrementLastChar("foo_" + Character.MAX_VALUE));
}
}
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 54453fbff0..37cd61af79 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -107,6 +107,19 @@ public interface MetaStorageManager extends
IgniteComponent {
*/
Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long
revUpperBound);
+ /**
+ * Returns cursor by entries which correspond to the given key prefix and
bounded by revision number. The entries in the cursor
+ * are obtained from the local storage.
+ *
+ * <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
+ * up to user to wait for the appropriate time to call this method.
+ *
+ * @param keyPrefix Key prefix.
+ * @param revUpperBound Upper bound of revision.
+ * @return Cursor by entries which correspond to the given key prefix.
+ */
+ Cursor<Entry> prefixLocally(ByteArray keyPrefix, long revUpperBound);
+
/**
* Looks up a timestamp by a revision. This should only be invoked if it
is guaranteed that the
* revision is available in the local storage. This method always operates
locally.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index b3c41997cf..74a817a730 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -531,6 +531,14 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
}
+ @Override
+ public Cursor<Entry> prefixLocally(ByteArray keyPrefix, long
revUpperBound) {
+ byte[] rangeStart = keyPrefix.bytes();
+ byte[] rangeEnd = storage.nextKey(rangeStart);
+
+ return storage.range(rangeStart, rangeEnd, revUpperBound);
+ }
+
@Override
public HybridTimestamp timestampByRevision(long revision) {
if (!busyLock.enterBusy()) {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index c8a3542421..9bf4778909 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@@ -101,12 +102,6 @@ public class AssignmentsTracker {
LOG.info("Assignment cache initialized for placement driver
[groupAssignments={}]", groupAssignments);
}
- private static String incrementLastChar(String str) {
- char lastChar = str.charAt(str.length() - 1);
-
- return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
- }
-
/**
* Stops the tracker.
*/
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
similarity index 99%
rename from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
rename to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index e6223062e3..1be39e9a0d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.configuration.storage;
+package org.apache.ignite.internal.rebalance;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -98,6 +98,8 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
@@ -495,7 +497,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
@Test
@UseTestTxStateStorage
@UseRocksMetaStorage
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20187")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210")
void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo)
throws Exception {
Node node = getNode(0);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index c9d95e84fb..4cc5fca656 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -75,13 +75,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
*/
private static final ByteArray DST_KEYS_START_RANGE = new
ByteArray(DISTRIBUTED_PREFIX);
- /**
- * This key is expected to be the last key in lexicographical order of
distributed configuration keys. It is possible because keys are
- * in lexicographical order in meta storage and adding {@code (char)('.' +
1)} to the end will produce all keys with prefix
- * {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX}
- */
- private static final ByteArray DST_KEYS_END_RANGE = new
ByteArray(incrementLastChar(DISTRIBUTED_PREFIX));
-
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
@@ -129,13 +122,11 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
/** {@inheritDoc} */
@Override
public CompletableFuture<Map<String, ? extends Serializable>>
readAllLatest(String prefix) {
- var rangeStart = new ByteArray(DISTRIBUTED_PREFIX + prefix);
-
- var rangeEnd = new ByteArray(incrementLastChar(DISTRIBUTED_PREFIX +
prefix));
+ var prefixBytes = new ByteArray(DISTRIBUTED_PREFIX + prefix);
var resultFuture = new CompletableFuture<Map<String, ? extends
Serializable>>();
- metaStorageMgr.range(rangeStart, rangeEnd).subscribe(new
Subscriber<>() {
+ metaStorageMgr.prefix(prefixBytes).subscribe(new Subscriber<>() {
private final Map<String, Serializable> data = new HashMap<>();
@Override
@@ -206,7 +197,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
byte[] masterKey = MASTER_KEY.bytes();
boolean sawMasterKey = false;
- try (Cursor<Entry> cursor =
metaStorageMgr.getLocally(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE,
cfgRevision)) {
+ try (Cursor<Entry> cursor =
metaStorageMgr.prefixLocally(DST_KEYS_START_RANGE, cfgRevision)) {
for (Entry entry : cursor) {
if (entry.tombstone()) {
continue;
@@ -354,15 +345,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
}
- /**
- * Increments the last character of the given string.
- */
- private static String incrementLastChar(String str) {
- char lastChar = str.charAt(str.length() - 1);
-
- return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
- }
-
private <T> CompletableFuture<T> registerFuture(CompletableFuture<T>
future) {
futureTracker.registerFuture(future);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index c026a6aa04..8be08bb433 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.configuration.storage;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static
org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.fromBytes;
import static
org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.toBytes;
+import static org.apache.ignite.lang.util.StringUtils.incrementLastChar;
import java.io.Serializable;
import java.util.HashMap;
@@ -223,15 +224,6 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
.thenApply(entry -> entry == null ? 0 : (Long)
fromBytes(entry.value()));
}
- /**
- * Increments the last character of the given string.
- */
- private static String incrementLastChar(String str) {
- char lastChar = str.charAt(str.length() - 1);
-
- return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
- }
-
private <T> CompletableFuture<T> registerFuture(CompletableFuture<T>
future) {
futureTracker.registerFuture(future);
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index 36d40052d8..e00efb982d 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -86,11 +86,10 @@ public class DistributedConfigurationStorageTest extends
ConfigurationStorageTes
return CompletableFuture.completedFuture(invokeResult);
});
- when(mock.range(any(), any())).thenAnswer(invocation -> {
- ByteArray keyFrom = invocation.getArgument(0);
- ByteArray keyTo = invocation.getArgument(1);
+ when(mock.prefix(any())).thenAnswer(invocation -> {
+ ByteArray prefix = invocation.getArgument(0);
- return fromCursor(metaStorage.range(keyFrom.bytes(), keyTo == null
? null : keyTo.bytes()));
+ return fromCursor(metaStorage.range(prefix.bytes(),
metaStorage.nextKey(prefix.bytes())));
});
return mock;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index ab63d6d848..961bacd999 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -102,6 +102,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
@@ -118,6 +119,7 @@ import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
@@ -150,7 +152,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
private TopologyService ts;
/** Cluster service. */
- @Mock(lenient = true)
+ @Mock
private ClusterService cs;
/** Raft manager. */
@@ -158,23 +160,23 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
private RaftManager rm;
/** TX manager. */
- @Mock(lenient = true)
+ @Mock
private TxManager tm;
/** Ignite transactions. */
- @Mock(lenient = true)
+ @Mock
private IgniteTransactions transactions;
/** Meta storage manager. */
@Mock
- MetaStorageManager msm;
+ private MetaStorageManager msm;
/** Replica manager. */
@Mock
- ReplicaManager replicaManager;
+ private ReplicaManager replicaManager;
@Mock
- HybridClock clock;
+ private HybridClock clock;
@Mock
private VaultManager vaultManager;
@@ -203,13 +205,13 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
@InjectConfiguration
private GcConfiguration gcConfig;
- TableManager tblManager;
+ private TableManager tblManager;
- IndexManager idxManager;
+ private IndexManager idxManager;
- ClusterManagementGroupManager cmgMgr;
+ private ClusterManagementGroupManager cmgMgr;
- SqlQueryProcessor queryProc;
+ private SqlQueryProcessor queryProc;
@InjectConfiguration
private RocksDbStorageEngineConfiguration rocksDbEngineConfig;
@@ -233,11 +235,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
private MetricManager metricManager;
/** Returns current method name. */
- private static String getCurrentMethodName() {
- return StackWalker.getInstance()
- .walk(s -> s.skip(1).findFirst())
- .get()
- .getMethodName();
+ private static String getCurrentMethodName(TestInfo testInfo) {
+ return testInfo.getTestMethod().orElseThrow().getName();
}
/** Stop configuration manager. */
@@ -370,14 +369,18 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
//noinspection unchecked
when(msm.invoke(any(), any(Collection.class),
any(Collection.class))).thenReturn(completedFuture(null));
+
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+
+ when(msm.prefixLocally(any(),
anyLong())).thenReturn(CursorUtils.emptyCursor());
}
/**
* Tests create a table through public API.
*/
@Test
- public void testCreateTable() {
- String curMethodName = getCurrentMethodName();
+ public void testCreateTable(TestInfo testInfo) {
+ String curMethodName = getCurrentMethodName(testInfo);
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) "
+ "with primary_zone='%s'", curMethodName, ZONE_NAME);
@@ -422,8 +425,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
* Tests create a table with distribution zone through public API.
*/
@Test
- public void testCreateTableWithDistributionZone() {
- String tableName = getCurrentMethodName().toUpperCase();
+ public void testCreateTableWithDistributionZone(TestInfo testInfo) {
+ String tableName = getCurrentMethodName(testInfo).toUpperCase();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) ",
tableName);
@@ -464,8 +467,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
* Tests create and drop table through public API.
*/
@Test
- public void testDropTable() {
- String curMethodName = getCurrentMethodName();
+ public void testDropTable(TestInfo testInfo) {
+ String curMethodName = getCurrentMethodName(testInfo);
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varchar(255))", curMethodName);
@@ -488,8 +491,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
}
@Test
- void createTableWithTableOptions() {
- String method = getCurrentMethodName();
+ void createTableWithTableOptions(TestInfo testInfo) {
+ String method = getCurrentMethodName(testInfo);
assertDoesNotThrow(() -> readFirst(sql(
String.format(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 11a8c5c9ba..e1d03f8ca9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -43,6 +43,7 @@ import static
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNum
import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
import static
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.IOException;
@@ -180,6 +181,7 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -286,14 +288,17 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/**
* Versioned store for local partition set by table id.
- * Completed strictly after {@link #tablesByIdVv} and strictly before
{@link #assignmentsUpdatedVv}.
+ *
+ * <p>Completed strictly after {@link #tablesByIdVv} and strictly before
{@link #assignmentsUpdatedVv}.
*/
private final IncrementalVersionedValue<Map<Integer, PartitionSet>>
localPartsByTableIdVv;
/**
* Versioned store for tracking RAFT groups initialization and starting
completion.
- * Only explicitly updated in {@link #createTablePartitionsLocally(long,
CompletableFuture, int, TableImpl)}.
- * Completed strictly after {@link #localPartsByTableIdVv}.
+ *
+ * <p>Only explicitly updated in {@link
#createTablePartitionsLocally(long, CompletableFuture, int, TableImpl)}.
+ *
+ * <p>Completed strictly after {@link #localPartsByTableIdVv}.
*/
private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
@@ -525,6 +530,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
lowWatermark.start();
+ try {
+ metaStorageMgr.recoveryFinishedFuture()
+ .thenComposeAsync(this::performRebalanceOnRecovery,
ioExecutor)
+ .get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
@@ -560,6 +577,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
addMessageHandler(clusterService.messagingService());
}
+ private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
+ var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+
+ try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix,
revision)) {
+ CompletableFuture<?>[] futures = cursor.stream()
+ .map(this::handleChangePendingAssignmentEvent)
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures);
+ }
+ }
+
/**
* Adds a table manager message handler.
*
@@ -1896,7 +1925,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/**
* Returns the tables by ID future for the given causality token.
- * The future will only be completed when corresponding assignments update
completes.
+ *
+ * <p>The future will only be completed when corresponding assignments
update completes.
*
* @param causalityToken Causality token.
* @return The future with tables map.
@@ -2021,7 +2051,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @param name Table name.
* @return Future representing pending completion of the {@code
TableManager#tableAsyncInternal} operation.
*/
- public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+ private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
@@ -2048,7 +2078,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* otherwise.
* @return Future representing pending completion of the operation.
*/
- public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean
checkConfiguration) {
+ private CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean
checkConfiguration) {
CompletableFuture<Boolean> tblCfgFut = checkConfiguration
? supplyAsync(() -> inBusyLock(busyLock, () ->
isTableConfigured(id)), ioExecutor)
: completedFuture(true);
@@ -2160,7 +2190,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
try {
- return handleChangePendingAssignmentEvent(evt);
+ assert evt.single();
+
+ return
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
} finally {
busyLock.leaveBusy();
}
@@ -2173,24 +2205,21 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
};
}
- private CompletableFuture<Void>
handleChangePendingAssignmentEvent(WatchEvent evt) {
- assert evt.single();
-
- Entry pendingAssignmentsWatchEntry = evt.entryEvent().newEntry();
-
- if (pendingAssignmentsWatchEntry.value() == null) {
+ private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry
pendingAssignmentsEntry) {
+ if (pendingAssignmentsEntry.value() == null) {
return completedFuture(null);
}
- int partId =
extractPartitionNumber(pendingAssignmentsWatchEntry.key());
- int tblId = extractTableId(pendingAssignmentsWatchEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
+ int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
+ int tblId = extractTableId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
+ long revision = pendingAssignmentsEntry.revision();
var replicaGrpId = new TablePartitionId(tblId, partId);
// Stable assignments from the meta store, which revision is bounded
by the current pending event.
- CompletableFuture<Entry> stableAssignmentsFuture =
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), evt.revision());
+ CompletableFuture<Entry> stableAssignmentsFuture =
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), revision);
- return tablesById(evt.revision())
+ return tablesById(revision)
.thenCombineAsync(stableAssignmentsFuture, (tables,
stableAssignmentsEntry) -> {
if (!busyLock.enterBusy()) {
return CompletableFuture.<Void>failedFuture(new
NodeStoppingException());
@@ -2198,10 +2227,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
try {
return handleChangePendingAssignmentEvent(
- evt.revision(),
replicaGrpId,
tables.get(tblId),
- pendingAssignmentsWatchEntry,
+ pendingAssignmentsEntry,
stableAssignmentsEntry
);
} finally {
@@ -2212,140 +2240,145 @@ public class TableManager extends
Producer<TableEvent, TableEventParameters> imp
}
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
- long causalityToken,
TablePartitionId replicaGrpId,
TableImpl tbl,
Entry pendingAssignmentsEntry,
Entry stableAssignmentsEntry
) {
- // Assignments of the pending rebalance that we received through the
Meta storage watch mechanism.
- Set<Assignment> pendingAssignments =
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
-
- PeersAndLearners pendingConfiguration =
configurationFromAssignments(pendingAssignments);
+ ClusterNode localMember = localNode();
- int tableId = tbl.tableId();
int partId = replicaGrpId.partitionId();
- byte[] stableAssignmentsBytes = stableAssignmentsEntry.value();
+ if (LOG.isInfoEnabled()) {
+ var stringKey = new String(pendingAssignmentsEntry.key(),
StandardCharsets.UTF_8);
- Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsBytes);
+ LOG.info("Received update on pending assignments. Check if new
raft group should be started"
+ + " [key={}, partition={}, table={},
localMemberAddress={}]",
+ stringKey, partId, tbl.name(), localMember.address());
+ }
- PeersAndLearners stableConfiguration =
configurationFromAssignments(stableAssignments);
+ Set<Assignment> pendingAssignments =
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
+
+ Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsEntry.value());
placementDriver.updateAssignment(
replicaGrpId,
-
stableConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
+
stableAssignments.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toList())
);
- ClusterNode localMember = localNode();
-
// Start a new Raft node and Replica if this node has appeared in the
new assignments.
boolean shouldStartLocalServices = pendingAssignments.stream()
.filter(assignment ->
localMember.name().equals(assignment.consistentId()))
.anyMatch(assignment ->
!stableAssignments.contains(assignment));
- PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker =
- new PendingComparableValuesTracker<>(new HybridTimestamp(1,
0));
- PendingComparableValuesTracker<Long, Void> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
+ CompletableFuture<Void> localServicesStartFuture;
- InternalTable internalTable = tbl.internalTable();
+ if (shouldStartLocalServices) {
+ localServicesStartFuture =
localPartsByTableIdVv.get(pendingAssignmentsEntry.revision())
+ .thenComposeAsync(oldMap -> {
+ int tableId = tbl.tableId();
- LOG.info("Received update on pending assignments. Check if new raft
group should be started"
- + " [key={}, partition={}, table={},
localMemberAddress={}]",
- new String(pendingAssignmentsEntry.key(),
StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
+ PartitionSet partitionSet = oldMap.get(tableId).copy();
- CompletableFuture<Void> localServicesStartFuture;
+ return getOrCreatePartitionStorages(tbl,
partitionSet).thenApply(u -> {
+ var newMap = new HashMap<>(oldMap);
- if (shouldStartLocalServices) {
- localServicesStartFuture =
localPartsByTableIdVv.get(causalityToken).thenComposeAsync(oldMap -> {
- PartitionSet partitionSet = oldMap.get(tableId).copy();
+ newMap.put(tableId, partitionSet);
- return getOrCreatePartitionStorages(tbl,
partitionSet).thenApply(u -> {
- var newMap = new HashMap<>(oldMap);
+ return newMap;
+ });
+ }, ioExecutor)
+ .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+ InternalTable internalTable = tbl.internalTable();
- newMap.put(tableId, partitionSet);
+ var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
+ var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
- return newMap;
- });
- }).thenComposeAsync(unused -> {
- PartitionStorages partitionStorages =
getPartitionStorages(tbl, partId);
+ PartitionStorages partitionStorages =
getPartitionStorages(tbl, partId);
- MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
- TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
+ MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
+ TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
- PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
- partId,
- partitionDataStorage,
- tbl,
- safeTimeTracker
- );
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ tbl,
+ safeTimeTracker
+ );
- return runAsync(() -> inBusyLock(busyLock, () -> {
- try {
- Peer serverPeer =
pendingConfiguration.peer(localMember.name());
-
- RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId,
serverPeer);
-
- if (!((Loza) raftMgr).isStarted(raftNodeId)) {
- startPartitionRaftGroupNode(
- replicaGrpId,
- pendingConfiguration,
- stableConfiguration,
- safeTimeTracker,
- storageIndexTracker,
- internalTable,
- txStatePartitionStorage,
- partitionDataStorage,
- partitionUpdateHandlers
- );
- }
+ PeersAndLearners pendingConfiguration =
configurationFromAssignments(pendingAssignments);
- if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
- startReplicaWithNewListener(
- replicaGrpId,
- tbl,
- safeTimeTracker,
- storageIndexTracker,
- mvPartitionStorage,
- txStatePartitionStorage,
- partitionUpdateHandlers,
- (TopologyAwareRaftGroupService)
internalTable.partitionRaftGroupService(partId)
- );
+ try {
+ Peer serverPeer =
pendingConfiguration.peer(localMember.name());
+
+ RaftNodeId raftNodeId = new
RaftNodeId(replicaGrpId, serverPeer);
+
+ if (!((Loza) raftMgr).isStarted(raftNodeId)) {
+ PeersAndLearners stableConfiguration =
configurationFromAssignments(stableAssignments);
+
+ startPartitionRaftGroupNode(
+ replicaGrpId,
+ pendingConfiguration,
+ stableConfiguration,
+ safeTimeTracker,
+ storageIndexTracker,
+ internalTable,
+ txStatePartitionStorage,
+ partitionDataStorage,
+ partitionUpdateHandlers
+ );
+ }
+
+ if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
+ startReplicaWithNewListener(
+ replicaGrpId,
+ tbl,
+ safeTimeTracker,
+ storageIndexTracker,
+ mvPartitionStorage,
+ txStatePartitionStorage,
+ partitionUpdateHandlers,
+ (TopologyAwareRaftGroupService)
internalTable.partitionRaftGroupService(partId)
+ );
+ }
+ } catch (NodeStoppingException ignored) {
+ // No-op.
}
- } catch (NodeStoppingException ignored) {
- // No-op.
- }
- }), ioExecutor);
- });
+ }), ioExecutor);
} else {
localServicesStartFuture = completedFuture(null);
}
return localServicesStartFuture
- .thenCompose(v ->
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)))
- .thenCompose(latestPendingAssignmentsEntry -> {
- // Do not change peers of the raft group if this is a
stale event.
- // Note that we start raft node before for the sake of the
consistency in a starting and stopping raft nodes.
- if (pendingAssignmentsEntry.revision() <
latestPendingAssignmentsEntry.revision()) {
- return completedFuture(null);
- }
-
- RaftGroupService partGrpSvc =
internalTable.partitionRaftGroupService(partId);
+ .thenCompose(v -> {
+ RaftGroupService partGrpSvc =
tbl.internalTable().partitionRaftGroupService(partId);
return partGrpSvc.refreshAndGetLeaderWithTerm()
.thenCompose(leaderWithTerm -> {
- // run update of raft configuration if this
node is a leader
- if (isLocalPeer(leaderWithTerm.leader())) {
- LOG.info("Current node={} is the leader of
partition raft group={}. "
- + "Initiate rebalance
process for partition={}, table={}",
- localMember.address(),
replicaGrpId, partId, tbl.name());
-
- return
partGrpSvc.changePeersAsync(pendingConfiguration, leaderWithTerm.term());
- } else {
+ if (!isLocalPeer(leaderWithTerm.leader())) {
return completedFuture(null);
}
+
+ // run update of raft configuration if this
node is a leader
+ LOG.info("Current node={} is the leader of
partition raft group={}. "
+ + "Initiate rebalance process
for partition={}, table={}",
+ leaderWithTerm.leader(), replicaGrpId,
partId, tbl.name());
+
+ return
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+
.thenCompose(latestPendingAssignmentsEntry -> {
+ // Do not change peers of the raft
group if this is a stale event.
+ // Note that we start raft node
before for the sake of the consistency in a starting and
+ // stopping raft nodes.
+ if
(pendingAssignmentsEntry.revision() < latestPendingAssignmentsEntry.revision())
{
+ return completedFuture(null);
+ }
+
+ PeersAndLearners newConfiguration
= configurationFromAssignments(pendingAssignments);
+
+ return
partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
+ });
});
});
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index d1d45f11c8..091ac009ca 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -119,6 +119,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
@@ -198,7 +199,7 @@ public class TableManagerTest extends IgniteAbstractTest {
/** Meta storage manager. */
@Mock
- MetaStorageManager msm;
+ private MetaStorageManager msm;
/** Mock cluster service. */
@Mock
@@ -292,6 +293,8 @@ public class TableManagerTest extends IgniteAbstractTest {
when(replicaMgr.stopReplica(any())).thenReturn(completedFuture(true));
tblManagerFut = new CompletableFuture<>();
+
+ mockMetastore();
}
@AfterEach
@@ -315,8 +318,6 @@ public class TableManagerTest extends IgniteAbstractTest {
public void testPreconfiguredTable() throws Exception {
when(rm.startRaftGroupService(any(), any(), any())).thenAnswer(mock ->
completedFuture(mock(TopologyAwareRaftGroupService.class)));
- mockMetastore();
-
TableManager tableManager = createTableManager(tblManagerFut);
tblManagerFut.complete(tableManager);
@@ -638,8 +639,6 @@ public class TableManagerTest extends IgniteAbstractTest {
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
when(txStateStorage.clear()).thenReturn(completedFuture(null));
- mockMetastore();
-
// For some reason, "when(something).thenReturn" does not work on
spies, but this notation works.
createTableManager(tblManagerFut, (mvTableStorage) -> {
doReturn(completedFuture(mvPartitionStorage)).when(mvTableStorage).createMvPartition(anyInt());
@@ -690,6 +689,10 @@ public class TableManagerTest extends IgniteAbstractTest {
when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(completedFuture(null));
when(msm.invoke(any(), any(List.class),
any(List.class))).thenReturn(completedFuture(null));
+
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+
+ when(msm.prefixLocally(any(),
anyLong())).thenReturn(CursorUtils.emptyCursor());
}
/**
@@ -738,8 +741,6 @@ public class TableManagerTest extends IgniteAbstractTest {
.thenReturn(assignment);
}
- mockMetastore();
-
TableManager tableManager = createTableManager(tblManagerFut);
int tablesBeforeCreation = tableManager.tables().size();