This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0232275839 IGNITE-20344 Fix rebalance recovery and add tests (#2581)
0232275839 is described below

commit 0232275839138080eb9cf1154a1d221921ee9d35
Author: Alexander Polovtcev <alex.polovt...@gmail.com>
AuthorDate: Thu Sep 14 16:43:44 2023 +0300

    IGNITE-20344 Fix rebalance recovery and add tests (#2581)
---
 .../apache/ignite/internal/util/IgniteUtils.java   | 58 ++++++++-----
 .../ignite/internal/util/IgniteUtilsTest.java      | 21 +++++
 .../rebalance/ItRebalanceRecoveryTest.java         | 75 +++++++++++++++++
 .../internal/table/distributed/TableManager.java   | 98 ++++++++++++++--------
 4 files changed, 197 insertions(+), 55 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 88c8a588f4..8a6f6167b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -184,8 +185,8 @@ public class IgniteUtils {
      * Creates new {@link HashMap} with expected size.
      *
      * @param expSize Expected size of the created map.
-     * @param <K>     Type of the map's keys.
-     * @param <V>     Type of the map's values.
+     * @param <K> Type of the map's keys.
+     * @param <V> Type of the map's values.
      * @return New map.
      */
     public static <K, V> HashMap<K, V> newHashMap(int expSize) {
@@ -196,8 +197,8 @@ public class IgniteUtils {
      * Creates new {@link LinkedHashMap} with expected size.
      *
      * @param expSize Expected size of created map.
-     * @param <K>     Type of the map's keys.
-     * @param <V>     Type of the map's values.
+     * @param <K> Type of the map's keys.
+     * @param <V> Type of the map's values.
      * @return New map.
      */
     public static <K, V> LinkedHashMap<K, V> newLinkedHashMap(int expSize) {
@@ -265,7 +266,7 @@ public class IgniteUtils {
     /**
      * Converts byte array to hex string.
      *
-     * @param arr    Array of bytes.
+     * @param arr Array of bytes.
      * @param maxLen Maximum length of result string. Rounds down to a power 
of two.
      * @return Hex string.
      */
@@ -343,7 +344,7 @@ public class IgniteUtils {
      * Appends {@code byte} in hexadecimal format.
      *
      * @param sb String builder.
-     * @param b  Byte to add in hexadecimal format.
+     * @param b Byte to add in hexadecimal format.
      */
     private static void addByteAsHex(StringBuilder sb, byte b) {
         sb.append(Integer.toHexString(MASK & b >>> 
4)).append(Integer.toHexString(MASK & b));
@@ -447,7 +448,7 @@ public class IgniteUtils {
      * Gets class for provided name. Accepts primitive types names.
      *
      * @param clsName Class name.
-     * @param ldr     Class loader.
+     * @param ldr Class loader.
      * @return Class.
      * @throws ClassNotFoundException If class not found.
      */
@@ -458,8 +459,8 @@ public class IgniteUtils {
     /**
      * Gets class for provided name. Accepts primitive types names.
      *
-     * @param clsName   Class name.
-     * @param ldr       Class loader.
+     * @param clsName Class name.
+     * @param ldr Class loader.
      * @param clsFilter Predicate to filter class names.
      * @return Class.
      * @throws ClassNotFoundException If class not found.
@@ -600,7 +601,7 @@ public class IgniteUtils {
      *
      * @param service the {@code ExecutorService} to shut down
      * @param timeout the maximum time to wait for the {@code ExecutorService} 
to terminate
-     * @param unit    the time unit of the timeout argument
+     * @param unit the time unit of the timeout argument
      */
     public static void shutdownAndAwaitTermination(ExecutorService service, 
long timeout, TimeUnit unit) {
         long halfTimeoutNanos = unit.toNanos(timeout) / 2;
@@ -672,9 +673,8 @@ public class IgniteUtils {
     }
 
     /**
-     * Closes all provided objects. If any of the {@link 
ManuallyCloseable#close} methods throw an exception,
-     * only the first thrown exception will be propagated to the caller, after 
all other objects are closed,
-     * similar to the try-with-resources block.
+     * Closes all provided objects. If any of the {@link 
ManuallyCloseable#close} methods throw an exception, only the first thrown
+     * exception will be propagated to the caller, after all other objects are 
closed, similar to the try-with-resources block.
      *
      * @param closeables Stream of objects to close.
      * @throws Exception If failed to close.
@@ -751,7 +751,7 @@ public class IgniteUtils {
      *
      * @param sourcePath The path to the file to move.
      * @param targetPath The path to the target file.
-     * @param log        Optional logger.
+     * @param log Optional logger.
      * @return The path to the target file.
      * @throws IOException If the source file cannot be moved to the target.
      */
@@ -933,7 +933,7 @@ public class IgniteUtils {
      * @param busyLock Component's busy lock.
      * @param fn Function to run.
      * @return Future returned from the {@code fn}, or future with the {@link 
NodeStoppingException} if
-     *      {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime 
exception/error while executing the {@code fn}.
+     *         {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime 
exception/error while executing the {@code fn}.
      */
     public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock 
busyLock, Supplier<CompletableFuture<T>> fn) {
         if (!busyLock.enterBusy()) {
@@ -980,8 +980,8 @@ public class IgniteUtils {
     }
 
     /**
-     * Cancels the future and runs a consumer on future's result if it was 
completed before the cancellation.
-     * Does nothing if future is cancelled or completed exceptionally.
+     * Cancels the future and runs a consumer on future's result if it was 
completed before the cancellation. Does nothing if future is
+     * cancelled or completed exceptionally.
      *
      * @param future Future.
      * @param consumer Consumer that accepts future's result.
@@ -1069,7 +1069,8 @@ public class IgniteUtils {
      * Retries operation until it succeeds or fails with exception that is 
different than the given.
      *
      * @param operation Operation.
-     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be stopped.
+     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be
+     *         stopped.
      * @param executor Executor to make retry in.
      * @return Future that is completed when operation is successful or failed 
with other exception than the given.
      */
@@ -1089,7 +1090,8 @@ public class IgniteUtils {
      * Retries operation until it succeeds or fails with exception that is 
different than the given.
      *
      * @param operation Operation.
-     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be stopped.
+     * @param stopRetryCondition Condition that accepts the exception if one 
has been thrown, and defines whether retries should be
+     *         stopped.
      * @param executor Executor to make retry in.
      * @param fut Future that is completed when operation is successful or 
failed with other exception than the given.
      */
@@ -1196,4 +1198,22 @@ public class IgniteUtils {
     public static void stopAll(Stream<? extends IgniteComponent> components) 
throws Exception {
         closeAll(components.filter(Objects::nonNull).map(component -> 
component::stop));
     }
+
+    /**
+     * Creates a consumer that, when passed to a {@link 
CompletableFuture#whenComplete} call, will copy the outcome (either successful 
or
+     * not) of the target future to the given future.
+     *
+     * @param future Future to copy the outcome to.
+     * @param <T> Future result type.
+     * @return Consumer for transferring a future outcome to another future.
+     */
+    public static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<? 
super T> future) {
+        return (v, e) -> {
+            if (e != null) {
+                future.completeExceptionally(e);
+            } else {
+                future.complete(v);
+            }
+        };
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 7501eac7a3..ffd5fd3b92 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.util;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
 import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
@@ -206,4 +209,22 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
         verify(worker0, times(2)).join();
         verify(worker1, times(2)).join();
     }
+
+    @Test
+    void testCopyStateToNormal() {
+        CompletableFuture<Number> result = new CompletableFuture<>();
+
+        completedFuture(2).whenComplete(copyStateTo(result));
+
+        assertThat(result, willBe(equalTo(2)));
+    }
+
+    @Test
+    void testCopyStateToException() {
+        CompletableFuture<Number> result = new CompletableFuture<>();
+
+        CompletableFuture.<Integer>failedFuture(new 
NumberFormatException()).whenComplete(copyStateTo(result));
+
+        assertThat(result, willThrow(NumberFormatException.class));
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
new file mode 100644
index 0000000000..f1af298c4b
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rebalance;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for recovery of the rebalance procedure.
+ */
+public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 2;
+    }
+
+    @Test
+    void testPendingAssignmentsRecovery() throws InterruptedException {
+        cluster.doInSession(0, session -> {
+            session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, 
REPLICAS=1");
+            session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name 
INT) WITH PRIMARY_ZONE='TEST_ZONE'");
+            session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+        });
+
+        assertTrue(containsPartition(cluster.node(0)));
+        assertFalse(containsPartition(cluster.node(1)));
+
+        // Block Meta Storage watches on node 1 to inhibit pending assignments 
handling.
+        
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
+
+        // Change the number of replicas so that the table would get 
replicated on both nodes.
+        cluster.doInSession(0, session -> {
+            session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2");
+        });
+
+        cluster.restartNode(1);
+
+        assertTrue(containsPartition(cluster.node(0)));
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 
10_000));
+    }
+
+    private static boolean containsPartition(Ignite node) {
+        var tableManager = ((TableManager) node.tables());
+
+        MvPartitionStorage storage = tableManager.tableImpl("TEST")
+                .internalTable()
+                .storage()
+                .getMvPartition(0);
+
+        return storage.rowsCount() != 0;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 111975e0bf..5c1237fbb9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.allOf;
@@ -39,11 +40,9 @@ import static 
org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNum
 import static org.apache.ignite.internal.utils.RebalanceUtil.extractTableId;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -496,19 +495,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             lowWatermark.start();
 
-            startTables();
+            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
 
-            try {
-                metaStorageMgr.recoveryFinishedFuture()
-                        .thenComposeAsync(this::performRebalanceOnRecovery, 
ioExecutor)
-                        .get();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteInternalException(INTERNAL_ERR, e);
-            } catch (ExecutionException e) {
-                throw new IgniteInternalException(INTERNAL_ERR, e);
-            }
+            assert recoveryFinishFuture.isDone();
+
+            long recoveryRevision = recoveryFinishFuture.join();
+
+            startTables(recoveryRevision);
+
+            performRebalanceOnRecovery(recoveryRevision);
 
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
@@ -536,16 +531,39 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         });
     }
 
-    private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
+    private void performRebalanceOnRecovery(long recoveryRevision) {
+        CompletableFuture<Void> pendingAssignmentsRecoveryFuture;
+
         var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
 
-        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
revision)) {
+        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
recoveryRevision)) {
             CompletableFuture<?>[] futures = cursor.stream()
-                    .map(this::handleChangePendingAssignmentEvent)
+                    .map(pendingAssignmentEntry -> {
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info(
+                                    "Missed pending assignments for key '{}' 
discovered, performing recovery",
+                                    new String(pendingAssignmentEntry.key(), 
UTF_8)
+                            );
+                        }
+
+                        // We use the Meta Storage recovery revision here 
instead of the entry revision, because
+                        // 'handleChangePendingAssignmentEvent' accesses some 
Versioned Values that only store values starting with
+                        // tokens equal to Meta Storage recovery revision. In 
other words, if the entry has a lower revision than the
+                        // recovery revision, there will never be a Versioned 
Value corresponding to its revision.
+                        return 
handleChangePendingAssignmentEvent(pendingAssignmentEntry, recoveryRevision);
+                    })
                     .toArray(CompletableFuture[]::new);
 
-            return allOf(futures);
+            pendingAssignmentsRecoveryFuture = allOf(futures)
+                    // Simply log any errors, we don't want to block watch 
processing.
+                    .exceptionally(e -> {
+                        LOG.error("Error when performing pending assignments 
recovery", e);
+
+                        return null;
+                    });
         }
+
+        startVv.update(recoveryRevision, (v, e) -> 
pendingAssignmentsRecoveryFuture);
     }
 
     /**
@@ -1745,9 +1763,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 }
 
                 try {
-                    assert evt.single();
+                    Entry newEntry = evt.entryEvent().newEntry();
 
-                    return 
handleChangePendingAssignmentEvent(evt.entryEvent().newEntry());
+                    return handleChangePendingAssignmentEvent(newEntry, 
evt.revision());
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -1760,14 +1778,13 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         };
     }
 
-    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry 
pendingAssignmentsEntry) {
+    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry 
pendingAssignmentsEntry, long revision) {
         if (pendingAssignmentsEntry.value() == null) {
             return completedFuture(null);
         }
 
         int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
         int tblId = extractTableId(pendingAssignmentsEntry.key(), 
PENDING_ASSIGNMENTS_PREFIX);
-        long revision = pendingAssignmentsEntry.revision();
 
         var replicaGrpId = new TablePartitionId(tblId, partId);
 
@@ -1781,11 +1798,24 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                     }
 
                     try {
+                        TableImpl table = tables.get(tblId);
+
+                        // Table can be null only during recovery, because we use a 
revision from the future. See comment inside
+                        // performRebalanceOnRecovery.
+                        if (table == null) {
+                            if (LOG.isInfoEnabled()) {
+                                LOG.info("Skipping Pending Assignments update, 
because table {} does not exist", tblId);
+                            }
+
+                            return 
CompletableFuture.<Void>completedFuture(null);
+                        }
+
                         return handleChangePendingAssignmentEvent(
                                 replicaGrpId,
-                                tables.get(tblId),
+                                table,
                                 pendingAssignmentsEntry,
-                                stableAssignmentsEntry
+                                stableAssignmentsEntry,
+                                revision
                         );
                     } finally {
                         busyLock.leaveBusy();
@@ -1798,14 +1828,15 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             TablePartitionId replicaGrpId,
             TableImpl tbl,
             Entry pendingAssignmentsEntry,
-            Entry stableAssignmentsEntry
+            Entry stableAssignmentsEntry,
+            long revision
     ) {
         ClusterNode localMember = localNode();
 
         int partId = replicaGrpId.partitionId();
 
         if (LOG.isInfoEnabled()) {
-            var stringKey = new String(pendingAssignmentsEntry.key(), 
StandardCharsets.UTF_8);
+            var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8);
 
             LOG.info("Received update on pending assignments. Check if new 
raft group should be started"
                             + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
@@ -1829,7 +1860,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         CompletableFuture<Void> localServicesStartFuture;
 
         if (shouldStartLocalServices) {
-            localServicesStartFuture = 
localPartsByTableIdVv.get(pendingAssignmentsEntry.revision())
+            localServicesStartFuture = localPartsByTableIdVv.get(revision)
                     .thenComposeAsync(oldMap -> {
                         int tableId = tbl.tableId();
 
@@ -1926,7 +1957,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                             // Do not change peers of the raft 
group if this is a stale event.
                                             // Note that we start raft node 
before for the sake of the consistency in a starting and
                                             // stopping raft nodes.
-                                            if 
(pendingAssignmentsEntry.revision() < latestPendingAssignmentsEntry.revision()) 
{
+                                            if (revision < 
latestPendingAssignmentsEntry.revision()) {
                                                 return completedFuture(null);
                                             }
 
@@ -2332,23 +2363,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         return tables.stream().filter(table -> 
table.name().equals(name)).findAny().orElse(null);
     }
 
-    private void startTables() {
-        CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
-
-        assert recoveryFinishFuture.isDone();
-
+    private void startTables(long recoveryRevision) {
         int catalogVersion = catalogService.latestCatalogVersion();
-        long causalityToken = recoveryFinishFuture.join();
 
         List<CompletableFuture<?>> startTableFutures = new ArrayList<>();
 
         // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from volt and metastore
         for (CatalogTableDescriptor tableDescriptor : 
catalogService.tables(catalogVersion)) {
-            startTableFutures.add(createTableLocally(causalityToken, 
catalogVersion, tableDescriptor));
+            startTableFutures.add(createTableLocally(recoveryRevision, 
catalogVersion, tableDescriptor));
         }
 
         // Forces you to wait until recovery is complete before the metastore 
watches are deployed to avoid races with catalog listeners.
-        startVv.update(causalityToken, (unused, throwable) -> 
allOf(startTableFutures.toArray(CompletableFuture[]::new)))
+        startVv.update(recoveryRevision, (unused, throwable) -> 
allOf(startTableFutures.toArray(CompletableFuture[]::new)))
                 .whenComplete((unused, throwable) -> {
                     if (throwable != null) {
                         LOG.error("Error starting tables", throwable);

Reply via email to