This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0232275839 IGNITE-20344 Fix rebalance recovery and add tests (#2581)
0232275839 is described below
commit 0232275839138080eb9cf1154a1d221921ee9d35
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Sep 14 16:43:44 2023 +0300
IGNITE-20344 Fix rebalance recovery and add tests (#2581)
---
.../apache/ignite/internal/util/IgniteUtils.java | 58 ++++++++-----
.../ignite/internal/util/IgniteUtilsTest.java | 21 +++++
.../rebalance/ItRebalanceRecoveryTest.java | 75 +++++++++++++++++
.../internal/table/distributed/TableManager.java | 98 ++++++++++++++--------
4 files changed, 197 insertions(+), 55 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 88c8a588f4..8a6f6167b4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -184,8 +185,8 @@ public class IgniteUtils {
* Creates new {@link HashMap} with expected size.
*
* @param expSize Expected size of the created map.
- * @param <K> Type of the map's keys.
- * @param <V> Type of the map's values.
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
* @return New map.
*/
public static <K, V> HashMap<K, V> newHashMap(int expSize) {
@@ -196,8 +197,8 @@ public class IgniteUtils {
* Creates new {@link LinkedHashMap} with expected size.
*
* @param expSize Expected size of created map.
- * @param <K> Type of the map's keys.
- * @param <V> Type of the map's values.
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
* @return New map.
*/
public static <K, V> LinkedHashMap<K, V> newLinkedHashMap(int expSize) {
@@ -265,7 +266,7 @@ public class IgniteUtils {
/**
* Converts byte array to hex string.
*
- * @param arr Array of bytes.
+ * @param arr Array of bytes.
* @param maxLen Maximum length of result string. Rounds down to a power
of two.
* @return Hex string.
*/
@@ -343,7 +344,7 @@ public class IgniteUtils {
* Appends {@code byte} in hexadecimal format.
*
* @param sb String builder.
- * @param b Byte to add in hexadecimal format.
+ * @param b Byte to add in hexadecimal format.
*/
private static void addByteAsHex(StringBuilder sb, byte b) {
sb.append(Integer.toHexString(MASK & b >>>
4)).append(Integer.toHexString(MASK & b));
@@ -447,7 +448,7 @@ public class IgniteUtils {
* Gets class for provided name. Accepts primitive types names.
*
* @param clsName Class name.
- * @param ldr Class loader.
+ * @param ldr Class loader.
* @return Class.
* @throws ClassNotFoundException If class not found.
*/
@@ -458,8 +459,8 @@ public class IgniteUtils {
/**
* Gets class for provided name. Accepts primitive types names.
*
- * @param clsName Class name.
- * @param ldr Class loader.
+ * @param clsName Class name.
+ * @param ldr Class loader.
* @param clsFilter Predicate to filter class names.
* @return Class.
* @throws ClassNotFoundException If class not found.
@@ -600,7 +601,7 @@ public class IgniteUtils {
*
* @param service the {@code ExecutorService} to shut down
* @param timeout the maximum time to wait for the {@code ExecutorService}
to terminate
- * @param unit the time unit of the timeout argument
+ * @param unit the time unit of the timeout argument
*/
public static void shutdownAndAwaitTermination(ExecutorService service,
long timeout, TimeUnit unit) {
long halfTimeoutNanos = unit.toNanos(timeout) / 2;
@@ -672,9 +673,8 @@ public class IgniteUtils {
}
/**
- * Closes all provided objects. If any of the {@link
ManuallyCloseable#close} methods throw an exception,
- * only the first thrown exception will be propagated to the caller, after
all other objects are closed,
- * similar to the try-with-resources block.
+ * Closes all provided objects. If any of the {@link
ManuallyCloseable#close} methods throw an exception, only the first thrown
+ * exception will be propagated to the caller, after all other objects are
closed, similar to the try-with-resources block.
*
* @param closeables Stream of objects to close.
* @throws Exception If failed to close.
@@ -751,7 +751,7 @@ public class IgniteUtils {
*
* @param sourcePath The path to the file to move.
* @param targetPath The path to the target file.
- * @param log Optional logger.
+ * @param log Optional logger.
* @return The path to the target file.
* @throws IOException If the source file cannot be moved to the target.
*/
@@ -933,7 +933,7 @@ public class IgniteUtils {
* @param busyLock Component's busy lock.
* @param fn Function to run.
* @return Future returned from the {@code fn}, or future with the {@link
NodeStoppingException} if
- * {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime
exception/error while executing the {@code fn}.
+ * {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime
exception/error while executing the {@code fn}.
*/
public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock
busyLock, Supplier<CompletableFuture<T>> fn) {
if (!busyLock.enterBusy()) {
@@ -980,8 +980,8 @@ public class IgniteUtils {
}
/**
- * Cancels the future and runs a consumer on future's result if it was
completed before the cancellation.
- * Does nothing if future is cancelled or completed exceptionally.
+ * Cancels the future and runs a consumer on future's result if it was
completed before the cancellation. Does nothing if future is
+ * cancelled or completed exceptionally.
*
* @param future Future.
* @param consumer Consumer that accepts future's result.
@@ -1069,7 +1069,8 @@ public class IgniteUtils {
* Retries operation until it succeeds or fails with exception that is
different than the given.
*
* @param operation Operation.
- * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be stopped.
+ * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be
+ * stopped.
* @param executor Executor to make retry in.
* @return Future that is completed when operation is successful or failed
with other exception than the given.
*/
@@ -1089,7 +1090,8 @@ public class IgniteUtils {
* Retries operation until it succeeds or fails with exception that is
different than the given.
*
* @param operation Operation.
- * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be stopped.
+ * @param stopRetryCondition Condition that accepts the exception if one
has been thrown, and defines whether retries should be
+ * stopped.
* @param executor Executor to make retry in.
* @param fut Future that is completed when operation is successful or
failed with other exception than the given.
*/
@@ -1196,4 +1198,22 @@ public class IgniteUtils {
public static void stopAll(Stream<? extends IgniteComponent> components)
throws Exception {
closeAll(components.filter(Objects::nonNull).map(component ->
component::stop));
}
+
+ /**
+ * Creates a consumer that, when passed to a {@link
CompletableFuture#whenComplete} call, will copy the outcome (either successful
or
+ * not) of the target future to the given future.
+ *
+ * @param future Future to copy the outcome to.
+ * @param <T> Future result type.
+ * @return Consumer for transferring a future outcome to another future.
+ */
+ public static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<?
super T> future) {
+ return (v, e) -> {
+ if (e != null) {
+ future.completeExceptionally(e);
+ } else {
+ future.complete(v);
+ }
+ };
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 7501eac7a3..ffd5fd3b92 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo;
import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
@@ -206,4 +209,22 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
verify(worker0, times(2)).join();
verify(worker1, times(2)).join();
}
+
+ @Test
+ void testCopyStateToNormal() {
+ CompletableFuture<Number> result = new CompletableFuture<>();
+
+ completedFuture(2).whenComplete(copyStateTo(result));
+
+ assertThat(result, willBe(equalTo(2)));
+ }
+
+ @Test
+ void testCopyStateToException() {
+ CompletableFuture<Number> result = new CompletableFuture<>();
+
+ CompletableFuture.<Integer>failedFuture(new
NumberFormatException()).whenComplete(copyStateTo(result));
+
+ assertThat(result, willThrow(NumberFormatException.class));
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
new file mode 100644
index 0000000000..f1af298c4b
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rebalance;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for recovery of the rebalance procedure.
+ */
+public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @Test
+ void testPendingAssignmentsRecovery() throws InterruptedException {
+ cluster.doInSession(0, session -> {
+ session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1,
REPLICAS=1");
+ session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name
INT) WITH PRIMARY_ZONE='TEST_ZONE'");
+ session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+ });
+
+ assertTrue(containsPartition(cluster.node(0)));
+ assertFalse(containsPartition(cluster.node(1)));
+
+ // Block Meta Storage watches on node 1 to inhibit pending assignments
handling.
+
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
+
+ // Change the number of replicas so that the table would get
replicated on both nodes.
+ cluster.doInSession(0, session -> {
+ session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2");
+ });
+
+ cluster.restartNode(1);
+
+ assertTrue(containsPartition(cluster.node(0)));
+ assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)),
10_000));
+ }
+
+ private static boolean containsPartition(Ignite node) {
+ var tableManager = ((TableManager) node.tables());
+
+ MvPartitionStorage storage = tableManager.tableImpl("TEST")
+ .internalTable()
+ .storage()
+ .getMvPartition(0);
+
+ return storage.rowsCount() != 0;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 111975e0bf..5c1237fbb9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
@@ -39,11 +40,9 @@ import static
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNum
import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
import static
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -496,19 +495,15 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
lowWatermark.start();
- startTables();
+ CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
- try {
- metaStorageMgr.recoveryFinishedFuture()
- .thenComposeAsync(this::performRebalanceOnRecovery,
ioExecutor)
- .get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInternalException(INTERNAL_ERR, e);
- } catch (ExecutionException e) {
- throw new IgniteInternalException(INTERNAL_ERR, e);
- }
+ assert recoveryFinishFuture.isDone();
+
+ long recoveryRevision = recoveryFinishFuture.join();
+
+ startTables(recoveryRevision);
+
+ performRebalanceOnRecovery(recoveryRevision);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
@@ -536,16 +531,39 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
});
}
- private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
+ private void performRebalanceOnRecovery(long recoveryRevision) {
+ CompletableFuture<Void> pendingAssignmentsRecoveryFuture;
+
var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
- try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix,
revision)) {
+ try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix,
recoveryRevision)) {
CompletableFuture<?>[] futures = cursor.stream()
- .map(this::handleChangePendingAssignmentEvent)
+ .map(pendingAssignmentEntry -> {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Missed pending assignments for key '{}'
discovered, performing recovery",
+ new String(pendingAssignmentEntry.key(),
UTF_8)
+ );
+ }
+
+ // We use the Meta Storage recovery revision here
instead of the entry revision, because
+ // 'handleChangePendingAssignmentEvent' accesses some
Versioned Values that only store values starting with
+ // tokens equal to Meta Storage recovery revision. In
other words, if the entry has a lower revision than the
+ // recovery revision, there will never be a Versioned
Value corresponding to its revision.
+ return
handleChangePendingAssignmentEvent(pendingAssignmentEntry, recoveryRevision);
+ })
.toArray(CompletableFuture[]::new);
- return allOf(futures);
+ pendingAssignmentsRecoveryFuture = allOf(futures)
+ // Simply log any errors, we don't want to block watch
processing.
+ .exceptionally(e -> {
+ LOG.error("Error when performing pending assignments
recovery", e);
+
+ return null;
+ });
}
+
+ startVv.update(recoveryRevision, (v, e) ->
pendingAssignmentsRecoveryFuture);
}
/**
@@ -1745,9 +1763,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
try {
- assert evt.single();
+ Entry newEntry = evt.entryEvent().newEntry();
- return
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
+ return handleChangePendingAssignmentEvent(newEntry,
evt.revision());
} finally {
busyLock.leaveBusy();
}
@@ -1760,14 +1778,13 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
};
}
- private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry
pendingAssignmentsEntry) {
+ private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry
pendingAssignmentsEntry, long revision) {
if (pendingAssignmentsEntry.value() == null) {
return completedFuture(null);
}
int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
int tblId = extractTableId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
- long revision = pendingAssignmentsEntry.revision();
var replicaGrpId = new TablePartitionId(tblId, partId);
@@ -1781,11 +1798,24 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
try {
+ TableImpl table = tables.get(tblId);
+
+ // Table can be null only during recovery, because we use a
revision from the future. See comment inside
+ // performRebalanceOnRecovery.
+ if (table == null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Skipping Pending Assignments update,
because table {} does not exist", tblId);
+ }
+
+ return
CompletableFuture.<Void>completedFuture(null);
+ }
+
return handleChangePendingAssignmentEvent(
replicaGrpId,
- tables.get(tblId),
+ table,
pendingAssignmentsEntry,
- stableAssignmentsEntry
+ stableAssignmentsEntry,
+ revision
);
} finally {
busyLock.leaveBusy();
@@ -1798,14 +1828,15 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TablePartitionId replicaGrpId,
TableImpl tbl,
Entry pendingAssignmentsEntry,
- Entry stableAssignmentsEntry
+ Entry stableAssignmentsEntry,
+ long revision
) {
ClusterNode localMember = localNode();
int partId = replicaGrpId.partitionId();
if (LOG.isInfoEnabled()) {
- var stringKey = new String(pendingAssignmentsEntry.key(),
StandardCharsets.UTF_8);
+ var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8);
LOG.info("Received update on pending assignments. Check if new
raft group should be started"
+ " [key={}, partition={}, table={},
localMemberAddress={}]",
@@ -1829,7 +1860,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> localServicesStartFuture;
if (shouldStartLocalServices) {
- localServicesStartFuture =
localPartsByTableIdVv.get(pendingAssignmentsEntry.revision())
+ localServicesStartFuture = localPartsByTableIdVv.get(revision)
.thenComposeAsync(oldMap -> {
int tableId = tbl.tableId();
@@ -1926,7 +1957,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// Do not change peers of the raft
group if this is a stale event.
// Note that we start raft node
before for the sake of the consistency in a starting and
// stopping raft nodes.
- if
(pendingAssignmentsEntry.revision() < latestPendingAssignmentsEntry.revision())
{
+ if (revision <
latestPendingAssignmentsEntry.revision()) {
return completedFuture(null);
}
@@ -2332,23 +2363,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return tables.stream().filter(table ->
table.name().equals(name)).findAny().orElse(null);
}
- private void startTables() {
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
-
- assert recoveryFinishFuture.isDone();
-
+ private void startTables(long recoveryRevision) {
int catalogVersion = catalogService.latestCatalogVersion();
- long causalityToken = recoveryFinishFuture.join();
List<CompletableFuture<?>> startTableFutures = new ArrayList<>();
// TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from vault and metastore
for (CatalogTableDescriptor tableDescriptor :
catalogService.tables(catalogVersion)) {
- startTableFutures.add(createTableLocally(causalityToken,
catalogVersion, tableDescriptor));
+ startTableFutures.add(createTableLocally(recoveryRevision,
catalogVersion, tableDescriptor));
}
// Forces you to wait until recovery is complete before the metastore
watches are deployed to avoid races with catalog listeners.
- startVv.update(causalityToken, (unused, throwable) ->
allOf(startTableFutures.toArray(CompletableFuture[]::new)))
+ startVv.update(recoveryRevision, (unused, throwable) ->
allOf(startTableFutures.toArray(CompletableFuture[]::new)))
.whenComplete((unused, throwable) -> {
if (throwable != null) {
LOG.error("Error starting tables", throwable);