This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 3321b4a317 IGNITE-20767 Move code related to recovering volatile RAFT groups out of TableManager (#2773) 3321b4a317 is described below commit 3321b4a317445f841795cf51653a598cb3bc95b2 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Nov 1 11:31:47 2023 +0400 IGNITE-20767 Move code related to recovering volatile RAFT groups out of TableManager (#2773) --- .../PartitionReplicatorNodeRecovery.java | 208 +++++++++++++++++++++ .../internal/table/distributed/TableManager.java | 127 ++----------- 2 files changed, 225 insertions(+), 110 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java new file mode 100644 index 0000000000..b432cdd4c4 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.IntFunction; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.distributed.message.HasDataRequest; +import org.apache.ignite.internal.table.distributed.message.HasDataResponse; +import org.apache.ignite.internal.utils.RebalanceUtil; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.MessagingService; + +/** + * Code specific to recovering a partition replicator group node. This includes a case when we lost metadata + * that is required for the replication protocol (for instance, for RAFT it's about group metadata). + */ +class PartitionReplicatorNodeRecovery { + private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); + + private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory(); + + private final MetaStorageManager metaStorageManager; + + private final MessagingService messagingService; + + /** Resolver that resolves a node consistent ID to cluster node. */ + private final Function<String, ClusterNode> clusterNodeResolver; + + /** Obtains a TableImpl instance by a table ID. */ + private final IntFunction<TableImpl> tableById; + + PartitionReplicatorNodeRecovery( + MetaStorageManager metaStorageManager, + MessagingService messagingService, + Function<String, ClusterNode> clusterNodeResolver, + IntFunction<TableImpl> tableById) { + this.metaStorageManager = metaStorageManager; + this.messagingService = messagingService; + this.clusterNodeResolver = clusterNodeResolver; + this.tableById = tableById; + } + + /** + * Starts the component. + */ + void start() { + addMessageHandler(); + } + + private void addMessageHandler() { + messagingService.addMessageHandler(TableMessageGroup.class, (message, sender, correlationId) -> { + if (message instanceof HasDataRequest) { + // This message queries if a node has any data for a specific partition of a table + assert correlationId != null; + + HasDataRequest msg = (HasDataRequest) message; + + int tableId = msg.tableId(); + int partitionId = msg.partitionId(); + + boolean storageHasData = false; + + TableImpl table = tableById.apply(tableId); + + if (table != null) { + MvTableStorage storage = table.internalTable().storage(); + + MvPartitionStorage mvPartition = storage.getMvPartition(partitionId); + + // If node's recovery process is incomplete (no partition storage), then we consider this node's + // partition storage empty. + if (mvPartition != null) { + storageHasData = mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null; + } + } + + messagingService.respond(sender, TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(), correlationId); + } + }); + } + + /** + * Returns a future that completes with a decision: should we start the corresponding group locally or not. + * + * @param tablePartitionId ID of the table partition. + * @param internalTable Table we are working with. + * @param newConfiguration New configuration that is going to be applied if we'll start the group. + * @param localMemberAssignment Assignment of this node in this group. + */ + CompletableFuture<Boolean> shouldStartGroup( + TablePartitionId tablePartitionId, + InternalTable internalTable, + PeersAndLearners newConfiguration, + Assignment localMemberAssignment + ) { + // If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node + // from the Raft group in order to avoid the double vote problem. + if (mightNeedGroupRecovery(internalTable)) { + return performGroupRecovery(tablePartitionId, newConfiguration, localMemberAssignment); + } + + return completedFuture(true); + } + + private static boolean mightNeedGroupRecovery(InternalTable internalTable) { + // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for details. + // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData" + return internalTable.storage().isVolatile(); + } + + private CompletableFuture<Boolean> performGroupRecovery( + TablePartitionId tablePartitionId, + PeersAndLearners newConfiguration, + Assignment localMemberAssignment + ) { + int tableId = tablePartitionId.tableId(); + int partId = tablePartitionId.partitionId(); + + // No majority and not a full partition restart - need to 'remove, then add' nodes + // with current partition. + return queryDataNodesCount(tableId, partId, newConfiguration.peers()) + .thenApply(dataNodesCount -> { + boolean fullPartitionRestart = dataNodesCount == 0; + + if (fullPartitionRestart) { + return true; + } + + boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1; + + if (majorityAvailable) { + RebalanceUtil.startPeerRemoval(tablePartitionId, localMemberAssignment, metaStorageManager); + + return false; + } else { + // No majority and not a full partition restart - need to restart nodes + // with current partition. + String msg = "Unable to start partition " + partId + ". Majority not available."; + + throw new IgniteInternalException(msg); + } + }); + } + + /** + * Calculates the quantity of the data nodes for the partition of the table. + * + * @param tblId Table id. + * @param partId Partition id. + * @param peers Raft peers. + * @return A future that will hold the quantity of data nodes. + */ + private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, Collection<Peer> peers) { + HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build(); + + //noinspection unchecked + CompletableFuture<Boolean>[] requestFutures = peers.stream() + .map(Peer::consistentId) + .map(clusterNodeResolver) + .filter(Objects::nonNull) + .map(node -> messagingService + .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT) + .thenApply(response -> { + assert response instanceof HasDataResponse : response; + + return ((HasDataResponse) response).result(); + }) + .exceptionally(unused -> false)) + .toArray(CompletableFuture[]::new); + + return allOf(requestFutures) + .thenApply(unused -> Arrays.stream(requestFutures).filter(CompletableFuture::join).count()); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 3bfceb5ab4..0a4bad9ae4 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -93,7 +92,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -126,7 +124,6 @@ import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.engine.StorageEngine; @@ -138,8 +135,6 @@ import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.gc.MvGc; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; -import org.apache.ignite.internal.table.distributed.message.HasDataRequest; -import org.apache.ignite.internal.table.distributed.message.HasDataResponse; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener; @@ -176,7 +171,6 @@ import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.util.IgniteNameUtils; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.TopologyService; import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage; import org.apache.ignite.raft.jraft.util.Marshaller; @@ -188,7 +182,6 @@ import org.jetbrains.annotations.TestOnly; * Table manager. */ public class TableManager implements IgniteTablesInternal, IgniteComponent { - private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(TableManager.class); @@ -323,8 +316,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Rebalance scheduler pool size. */ private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Runtime.getRuntime().availableProcessors() * 3, 20); - private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory(); - /** Meta storage listener for pending assignments. */ private final WatchListener pendingAssignmentsRebalanceListener; @@ -347,6 +338,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final SchemaVersions schemaVersions; + private final PartitionReplicatorNodeRecovery partitionReplicatorNodeRecovery; + /** Versioned value used only at manager startup to correctly fire table creation events. */ private final IncrementalVersionedValue<Void> startVv; @@ -482,6 +475,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); + partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery( + metaStorageMgr, + clusterService.messagingService(), + clusterNodeResolver, + tableId -> latestTablesById().get(tableId) + ); + startVv = new IncrementalVersionedValue<>(registry); } @@ -520,7 +520,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return onTableDelete(((DropTableEventParameters) parameters)).thenApply(unused -> false); }); - addMessageHandler(clusterService.messagingService()); + partitionReplicatorNodeRecovery.start(); }); } @@ -559,43 +559,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { startVv.update(recoveryRevision, (v, e) -> pendingAssignmentsRecoveryFuture); } - /** - * Adds a table manager message handler. - * - * @param messagingService Messaging service. - */ - private void addMessageHandler(MessagingService messagingService) { - messagingService.addMessageHandler(TableMessageGroup.class, (message, sender, correlationId) -> { - if (message instanceof HasDataRequest) { - // This message queries if a node has any data for a specific partition of a table - assert correlationId != null; - - HasDataRequest msg = (HasDataRequest) message; - - int tableId = msg.tableId(); - int partitionId = msg.partitionId(); - - boolean contains = false; - - TableImpl table = latestTablesById().get(tableId); - - if (table != null) { - MvTableStorage storage = table.internalTable().storage(); - - MvPartitionStorage mvPartition = storage.getMvPartition(partitionId); - - // If node's recovery process is incomplete (no partition storage), then we consider this node's - // partition storage empty. - if (mvPartition != null) { - contains = mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null; - } - } - - messagingService.respond(sender, TABLE_MESSAGES_FACTORY.hasDataResponse().result(contains).build(), correlationId); - } - }); - } - private CompletableFuture<?> onTableCreate(CreateTableEventParameters parameters) { return createTableLocally(parameters.causalityToken(), parameters.catalogVersion(), parameters.tableDescriptor()); } @@ -726,38 +689,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // start new nodes, only if it is table creation, other cases will be covered by rebalance logic if (localMemberAssignment != null) { - CompletableFuture<Boolean> shouldStartGroupFut; - - // If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node - // from the Raft group in order to avoid the double vote problem. - // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for details. - // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData" - if (internalTbl.storage().isVolatile()) { - shouldStartGroupFut = queryDataNodesCount(tableId, partId, newConfiguration.peers()) - .thenApply(dataNodesCount -> { - boolean fullPartitionRestart = dataNodesCount == 0; - - if (fullPartitionRestart) { - return true; - } - - boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1; - - if (majorityAvailable) { - RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, metaStorageMgr); - - return false; - } else { - // No majority and not a full partition restart - need to restart nodes - // with current partition. - String msg = "Unable to start partition " + partId + ". Majority not available."; - - throw new IgniteInternalException(msg); - } - }); - } else { - shouldStartGroupFut = completedFuture(true); - } + CompletableFuture<Boolean> shouldStartGroupFut = partitionReplicatorNodeRecovery.shouldStartGroup( + replicaGrpId, + internalTbl, + newConfiguration, + localMemberAssignment + ); startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> { if (!startGroup) { @@ -974,36 +911,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return new PartitionKey(internalTbl.tableId(), partId); } - /** - * Calculates the quantity of the data nodes for the partition of the table. - * - * @param tblId Table id. - * @param partId Partition id. - * @param peers Raft peers. - * @return A future that will hold the quantity of data nodes. - */ - private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId, Collection<Peer> peers) { - HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build(); - - //noinspection unchecked - CompletableFuture<Boolean>[] requestFutures = peers.stream() - .map(Peer::consistentId) - .map(clusterNodeResolver) - .filter(Objects::nonNull) - .map(node -> clusterService.messagingService() - .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT) - .thenApply(response -> { - assert response instanceof HasDataResponse : response; - - return ((HasDataResponse) response).result(); - }) - .exceptionally(unused -> false)) - .toArray(CompletableFuture[]::new); - - return allOf(requestFutures) - .thenApply(unused -> Arrays.stream(requestFutures).filter(CompletableFuture::join).count()); - } - private RaftGroupOptions groupOptionsForPartition( MvTableStorage mvTableStorage, TxStateTableStorage txStateTableStorage, @@ -1623,7 +1530,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param name Table name. * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(String name) { + private CompletableFuture<TableImpl> tableAsyncInternal(String name) { return inBusyLockAsync(busyLock, () -> { HybridTimestamp now = clock.now();