This is an automated email from the ASF dual-hosted git repository. apolovtsev pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 0232275839 IGNITE-20344 Fix rebalance recovery and add tests (#2581) 0232275839 is described below commit 0232275839138080eb9cf1154a1d221921ee9d35 Author: Alexander Polovtcev <alex.polovt...@gmail.com> AuthorDate: Thu Sep 14 16:43:44 2023 +0300 IGNITE-20344 Fix rebalance recovery and add tests (#2581) --- .../apache/ignite/internal/util/IgniteUtils.java | 58 ++++++++----- .../ignite/internal/util/IgniteUtilsTest.java | 21 +++++ .../rebalance/ItRebalanceRecoveryTest.java | 75 +++++++++++++++++ .../internal/table/distributed/TableManager.java | 98 ++++++++++++++-------- 4 files changed, 197 insertions(+), 55 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 88c8a588f4..8a6f6167b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -57,6 +57,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -184,8 +185,8 @@ public class IgniteUtils { * Creates new {@link HashMap} with expected size. * * @param expSize Expected size of the created map. - * @param <K> Type of the map's keys. - * @param <V> Type of the map's values. + * @param <K> Type of the map's keys. + * @param <V> Type of the map's values. * @return New map. */ public static <K, V> HashMap<K, V> newHashMap(int expSize) { @@ -196,8 +197,8 @@ public class IgniteUtils { * Creates new {@link LinkedHashMap} with expected size. * * @param expSize Expected size of created map. - * @param <K> Type of the map's keys. 
- * @param <V> Type of the map's values. + * @param <K> Type of the map's keys. + * @param <V> Type of the map's values. * @return New map. */ public static <K, V> LinkedHashMap<K, V> newLinkedHashMap(int expSize) { @@ -265,7 +266,7 @@ public class IgniteUtils { /** * Converts byte array to hex string. * - * @param arr Array of bytes. + * @param arr Array of bytes. * @param maxLen Maximum length of result string. Rounds down to a power of two. * @return Hex string. */ @@ -343,7 +344,7 @@ public class IgniteUtils { * Appends {@code byte} in hexadecimal format. * * @param sb String builder. - * @param b Byte to add in hexadecimal format. + * @param b Byte to add in hexadecimal format. */ private static void addByteAsHex(StringBuilder sb, byte b) { sb.append(Integer.toHexString(MASK & b >>> 4)).append(Integer.toHexString(MASK & b)); @@ -447,7 +448,7 @@ public class IgniteUtils { * Gets class for provided name. Accepts primitive types names. * * @param clsName Class name. - * @param ldr Class loader. + * @param ldr Class loader. * @return Class. * @throws ClassNotFoundException If class not found. */ @@ -458,8 +459,8 @@ public class IgniteUtils { /** * Gets class for provided name. Accepts primitive types names. * - * @param clsName Class name. - * @param ldr Class loader. + * @param clsName Class name. + * @param ldr Class loader. * @param clsFilter Predicate to filter class names. * @return Class. * @throws ClassNotFoundException If class not found. 
@@ -600,7 +601,7 @@ public class IgniteUtils { * * @param service the {@code ExecutorService} to shut down * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate - * @param unit the time unit of the timeout argument + * @param unit the time unit of the timeout argument */ public static void shutdownAndAwaitTermination(ExecutorService service, long timeout, TimeUnit unit) { long halfTimeoutNanos = unit.toNanos(timeout) / 2; @@ -672,9 +673,8 @@ public class IgniteUtils { } /** - * Closes all provided objects. If any of the {@link ManuallyCloseable#close} methods throw an exception, - * only the first thrown exception will be propagated to the caller, after all other objects are closed, - * similar to the try-with-resources block. + * Closes all provided objects. If any of the {@link ManuallyCloseable#close} methods throw an exception, only the first thrown + * exception will be propagated to the caller, after all other objects are closed, similar to the try-with-resources block. * * @param closeables Stream of objects to close. * @throws Exception If failed to close. @@ -751,7 +751,7 @@ public class IgniteUtils { * * @param sourcePath The path to the file to move. * @param targetPath The path to the target file. - * @param log Optional logger. + * @param log Optional logger. * @return The path to the target file. * @throws IOException If the source file cannot be moved to the target. */ @@ -933,7 +933,7 @@ public class IgniteUtils { * @param busyLock Component's busy lock. * @param fn Function to run. * @return Future returned from the {@code fn}, or future with the {@link NodeStoppingException} if - * {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime exception/error while executing the {@code fn}. + * {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime exception/error while executing the {@code fn}. 
*/ public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock busyLock, Supplier<CompletableFuture<T>> fn) { if (!busyLock.enterBusy()) { @@ -980,8 +980,8 @@ public class IgniteUtils { } /** - * Cancels the future and runs a consumer on future's result if it was completed before the cancellation. - * Does nothing if future is cancelled or completed exceptionally. + * Cancels the future and runs a consumer on future's result if it was completed before the cancellation. Does nothing if future is + * cancelled or completed exceptionally. * * @param future Future. * @param consumer Consumer that accepts future's result. @@ -1069,7 +1069,8 @@ public class IgniteUtils { * Retries operation until it succeeds or fails with exception that is different than the given. * * @param operation Operation. - * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped. + * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be + * stopped. * @param executor Executor to make retry in. * @return Future that is completed when operation is successful or failed with other exception than the given. */ @@ -1089,7 +1090,8 @@ public class IgniteUtils { * Retries operation until it succeeds or fails with exception that is different than the given. * * @param operation Operation. - * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be stopped. + * @param stopRetryCondition Condition that accepts the exception if one has been thrown, and defines whether retries should be + * stopped. * @param executor Executor to make retry in. * @param fut Future that is completed when operation is successful or failed with other exception than the given. */ @@ -1196,4 +1198,22 @@ public class IgniteUtils { public static void stopAll(Stream<? 
extends IgniteComponent> components) throws Exception { closeAll(components.filter(Objects::nonNull).map(component -> component::stop)); } + + /** + * Creates a consumer that, when passed to a {@link CompletableFuture#whenComplete} call, will copy the outcome (either successful or + * not) of the target future to the given future. + * + * @param future Future to copy the outcome to. + * @param <T> Future result type. + * @return Consumer for transferring a future outcome to another future. + */ + public static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<? super T> future) { + return (v, e) -> { + if (e != null) { + future.completeExceptionally(e); + } else { + future.complete(v); + } + }; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java index 7501eac7a3..ffd5fd3b92 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java @@ -20,7 +20,10 @@ package org.apache.ignite.internal.util; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop; +import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo; import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly; import static org.apache.ignite.internal.util.IgniteUtils.isPow2; import static org.apache.ignite.internal.util.IgniteUtils.toHexString; @@ -206,4 +209,22 @@ class IgniteUtilsTest 
extends BaseIgniteAbstractTest { verify(worker0, times(2)).join(); verify(worker1, times(2)).join(); } + + @Test + void testCopyStateToNormal() { + CompletableFuture<Number> result = new CompletableFuture<>(); + + completedFuture(2).whenComplete(copyStateTo(result)); + + assertThat(result, willBe(equalTo(2))); + } + + @Test + void testCopyStateToException() { + CompletableFuture<Number> result = new CompletableFuture<>(); + + CompletableFuture.<Integer>failedFuture(new NumberFormatException()).whenComplete(copyStateTo(result)); + + assertThat(result, willThrow(NumberFormatException.class)); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java new file mode 100644 index 0000000000..f1af298c4b --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.rebalance; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.distributed.TableManager; +import org.apache.ignite.internal.test.WatchListenerInhibitor; +import org.junit.jupiter.api.Test; + +/** + * Tests for recovery of the rebalance procedure. + */ +public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest { + @Override + protected int initialNodes() { + return 2; + } + + @Test + void testPendingAssignmentsRecovery() throws InterruptedException { + cluster.doInSession(0, session -> { + session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=1"); + session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'"); + session.execute(null, "INSERT INTO TEST VALUES (0, 0)"); + }); + + assertTrue(containsPartition(cluster.node(0))); + assertFalse(containsPartition(cluster.node(1))); + + // Block Meta Storage watches on node 1 to inhibit pending assignments handling. + WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit(); + + // Change the number of replicas so that the table would get replicated on both nodes. 
+ cluster.doInSession(0, session -> { + session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2"); + }); + + cluster.restartNode(1); + + assertTrue(containsPartition(cluster.node(0))); + assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000)); + } + + private static boolean containsPartition(Ignite node) { + var tableManager = ((TableManager) node.tables()); + + MvPartitionStorage storage = tableManager.tableImpl("TEST") + .internalTable() + .storage() + .getMvPartition(0); + + return storage.rowsCount() != 0; + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 111975e0bf..5c1237fbb9 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.CompletableFuture.allOf; @@ -39,11 +40,9 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNum import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId; import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey; import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey; -import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -496,19 +495,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp 
lowWatermark.start(); - startTables(); + CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture(); - try { - metaStorageMgr.recoveryFinishedFuture() - .thenComposeAsync(this::performRebalanceOnRecovery, ioExecutor) - .get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInternalException(INTERNAL_ERR, e); - } catch (ExecutionException e) { - throw new IgniteInternalException(INTERNAL_ERR, e); - } + assert recoveryFinishFuture.isDone(); + + long recoveryRevision = recoveryFinishFuture.join(); + + startTables(recoveryRevision); + + performRebalanceOnRecovery(recoveryRevision); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), pendingAssignmentsRebalanceListener); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); @@ -536,16 +531,39 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp }); } - private CompletableFuture<Void> performRebalanceOnRecovery(long revision) { + private void performRebalanceOnRecovery(long recoveryRevision) { + CompletableFuture<Void> pendingAssignmentsRecoveryFuture; + var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX); - try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, revision)) { + try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, recoveryRevision)) { CompletableFuture<?>[] futures = cursor.stream() - .map(this::handleChangePendingAssignmentEvent) + .map(pendingAssignmentEntry -> { + if (LOG.isInfoEnabled()) { + LOG.info( + "Missed pending assignments for key '{}' discovered, performing recovery", + new String(pendingAssignmentEntry.key(), UTF_8) + ); + } + + // We use the Meta Storage recovery revision here instead of the entry revision, because + // 'handleChangePendingAssignmentEvent' accesses some Versioned Values that only store values starting with + // tokens equal to Meta Storage 
recovery revision. In other words, if the entry has a lower revision than the + // recovery revision, there will never be a Versioned Value corresponding to its revision. + return handleChangePendingAssignmentEvent(pendingAssignmentEntry, recoveryRevision); + }) .toArray(CompletableFuture[]::new); - return allOf(futures); + pendingAssignmentsRecoveryFuture = allOf(futures) + // Simply log any errors, we don't want to block watch processing. + .exceptionally(e -> { + LOG.error("Error when performing pending assignments recovery", e); + + return null; + }); } + + startVv.update(recoveryRevision, (v, e) -> pendingAssignmentsRecoveryFuture); } /** @@ -1745,9 +1763,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } try { - assert evt.single(); + Entry newEntry = evt.entryEvent().newEntry(); - return handleChangePendingAssignmentEvent(evt.entryEvent().newEntry()); + return handleChangePendingAssignmentEvent(newEntry, evt.revision()); } finally { busyLock.leaveBusy(); } @@ -1760,14 +1778,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp }; } - private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry) { + private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision) { if (pendingAssignmentsEntry.value() == null) { return completedFuture(null); } int partId = extractPartitionNumber(pendingAssignmentsEntry.key()); int tblId = extractTableId(pendingAssignmentsEntry.key(), PENDING_ASSIGNMENTS_PREFIX); - long revision = pendingAssignmentsEntry.revision(); var replicaGrpId = new TablePartitionId(tblId, partId); @@ -1781,11 +1798,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } try { + TableImpl table = tables.get(tblId); + + // Table can be null only during recovery, because we use a revision from the future. See comment inside + // performRebalanceOnRecovery.
+ if (table == null) { + if (LOG.isInfoEnabled()) { + LOG.info("Skipping Pending Assignments update, because table {} does not exist", tblId); + } + + return CompletableFuture.<Void>completedFuture(null); + } + return handleChangePendingAssignmentEvent( replicaGrpId, - tables.get(tblId), + table, pendingAssignmentsEntry, - stableAssignmentsEntry + stableAssignmentsEntry, + revision ); } finally { busyLock.leaveBusy(); @@ -1798,14 +1828,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp TablePartitionId replicaGrpId, TableImpl tbl, Entry pendingAssignmentsEntry, - Entry stableAssignmentsEntry + Entry stableAssignmentsEntry, + long revision ) { ClusterNode localMember = localNode(); int partId = replicaGrpId.partitionId(); if (LOG.isInfoEnabled()) { - var stringKey = new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8); + var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8); LOG.info("Received update on pending assignments. Check if new raft group should be started" + " [key={}, partition={}, table={}, localMemberAddress={}]", @@ -1829,7 +1860,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp CompletableFuture<Void> localServicesStartFuture; if (shouldStartLocalServices) { - localServicesStartFuture = localPartsByTableIdVv.get(pendingAssignmentsEntry.revision()) + localServicesStartFuture = localPartsByTableIdVv.get(revision) .thenComposeAsync(oldMap -> { int tableId = tbl.tableId(); @@ -1926,7 +1957,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp // Do not change peers of the raft group if this is a stale event. // Note that we start raft node before for the sake of the consistency in a starting and // stopping raft nodes. 
- if (pendingAssignmentsEntry.revision() < latestPendingAssignmentsEntry.revision()) { + if (revision < latestPendingAssignmentsEntry.revision()) { return completedFuture(null); } @@ -2332,23 +2363,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return tables.stream().filter(table -> table.name().equals(name)).findAny().orElse(null); } - private void startTables() { - CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture(); - - assert recoveryFinishFuture.isDone(); - + private void startTables(long recoveryRevision) { int catalogVersion = catalogService.latestCatalogVersion(); - long causalityToken = recoveryFinishFuture.join(); List<CompletableFuture<?>> startTableFutures = new ArrayList<>(); // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones from volt and metastore for (CatalogTableDescriptor tableDescriptor : catalogService.tables(catalogVersion)) { - startTableFutures.add(createTableLocally(causalityToken, catalogVersion, tableDescriptor)); + startTableFutures.add(createTableLocally(recoveryRevision, catalogVersion, tableDescriptor)); } // Forces you to wait until recovery is complete before the metastore watches is deployed to avoid races with catalog listeners. - startVv.update(causalityToken, (unused, throwable) -> allOf(startTableFutures.toArray(CompletableFuture[]::new))) + startVv.update(recoveryRevision, (unused, throwable) -> allOf(startTableFutures.toArray(CompletableFuture[]::new))) .whenComplete((unused, throwable) -> { if (throwable != null) { LOG.error("Error starting tables", throwable);