This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3321b4a317 IGNITE-20767 Move code related to recovering volatile RAFT 
groups out of TableManager (#2773)
3321b4a317 is described below

commit 3321b4a317445f841795cf51653a598cb3bc95b2
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Wed Nov 1 11:31:47 2023 +0400

    IGNITE-20767 Move code related to recovering volatile RAFT groups out of 
TableManager (#2773)
---
 .../PartitionReplicatorNodeRecovery.java           | 208 +++++++++++++++++++++
 .../internal/table/distributed/TableManager.java   | 127 ++-----------
 2 files changed, 225 insertions(+), 110 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
new file mode 100644
index 0000000000..b432cdd4c4
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
+import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
+import org.apache.ignite.internal.utils.RebalanceUtil;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+
+/**
+ * Code specific to recovering a partition replicator group node. This 
includes a case when we lost metadata
+ * that is required for the replication protocol (for instance, for RAFT it's 
about group metadata).
+ */
+class PartitionReplicatorNodeRecovery {
+    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = 
TimeUnit.SECONDS.toMillis(3);
+
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
+
+    private final MetaStorageManager metaStorageManager;
+
+    private final MessagingService messagingService;
+
+    /** Resolver that resolves a node consistent ID to cluster node. */
+    private final Function<String, ClusterNode> clusterNodeResolver;
+
+    /** Obtains a TableImpl instance by a table ID. */
+    private final IntFunction<TableImpl> tableById;
+
+    PartitionReplicatorNodeRecovery(
+            MetaStorageManager metaStorageManager,
+            MessagingService messagingService,
+            Function<String, ClusterNode> clusterNodeResolver,
+            IntFunction<TableImpl> tableById) {
+        this.metaStorageManager = metaStorageManager;
+        this.messagingService = messagingService;
+        this.clusterNodeResolver = clusterNodeResolver;
+        this.tableById = tableById;
+    }
+
+    /**
+     * Starts the component.
+     */
+    void start() {
+        addMessageHandler();
+    }
+
+    private void addMessageHandler() {
+        messagingService.addMessageHandler(TableMessageGroup.class, (message, 
sender, correlationId) -> {
+            if (message instanceof HasDataRequest) {
+                // This message queries if a node has any data for a specific 
partition of a table
+                assert correlationId != null;
+
+                HasDataRequest msg = (HasDataRequest) message;
+
+                int tableId = msg.tableId();
+                int partitionId = msg.partitionId();
+
+                boolean storageHasData = false;
+
+                TableImpl table = tableById.apply(tableId);
+
+                if (table != null) {
+                    MvTableStorage storage = table.internalTable().storage();
+
+                    MvPartitionStorage mvPartition = 
storage.getMvPartition(partitionId);
+
+                    // If node's recovery process is incomplete (no partition 
storage), then we consider this node's
+                    // partition storage empty.
+                    if (mvPartition != null) {
+                        storageHasData = 
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
+                    }
+                }
+
+                messagingService.respond(sender, 
TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(), 
correlationId);
+            }
+        });
+    }
+
+    /**
+     * Returns a future that completes with a decision: should we start the 
corresponding group locally or not.
+     *
+     * @param tablePartitionId ID of the table partition.
+     * @param internalTable Table we are working with.
+     * @param newConfiguration New configuration that is going to be applied 
if we'll start the group.
+     * @param localMemberAssignment Assignment of this node in this group.
+     */
+    CompletableFuture<Boolean> shouldStartGroup(
+            TablePartitionId tablePartitionId,
+            InternalTable internalTable,
+            PeersAndLearners newConfiguration,
+            Assignment localMemberAssignment
+    ) {
+        // If Raft is running in in-memory mode or the PDS has been cleared, 
we need to remove the current node
+        // from the Raft group in order to avoid the double vote problem.
+        if (mightNeedGroupRecovery(internalTable)) {
+            return performGroupRecovery(tablePartitionId, newConfiguration, 
localMemberAssignment);
+        }
+
+        return completedFuture(true);
+    }
+
+    private static boolean mightNeedGroupRecovery(InternalTable internalTable) 
{
+        // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for 
details.
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore 
"|| !hasData"
+        return internalTable.storage().isVolatile();
+    }
+
+    private CompletableFuture<Boolean> performGroupRecovery(
+            TablePartitionId tablePartitionId,
+            PeersAndLearners newConfiguration,
+            Assignment localMemberAssignment
+    ) {
+        int tableId = tablePartitionId.tableId();
+        int partId = tablePartitionId.partitionId();
+
+        // No majority and not a full partition restart - need to 'remove, 
then add' nodes
+        // with current partition.
+        return queryDataNodesCount(tableId, partId, newConfiguration.peers())
+                .thenApply(dataNodesCount -> {
+                    boolean fullPartitionRestart = dataNodesCount == 0;
+
+                    if (fullPartitionRestart) {
+                        return true;
+                    }
+
+                    boolean majorityAvailable = dataNodesCount >= 
(newConfiguration.peers().size() / 2) + 1;
+
+                    if (majorityAvailable) {
+                        RebalanceUtil.startPeerRemoval(tablePartitionId, 
localMemberAssignment, metaStorageManager);
+
+                        return false;
+                    } else {
+                        // No majority and not a full partition restart - need 
to restart nodes
+                        // with current partition.
+                        String msg = "Unable to start partition " + partId + 
". Majority not available.";
+
+                        throw new IgniteInternalException(msg);
+                    }
+                });
+    }
+
+    /**
+     * Calculates the quantity of the data nodes for the partition of the 
table.
+     *
+     * @param tblId Table id.
+     * @param partId Partition id.
+     * @param peers Raft peers.
+     * @return A future that will hold the quantity of data nodes.
+     */
+    private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, 
Collection<Peer> peers) {
+        HasDataRequest request = 
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
+
+        //noinspection unchecked
+        CompletableFuture<Boolean>[] requestFutures = peers.stream()
+                .map(Peer::consistentId)
+                .map(clusterNodeResolver)
+                .filter(Objects::nonNull)
+                .map(node -> messagingService
+                        .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+                        .thenApply(response -> {
+                            assert response instanceof HasDataResponse : 
response;
+
+                            return ((HasDataResponse) response).result();
+                        })
+                        .exceptionally(unused -> false))
+                .toArray(CompletableFuture[]::new);
+
+        return allOf(requestFutures)
+                .thenApply(unused -> 
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3bfceb5ab4..0a4bad9ae4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -46,7 +46,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -93,7 +92,6 @@ import 
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -126,7 +124,6 @@ import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.StorageEngine;
@@ -138,8 +135,6 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
-import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import 
org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener;
@@ -176,7 +171,6 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.util.IgniteNameUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.apache.ignite.raft.jraft.util.Marshaller;
@@ -188,7 +182,6 @@ import org.jetbrains.annotations.TestOnly;
  * Table manager.
  */
 public class TableManager implements IgniteTablesInternal, IgniteComponent {
-    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = 
TimeUnit.SECONDS.toMillis(3);
 
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(TableManager.class);
@@ -323,8 +316,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Rebalance scheduler pool size. */
     private static final int REBALANCE_SCHEDULER_POOL_SIZE = 
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20);
 
-    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
-
     /** Meta storage listener for pending assignments. */
     private final WatchListener pendingAssignmentsRebalanceListener;
 
@@ -347,6 +338,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final SchemaVersions schemaVersions;
 
+    private final PartitionReplicatorNodeRecovery 
partitionReplicatorNodeRecovery;
+
     /** Versioned value used only at manager startup to correctly fire table 
creation events. */
     private final IncrementalVersionedValue<Void> startVv;
 
@@ -482,6 +475,13 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
 
+        partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
+                metaStorageMgr,
+                clusterService.messagingService(),
+                clusterNodeResolver,
+                tableId -> latestTablesById().get(tableId)
+        );
+
         startVv = new IncrementalVersionedValue<>(registry);
     }
 
@@ -520,7 +520,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 return onTableDelete(((DropTableEventParameters) 
parameters)).thenApply(unused -> false);
             });
 
