This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3321b4a317 IGNITE-20767 Move code related to recovering volatile RAFT
groups out of TableManager (#2773)
3321b4a317 is described below
commit 3321b4a317445f841795cf51653a598cb3bc95b2
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Nov 1 11:31:47 2023 +0400
IGNITE-20767 Move code related to recovering volatile RAFT groups out of
TableManager (#2773)
---
.../PartitionReplicatorNodeRecovery.java | 208 +++++++++++++++++++++
.../internal/table/distributed/TableManager.java | 127 ++-----------
2 files changed, 225 insertions(+), 110 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
new file mode 100644
index 0000000000..b432cdd4c4
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
+import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
+import org.apache.ignite.internal.utils.RebalanceUtil;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+
+/**
+ * Code specific to recovering a partition replicator group node. This
includes a case when we lost metadata
+ * that is required for the replication protocol (for instance, for RAFT it's
about group metadata).
+ */
+class PartitionReplicatorNodeRecovery {
+ private static final long QUERY_DATA_NODES_COUNT_TIMEOUT =
TimeUnit.SECONDS.toMillis(3);
+
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
+
+ private final MetaStorageManager metaStorageManager;
+
+ private final MessagingService messagingService;
+
+ /** Resolver that resolves a node consistent ID to cluster node. */
+ private final Function<String, ClusterNode> clusterNodeResolver;
+
+ /** Obtains a TableImpl instance by a table ID. */
+ private final IntFunction<TableImpl> tableById;
+
+ PartitionReplicatorNodeRecovery(
+ MetaStorageManager metaStorageManager,
+ MessagingService messagingService,
+ Function<String, ClusterNode> clusterNodeResolver,
+ IntFunction<TableImpl> tableById) {
+ this.metaStorageManager = metaStorageManager;
+ this.messagingService = messagingService;
+ this.clusterNodeResolver = clusterNodeResolver;
+ this.tableById = tableById;
+ }
+
+ /**
+ * Starts the component.
+ */
+ void start() {
+ addMessageHandler();
+ }
+
+ private void addMessageHandler() {
+ messagingService.addMessageHandler(TableMessageGroup.class, (message,
sender, correlationId) -> {
+ if (message instanceof HasDataRequest) {
+ // This message queries if a node has any data for a specific
partition of a table
+ assert correlationId != null;
+
+ HasDataRequest msg = (HasDataRequest) message;
+
+ int tableId = msg.tableId();
+ int partitionId = msg.partitionId();
+
+ boolean storageHasData = false;
+
+ TableImpl table = tableById.apply(tableId);
+
+ if (table != null) {
+ MvTableStorage storage = table.internalTable().storage();
+
+ MvPartitionStorage mvPartition =
storage.getMvPartition(partitionId);
+
+ // If node's recovery process is incomplete (no partition
storage), then we consider this node's
+ // partition storage empty.
+ if (mvPartition != null) {
+ storageHasData =
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
+ }
+ }
+
+ messagingService.respond(sender,
TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(),
correlationId);
+ }
+ });
+ }
+
+ /**
+ * Returns a future that completes with a decision: should we start the
corresponding group locally or not.
+ *
+ * @param tablePartitionId ID of the table partition.
+ * @param internalTable Table we are working with.
+ * @param newConfiguration New configuration that is going to be applied if we start the group.
+ * @param localMemberAssignment Assignment of this node in this group.
+ */
+ CompletableFuture<Boolean> shouldStartGroup(
+ TablePartitionId tablePartitionId,
+ InternalTable internalTable,
+ PeersAndLearners newConfiguration,
+ Assignment localMemberAssignment
+ ) {
+ // If Raft is running in in-memory mode or the PDS has been cleared,
we need to remove the current node
+ // from the Raft group in order to avoid the double vote problem.
+ if (mightNeedGroupRecovery(internalTable)) {
+ return performGroupRecovery(tablePartitionId, newConfiguration,
localMemberAssignment);
+ }
+
+ return completedFuture(true);
+ }
+
+ private static boolean mightNeedGroupRecovery(InternalTable internalTable)
{
+ // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for
details.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore
"|| !hasData"
+ return internalTable.storage().isVolatile();
+ }
+
+ private CompletableFuture<Boolean> performGroupRecovery(
+ TablePartitionId tablePartitionId,
+ PeersAndLearners newConfiguration,
+ Assignment localMemberAssignment
+ ) {
+ int tableId = tablePartitionId.tableId();
+ int partId = tablePartitionId.partitionId();
+
+ // Ask the peers how many of them have data for this partition: none means a full partition
+ // restart, a majority means we need to 'remove, then add' this node, otherwise we must fail.
+ return queryDataNodesCount(tableId, partId, newConfiguration.peers())
+ .thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart = dataNodesCount == 0;
+
+ if (fullPartitionRestart) {
+ return true;
+ }
+
+ boolean majorityAvailable = dataNodesCount >=
(newConfiguration.peers().size() / 2) + 1;
+
+ if (majorityAvailable) {
+ RebalanceUtil.startPeerRemoval(tablePartitionId,
localMemberAssignment, metaStorageManager);
+
+ return false;
+ } else {
+ // No majority and not a full partition restart - need
to restart nodes
+ // with current partition.
+ String msg = "Unable to start partition " + partId +
". Majority not available.";
+
+ throw new IgniteInternalException(msg);
+ }
+ });
+ }
+
+ /**
+ * Calculates the quantity of the data nodes for the partition of the
table.
+ *
+ * @param tblId Table id.
+ * @param partId Partition id.
+ * @param peers Raft peers.
+ * @return A future that will hold the quantity of data nodes.
+ */
+ private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId,
Collection<Peer> peers) {
+ HasDataRequest request =
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
+
+ //noinspection unchecked
+ CompletableFuture<Boolean>[] requestFutures = peers.stream()
+ .map(Peer::consistentId)
+ .map(clusterNodeResolver)
+ .filter(Objects::nonNull)
+ .map(node -> messagingService
+ .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
+ .thenApply(response -> {
+ assert response instanceof HasDataResponse :
response;
+
+ return ((HasDataResponse) response).result();
+ })
+ .exceptionally(unused -> false))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(requestFutures)
+ .thenApply(unused ->
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3bfceb5ab4..0a4bad9ae4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -46,7 +46,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -93,7 +92,6 @@ import
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -126,7 +124,6 @@ import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
@@ -138,8 +135,6 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
-import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
-import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEventsListener;
@@ -176,7 +171,6 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.util.IgniteNameUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.raft.jraft.util.Marshaller;
@@ -188,7 +182,6 @@ import org.jetbrains.annotations.TestOnly;
* Table manager.
*/
public class TableManager implements IgniteTablesInternal, IgniteComponent {
- private static final long QUERY_DATA_NODES_COUNT_TIMEOUT =
TimeUnit.SECONDS.toMillis(3);
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(TableManager.class);
@@ -323,8 +316,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
/** Rebalance scheduler pool size. */
private static final int REBALANCE_SCHEDULER_POOL_SIZE =
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20);
- private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
-
/** Meta storage listener for pending assignments. */
private final WatchListener pendingAssignmentsRebalanceListener;
@@ -347,6 +338,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final SchemaVersions schemaVersions;
+ private final PartitionReplicatorNodeRecovery
partitionReplicatorNodeRecovery;
+
/** Versioned value used only at manager startup to correctly fire table
creation events. */
private final IncrementalVersionedValue<Void> startVv;
@@ -482,6 +475,13 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
raftCommandsMarshaller = new
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
+ partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
+ metaStorageMgr,
+ clusterService.messagingService(),
+ clusterNodeResolver,
+ tableId -> latestTablesById().get(tableId)
+ );
+
startVv = new IncrementalVersionedValue<>(registry);
}
@@ -520,7 +520,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return onTableDelete(((DropTableEventParameters)
parameters)).thenApply(unused -> false);
});
- addMessageHandler(clusterService.messagingService());
+ partitionReplicatorNodeRecovery.start();
});
}
@@ -559,43 +559,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
startVv.update(recoveryRevision, (v, e) ->
pendingAssignmentsRecoveryFuture);
}
- /**
- * Adds a table manager message handler.
- *
- * @param messagingService Messaging service.
- */
- private void addMessageHandler(MessagingService messagingService) {
- messagingService.addMessageHandler(TableMessageGroup.class, (message,
sender, correlationId) -> {
- if (message instanceof HasDataRequest) {
- // This message queries if a node has any data for a specific
partition of a table
- assert correlationId != null;
-
- HasDataRequest msg = (HasDataRequest) message;
-
- int tableId = msg.tableId();
- int partitionId = msg.partitionId();
-
- boolean contains = false;
-
- TableImpl table = latestTablesById().get(tableId);
-
- if (table != null) {
- MvTableStorage storage = table.internalTable().storage();
-
- MvPartitionStorage mvPartition =
storage.getMvPartition(partitionId);
-
- // If node's recovery process is incomplete (no partition
storage), then we consider this node's
- // partition storage empty.
- if (mvPartition != null) {
- contains =
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
- }
- }
-
- messagingService.respond(sender,
TABLE_MESSAGES_FACTORY.hasDataResponse().result(contains).build(),
correlationId);
- }
- });
- }
-
private CompletableFuture<?> onTableCreate(CreateTableEventParameters
parameters) {
return createTableLocally(parameters.causalityToken(),
parameters.catalogVersion(), parameters.tableDescriptor());
}
@@ -726,38 +689,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (localMemberAssignment != null) {
- CompletableFuture<Boolean> shouldStartGroupFut;
-
- // If Raft is running in in-memory mode or the PDS has
been cleared, we need to remove the current node
- // from the Raft group in order to avoid the double vote
problem.
- // <MUTED> See
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
- if (internalTbl.storage().isVolatile()) {
- shouldStartGroupFut = queryDataNodesCount(tableId,
partId, newConfiguration.peers())
- .thenApply(dataNodesCount -> {
- boolean fullPartitionRestart =
dataNodesCount == 0;
-
- if (fullPartitionRestart) {
- return true;
- }
-
- boolean majorityAvailable = dataNodesCount
>= (newConfiguration.peers().size() / 2) + 1;
-
- if (majorityAvailable) {
-
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment,
metaStorageMgr);
-
- return false;
- } else {
- // No majority and not a full
partition restart - need to restart nodes
- // with current partition.
- String msg = "Unable to start
partition " + partId + ". Majority not available.";
-
- throw new IgniteInternalException(msg);
- }
- });
- } else {
- shouldStartGroupFut = completedFuture(true);
- }
+ CompletableFuture<Boolean> shouldStartGroupFut =
partitionReplicatorNodeRecovery.shouldStartGroup(
+ replicaGrpId,
+ internalTbl,
+ newConfiguration,
+ localMemberAssignment
+ );
startGroupFut =
shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
if (!startGroup) {
@@ -974,36 +911,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return new PartitionKey(internalTbl.tableId(), partId);
}
- /**
- * Calculates the quantity of the data nodes for the partition of the
table.
- *
- * @param tblId Table id.
- * @param partId Partition id.
- * @param peers Raft peers.
- * @return A future that will hold the quantity of data nodes.
- */
- private CompletableFuture<Long> queryDataNodesCount(int tblId, int partId,
Collection<Peer> peers) {
- HasDataRequest request =
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
-
- //noinspection unchecked
- CompletableFuture<Boolean>[] requestFutures = peers.stream()
- .map(Peer::consistentId)
- .map(clusterNodeResolver)
- .filter(Objects::nonNull)
- .map(node -> clusterService.messagingService()
- .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT)
- .thenApply(response -> {
- assert response instanceof HasDataResponse :
response;
-
- return ((HasDataResponse) response).result();
- })
- .exceptionally(unused -> false))
- .toArray(CompletableFuture[]::new);
-
- return allOf(requestFutures)
- .thenApply(unused ->
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
- }
-
private RaftGroupOptions groupOptionsForPartition(
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
@@ -1623,7 +1530,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param name Table name.
* @return Future representing pending completion of the {@code
TableManager#tableAsyncInternal} operation.
*/
- public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+ private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
return inBusyLockAsync(busyLock, () -> {
HybridTimestamp now = clock.now();