-            addMessageHandler(clusterService.messagingService());
+            partitionReplicatorNodeRecovery.start();
         });
     }
 
@@ -559,43 +559,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         startVv.update(recoveryRevision, (v, e) -> 
pendingAssignmentsRecoveryFuture);
     }
 
-    /**
-     * Adds a table manager message handler.
-     *
-     * @param messagingService Messaging service.
-     */
-    private void addMessageHandler(MessagingService messagingService) {
-        messagingService.addMessageHandler(TableMessageGroup.class, (message, 
sender, correlationId) -> {
-            if (message instanceof HasDataRequest) {
-                // This message queries if a node has any data for a specific 
partition of a table
-                assert correlationId != null;
-
-                HasDataRequest msg = (HasDataRequest) message;
-
-                int tableId = msg.tableId();
-                int partitionId = msg.partitionId();
-
-                boolean contains = false;
-
-                TableImpl table = latestTablesById().get(tableId);
-
-                if (table != null) {
-                    MvTableStorage storage = table.internalTable().storage();
-
-                    MvPartitionStorage mvPartition = 
storage.getMvPartition(partitionId);
-
-                    // If node's recovery process is incomplete (no partition 
storage), then we consider this node's
-                    // partition storage empty.
-                    if (mvPartition != null) {
-                        contains = 
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
-                    }
-                }
-
-                messagingService.respond(sender, 
TABLE_MESSAGES_FACTORY.hasDataResponse().result(contains).build(), 
correlationId);
-            }
-        });
-    }
-
     private CompletableFuture<?> onTableCreate(CreateTableEventParameters 
parameters) {
         return createTableLocally(parameters.causalityToken(), 
parameters.catalogVersion(), parameters.tableDescriptor());
     }
@@ -726,38 +689,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (localMemberAssignment != null) {
-                    CompletableFuture<Boolean> shouldStartGroupFut;
-
-                    // If Raft is running in in-memory mode or the PDS has 
been cleared, we need to remove the current node
-                    // from the Raft group in order to avoid the double vote 
problem.
-                    // <MUTED> See 
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
-                    if (internalTbl.storage().isVolatile()) {
-                        shouldStartGroupFut = queryDataNodesCount(tableId, 
partId, newConfiguration.peers())
-                                .thenApply(dataNodesCount -> {
-                                    boolean fullPartitionRestart = 
dataNodesCount == 0;
-
-                                    if (fullPartitionRestart) {
-                                        return true;
-                                    }
-
-                                    boolean majorityAvailable = dataNodesCount 
>= (newConfiguration.peers().size() / 2) + 1;
-
-                                    if (majorityAvailable) {
-                                        
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, 
metaStorageMgr);
-
-                                        return false;
-                                    } else {
-                                        // No majority and not a full 
partition restart - need to restart nodes
-                                        // with current partition.
-                                        String msg = "Unable to start 
partition " + partId + ". Majority not available.";
-
-                                        throw new IgniteInternalException(msg);
-                                    }
-                                });
-                    } else {
-                        shouldStartGroupFut = completedFuture(true);
-                    }
+                    CompletableFuture<Boolean> shouldStartGroupFut = 
partitionReplicatorNodeRecovery.shouldStartGroup(
+                            replicaGrpId,
+                            internalTbl,
+                            newConfiguration,
+                            localMemberAssignment
+                    );
 
                     startGroupFut = 
shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
                         if (!startGroup) {
@@ -974,36 +911,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         return new PartitionKey(internalTbl.tableId(), partId);
     }
 
-    /**
-     * Calculates the quantity of the data nodes for the partition of the 
table.
-     *
-     * @param tblId Table id.
-     * @param partId Partition id.
-     * @param peers Raft peers.
-     * @return A future that will hold the quantity of data nodes.
-     */
-    private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, 
Collection<Peer> peers) {
-        HasDataRequest request = 
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
-
-        //noinspection unchecked
-        CompletableFuture<Boolean>[] requestFutures = peers.stream()
-                .map(Peer::consistentId)
-                .map(clusterNodeResolver)
-                .filter(Objects::nonNull)
-                .map(node -> clusterService.messagingService()
-                        .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
-                        .thenApply(response -> {
-                            assert response instanceof HasDataResponse : 
response;
-
-                            return ((HasDataResponse) response).result();
-                        })
-                        .exceptionally(unused -> false))
-                .toArray(CompletableFuture[]::new);
-
-        return allOf(requestFutures)
-                .thenApply(unused -> 
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
-    }
-
     private RaftGroupOptions groupOptionsForPartition(
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
@@ -1623,7 +1530,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param name Table name.
      * @return Future representing pending completion of the {@code 
TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+    private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
         return inBusyLockAsync(busyLock, () -> {
             HybridTimestamp now = clock.now();
 

Reply via email to