This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7d89cdbcdda IGNITE-24517: Persist Raft meta for empty zones (#5237)
7d89cdbcdda is described below
commit 7d89cdbcddab4262e6f163968c09d49caea9c25f
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri Feb 28 08:30:47 2025 +0200
IGNITE-24517: Persist Raft meta for empty zones (#5237)
---
.../replicator/raft/OnSnapshotSaveHandler.java | 49 ++-------
.../replicator/raft/RaftTableProcessor.java | 16 +++
.../replicator/raft/ZonePartitionRaftListener.java | 116 ++++++++++++++-------
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 60 +++++++----
.../raft/ZonePartitionRaftListenerTest.java | 104 +++++++++++++++++-
.../OutgoingSnapshotMvDataStreamingTest.java | 11 +-
.../outgoing/OutgoingSnapshotReaderTest.java | 7 +-
.../table/distributed/raft/PartitionListener.java | 55 +++++++++-
8 files changed, 305 insertions(+), 113 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
index b383e2894e8..6962bc9c7ae 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.partition.replicator.raft;
-import static java.lang.Math.max;
import static java.util.concurrent.CompletableFuture.allOf;
import java.util.Collection;
@@ -25,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.TrackerClosedException;
/**
* Handler for the {@link RaftGroupListener#onSnapshotSave} event.
@@ -34,20 +31,18 @@ import
org.apache.ignite.internal.util.TrackerClosedException;
public class OnSnapshotSaveHandler {
private final TxStatePartitionStorage txStatePartitionStorage;
- private final PendingComparableValuesTracker<Long, Void>
storageIndexTracker;
-
- public OnSnapshotSaveHandler(
- TxStatePartitionStorage txStatePartitionStorage,
- PendingComparableValuesTracker<Long, Void> storageIndexTracker
- ) {
+ public OnSnapshotSaveHandler(TxStatePartitionStorage
txStatePartitionStorage) {
this.txStatePartitionStorage = txStatePartitionStorage;
- this.storageIndexTracker = storageIndexTracker;
}
/**
* Called when {@link RaftGroupListener#onSnapshotSave} is triggered.
*/
- public CompletableFuture<Void>
onSnapshotSave(Collection<RaftTableProcessor> tableProcessors) {
+ public CompletableFuture<Void> onSnapshotSave(
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ Collection<RaftTableProcessor> tableProcessors
+ ) {
// The max index here is required for local recovery and a possible
scenario
// of false node failure when we actually have all required data. This
might happen because we use the minimal index
// among storages on a node restart.
@@ -60,26 +55,9 @@ public class OnSnapshotSaveHandler {
// 4) When we try to restore data starting from the minimal
lastAppliedIndex, we come to the situation
// that a raft node doesn't have such data, because the
truncation until the maximal lastAppliedIndex from 1) has happened.
// 5) Node cannot finish local recovery.
+ tableProcessors.forEach(processor ->
processor.lastApplied(lastAppliedIndex, lastAppliedTerm));
- long maxPartitionLastAppliedIndex = tableProcessors.stream()
- .mapToLong(RaftTableProcessor::lastAppliedIndex)
- .max()
- .orElse(0);
-
- long maxPartitionLastAppliedTerm = tableProcessors.stream()
- .mapToLong(RaftTableProcessor::lastAppliedTerm)
- .max()
- .orElse(0);
-
- long maxLastAppliedIndex = max(maxPartitionLastAppliedIndex,
txStatePartitionStorage.lastAppliedIndex());
-
- long maxLastAppliedTerm = max(maxPartitionLastAppliedTerm,
txStatePartitionStorage.lastAppliedTerm());
-
- tableProcessors.forEach(processor ->
processor.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm));
-
- txStatePartitionStorage.lastApplied(maxLastAppliedIndex,
maxLastAppliedTerm);
-
- updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
maxLastAppliedIndex);
+ txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
Stream<CompletableFuture<?>> flushFutures = Stream.concat(
tableProcessors.stream().map(RaftTableProcessor::flushStorage),
@@ -88,15 +66,4 @@ public class OnSnapshotSaveHandler {
return allOf(flushFutures.toArray(CompletableFuture[]::new));
}
-
- private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
- PendingComparableValuesTracker<T, Void> tracker,
- T newValue
- ) {
- try {
- tracker.update(newValue, null);
- } catch (TrackerClosedException ignored) {
- // No-op.
- }
- }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
index 4f2f20844cf..5f4544966b0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.jetbrains.annotations.Nullable;
/**
@@ -54,6 +55,21 @@ public interface RaftTableProcessor {
long lastAppliedTerm
);
+ /**
+ * Sets the initial state of this processor when it gets added to a zone
partition.
+ *
+ * @param config Initial Raft configuration.
+ * @param leaseInfo Initial lease information.
+ * @param lastAppliedIndex Current last applied index.
+ * @param lastAppliedTerm Current last applied term.
+ */
+ void initialize(
+ @Nullable RaftGroupConfiguration config,
+ @Nullable LeaseInfo leaseInfo,
+ long lastAppliedIndex,
+ long lastAppliedTerm
+ );
+
/**
* Returns the last applied Raft log index.
*/
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 116003e8e78..5e0cc97e72d 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.raft;
+import static java.lang.Math.max;
import static
org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_COMMAND;
import java.io.Serializable;
@@ -44,6 +45,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.Pa
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -52,6 +54,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -73,24 +76,35 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
/** Mapping table ID to table request processor. */
private final Map<Integer, RaftTableProcessor> tableProcessors = new
ConcurrentHashMap<>();
+ private final TxStatePartitionStorage txStateStorage;
+
private final PartitionsSnapshots partitionsSnapshots;
private final PartitionKey partitionKey;
/**
- * Latest committed configuration of the zone-wide Raft group.
+ * Last applied index across all table processors and {@link
#txStateStorage}.
+ *
+ * <p>Multi-threaded access is guarded by {@link
#tableProcessorsStateLock}.
+ */
+ private long lastAppliedIndex;
+
+ /**
+ * Last applied term across all table processors and {@link
#txStateStorage}.
*
- * <p>Multi-threaded access is guarded by {@link
#commitedConfigurationLock}.
+ * <p>Multi-threaded access is guarded by {@link
#tableProcessorsStateLock}.
*/
- private CommittedConfiguration currentCommitedConfiguration;
+ private long lastAppliedTerm;
- private final Object commitedConfigurationLock = new Object();
+ private final Object tableProcessorsStateLock = new Object();
private final OnSnapshotSaveHandler onSnapshotSaveHandler;
// Raft command handlers.
private final CommandHandlers commandHandlers;
+ private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+
/** Constructor. */
public ZonePartitionRaftListener(
ZonePartitionId zonePartitionId,
@@ -103,9 +117,10 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
this.safeTimeTracker = safeTimeTracker;
this.storageIndexTracker = storageIndexTracker;
this.partitionsSnapshots = partitionsSnapshots;
+ this.txStateStorage = txStatePartitionStorage;
this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(),
zonePartitionId.partitionId());
- onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+ onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage);
// RAFT command handlers initialization.
this.commandHandlers = new CommandHandlers.Builder()
@@ -174,10 +189,27 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
partitionSnapshots().acquireReadLock();
try {
- boolean crossTableCommand = command instanceof
PrimaryReplicaChangeCommand
- || command instanceof
UpdateMinimumActiveTxBeginTimeCommand;
+ boolean crossTableCommand = command instanceof
UpdateMinimumActiveTxBeginTimeCommand;
+
+ if (command instanceof PrimaryReplicaChangeCommand) {
+ // This is a hack for tests, this command is not issued in
production because no zone-wide placement driver exists yet.
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-24374
+ result = processCrossTableProcessorsCommand(command,
commandIndex, commandTerm, safeTimestamp);
- if (command instanceof TableAwareCommand) {
+ if (commandIndex > txStateStorage.lastAppliedIndex()) {
+ var primaryReplicaChangeCommand =
(PrimaryReplicaChangeCommand) command;
+
+ var leaseInfo = new LeaseInfo(
+ primaryReplicaChangeCommand.leaseStartTime(),
+ primaryReplicaChangeCommand.primaryReplicaNodeId(),
+
primaryReplicaChangeCommand.primaryReplicaNodeName()
+ );
+
+ txStateStorage.leaseInfo(leaseInfo, commandIndex,
commandTerm);
+
+ result = new IgniteBiTuple<>(null, true);
+ }
+ } else if (command instanceof TableAwareCommand) {
result = processTableAwareCommand(
((TableAwareCommand) command).tableId(),
command,
@@ -186,9 +218,6 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
safeTimestamp
);
} else if (crossTableCommand) {
- // PrimaryReplicaChangeCommand
- // This is a hack for tests, this command is not issued in
production because no zone-wide placement driver exists yet.
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-24374
result = processCrossTableProcessorsCommand(command,
commandIndex, commandTerm, safeTimestamp);
} else {
AbstractCommandHandler<?> commandHandler =
@@ -211,6 +240,11 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, clo.index());
}
+
+ synchronized (tableProcessorsStateLock) {
+ lastAppliedIndex = max(lastAppliedIndex, commandIndex);
+ lastAppliedTerm = max(lastAppliedTerm, commandTerm);
+ }
} finally {
partitionSnapshots().releaseReadLock();
}
@@ -270,22 +304,40 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
@Override
public void onConfigurationCommitted(RaftGroupConfiguration config, long
lastAppliedIndex, long lastAppliedTerm) {
- synchronized (commitedConfigurationLock) {
- currentCommitedConfiguration = new CommittedConfiguration(config,
lastAppliedIndex, lastAppliedTerm);
-
+ synchronized (tableProcessorsStateLock) {
tableProcessors.values()
.forEach(listener ->
listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm));
+
+ byte[] configBytes =
raftGroupConfigurationConverter.toBytes(config);
+
+ txStateStorage.committedGroupConfiguration(configBytes,
lastAppliedIndex, lastAppliedTerm);
+
+ this.lastAppliedIndex = max(this.lastAppliedIndex,
lastAppliedIndex);
+ this.lastAppliedTerm = max(this.lastAppliedTerm, lastAppliedTerm);
}
}
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
- onSnapshotSaveHandler.onSnapshotSave(tableProcessors.values())
+ long lastAppliedIndex;
+ long lastAppliedTerm;
+
+ synchronized (tableProcessorsStateLock) {
+ lastAppliedIndex = this.lastAppliedIndex;
+ lastAppliedTerm = this.lastAppliedTerm;
+ }
+
+ onSnapshotSaveHandler.onSnapshotSave(lastAppliedIndex,
lastAppliedTerm, tableProcessors.values())
.whenComplete((unused, throwable) ->
doneClo.accept(throwable));
}
@Override
public boolean onSnapshotLoad(Path path) {
+ synchronized (tableProcessorsStateLock) {
+ lastAppliedIndex = max(lastAppliedIndex,
txStateStorage.lastAppliedIndex());
+ lastAppliedTerm = max(lastAppliedTerm,
txStateStorage.lastAppliedTerm());
+ }
+
return true;
}
@@ -298,16 +350,20 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
/**
* Adds a given Table Partition-level Raft processor to the set of managed
processors.
+ *
+ * <p>Callers of this method must ensure that no commands are issued to
this processor before this method returns.
+ *
+ * <p>During table creation this is achieved by executing this method
while processing a Catalog event, which blocks bumping the Catalog
+ * version. Until the Catalog version is updated, commands targeting the
table being added will be rejected by an interceptor that
+ * requires the Catalog version to be equal to a particular value.
*/
public void addTableProcessor(TablePartitionId tablePartitionId,
RaftTableProcessor processor) {
- synchronized (commitedConfigurationLock) {
- if (currentCommitedConfiguration != null) {
- processor.onConfigurationCommitted(
- currentCommitedConfiguration.configuration,
- currentCommitedConfiguration.lastAppliedIndex,
- currentCommitedConfiguration.lastAppliedTerm
- );
- }
+ synchronized (tableProcessorsStateLock) {
+ RaftGroupConfiguration configuration =
raftGroupConfigurationConverter.fromBytes(txStateStorage.committedGroupConfiguration());
+
+ LeaseInfo leaseInfo = txStateStorage.leaseInfo();
+
+ processor.initialize(configuration, leaseInfo, lastAppliedIndex,
lastAppliedTerm);
RaftTableProcessor prev =
tableProcessors.put(tablePartitionId.tableId(), processor);
@@ -333,18 +389,4 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
private PartitionSnapshots partitionSnapshots() {
return partitionsSnapshots.partitionSnapshots(partitionKey);
}
-
- private static class CommittedConfiguration {
- final RaftGroupConfiguration configuration;
-
- final long lastAppliedIndex;
-
- final long lastAppliedTerm;
-
- CommittedConfiguration(RaftGroupConfiguration configuration, long
lastAppliedIndex, long lastAppliedTerm) {
- this.configuration = configuration;
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- }
- }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index f9fc8b9334f..f87aa24aa4c 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
-import static java.lang.Math.max;
import static java.util.Comparator.comparingInt;
import static java.util.Comparator.comparingLong;
import static java.util.stream.Collectors.toList;
@@ -62,6 +61,7 @@ import
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxMetaMessage;
@@ -201,32 +201,50 @@ public class OutgoingSnapshot {
}
private PartitionSnapshotMeta takeSnapshotMeta(int catalogVersion,
Collection<PartitionMvStorageAccess> partitionStorages) {
- // TODO: partitionsByTableId will be empty for zones without tables,
need another way to get meta in that case,
- // see https://issues.apache.org/jira/browse/IGNITE-24517
- PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex =
partitionStorages.stream()
- .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
- .orElseThrow();
-
- RaftGroupConfiguration config =
partitionStorageWithMaxAppliedIndex.committedGroupConfiguration();
-
- assert config != null : "Configuration should never be null when
installing a snapshot";
-
Map<Integer, UUID> nextRowIdToBuildByIndexId =
collectNextRowIdToBuildIndexes(
catalogService,
partitionStorages,
catalogVersion
);
- return snapshotMetaAt(
- max(partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
txState.lastAppliedIndex()),
- max(partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
txState.lastAppliedTerm()),
- config,
- catalogVersion,
- nextRowIdToBuildByIndexId,
- partitionStorageWithMaxAppliedIndex.leaseStartTime(),
- partitionStorageWithMaxAppliedIndex.primaryReplicaNodeId(),
- partitionStorageWithMaxAppliedIndex.primaryReplicaNodeName()
- );
+ PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex =
partitionStorages.stream()
+ .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
+ .orElse(null);
+
+ if (partitionStorageWithMaxAppliedIndex == null
+ || txState.lastAppliedIndex() >
partitionStorageWithMaxAppliedIndex.lastAppliedIndex()) {
+ RaftGroupConfiguration config =
txState.committedGroupConfiguration();
+
+ assert config != null : "Configuration should never be null when
installing a snapshot";
+
+ LeaseInfo leaseInfo = txState.leaseInfo();
+
+ return snapshotMetaAt(
+ txState.lastAppliedIndex(),
+ txState.lastAppliedTerm(),
+ config,
+ catalogVersion,
+ nextRowIdToBuildByIndexId,
+ leaseInfo == null ? 0 : leaseInfo.leaseStartTime(),
+ leaseInfo == null ? null :
leaseInfo.primaryReplicaNodeId(),
+ leaseInfo == null ? null :
leaseInfo.primaryReplicaNodeName()
+ );
+ } else {
+ RaftGroupConfiguration config =
partitionStorageWithMaxAppliedIndex.committedGroupConfiguration();
+
+ assert config != null : "Configuration should never be null when
installing a snapshot";
+
+ return snapshotMetaAt(
+ partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
+ partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
+ config,
+ catalogVersion,
+ nextRowIdToBuildByIndexId,
+ partitionStorageWithMaxAppliedIndex.leaseStartTime(),
+ partitionStorageWithMaxAppliedIndex.primaryReplicaNodeId(),
+
partitionStorageWithMaxAppliedIndex.primaryReplicaNodeName()
+ );
+ }
}
private List<PartitionMvStorageAccess> freezePartitionStorages(int
catalogVersion) {
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 205eac57627..1c58f526041 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -17,19 +17,34 @@
package org.apache.ignite.internal.partition.replicator.raft;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -38,12 +53,15 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwareP
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@@ -56,14 +74,17 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
private ZonePartitionRaftListener listener;
- @Mock
+ @Mock(answer = RETURNS_DEEP_STUBS)
private OutgoingSnapshotsManager outgoingSnapshotsManager;
@Mock
private TxManager txManager;
+ @Spy
+ private TxStatePartitionStorage txStatePartitionStorage = new
TestTxStatePartitionStorage();
+
@Mock
- private TxStatePartitionStorage txStatePartitionStorage;
+ private MvPartitionStorage mvPartitionStorage;
@BeforeEach
void setUp() {
@@ -77,6 +98,11 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
);
}
+ @AfterEach
+ void tearDown() {
+ txStatePartitionStorage.close();
+ }
+
@Test
void closesOngoingSnapshots(@Mock PartitionSnapshots partitionSnapshots) {
var tablePartition1 = new TablePartitionId(1, PARTITION_ID);
@@ -90,12 +116,84 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
verify(outgoingSnapshotsManager).cleanupOutgoingSnapshots(ZONE_PARTITION_KEY);
}
+ @Test
+ void savesIndexAndTermOnSnapshotSave(
+ @Mock CommandClosure<WriteCommand> writeCommandClosure,
+ @Mock UpdateMinimumActiveTxBeginTimeCommand command
+ ) {
+ when(writeCommandClosure.command()).thenReturn(command);
+ when(writeCommandClosure.index()).thenReturn(25L);
+ when(writeCommandClosure.term()).thenReturn(42L);
+
+ listener.onWrite(List.of(writeCommandClosure).iterator());
+
+ var future = new CompletableFuture<Void>();
+
+ listener.onSnapshotSave(Path.of("foo"), throwable -> {
+ if (throwable == null) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(throwable);
+ }
+ });
+
+ assertThat(future, willCompleteSuccessfully());
+
+ verify(txStatePartitionStorage).lastApplied(25L, 42L);
+ }
+
+ @Test
+ void loadsIndexAndTermOnSnapshotLoad() {
+ listener.onSnapshotLoad(Path.of("foo"));
+
+ verify(txStatePartitionStorage).lastAppliedIndex();
+ verify(txStatePartitionStorage).lastAppliedTerm();
+ }
+
+ @Test
+ void propagatesRaftMetaToPartitionListeners(
+ @Mock CommandClosure<WriteCommand> writeCommandClosure,
+ @Mock PrimaryReplicaChangeCommand command,
+ @Mock RaftGroupConfiguration raftGroupConfiguration,
+ @Mock Locker locker
+ ) {
+ when(writeCommandClosure.command()).thenReturn(command);
+ when(writeCommandClosure.index()).thenReturn(1L);
+ when(writeCommandClosure.term()).thenReturn(1L);
+
+ UUID id = UUID.randomUUID();
+
+ when(command.leaseStartTime()).thenReturn(123L);
+ when(command.primaryReplicaNodeId()).thenReturn(id);
+ when(command.primaryReplicaNodeName()).thenReturn("foo");
+
+ when(mvPartitionStorage.runConsistently(any())).thenAnswer(invocation
-> {
+ WriteClosure<?> closure = invocation.getArgument(0);
+
+ return closure.execute(locker);
+ });
+
+ listener.onWrite(List.of(writeCommandClosure).iterator());
+
+ listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
+
+ var tablePartitionId = new TablePartitionId(1, PARTITION_ID);
+
+ PartitionListener partitionListener =
partitionListener(tablePartitionId);
+
+ listener.addTableProcessor(tablePartitionId, partitionListener);
+
+ verify(mvPartitionStorage).lastApplied(2L, 3L);
+ verify(mvPartitionStorage).committedGroupConfiguration(any());
+ verify(mvPartitionStorage).updateLease(123L, id, "foo");
+ }
+
private PartitionListener partitionListener(TablePartitionId
tablePartitionId) {
return new PartitionListener(
txManager,
new SnapshotAwarePartitionDataStorage(
tablePartitionId.tableId(),
- mock(MvPartitionStorage.class),
+ mvPartitionStorage,
outgoingSnapshotsManager,
ZONE_PARTITION_KEY
),
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 4f4ce455d19..d750fb1e16f 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -155,14 +154,18 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
rowIdOutOfOrder = id;
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24517")
@Test
- void sendsEmptyResponseForEmptyMvData() {
+ void sendsEmptyResponseForEmptyMvData(
+ @Mock PartitionTxStateAccess txStateAccess,
+ @Mock RaftGroupConfiguration raftGroupConfiguration
+ ) {
+
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
+
snapshot = new OutgoingSnapshot(
UUID.randomUUID(),
partitionKey,
Int2ObjectMaps.emptyMap(),
- mock(PartitionTxStateAccess.class),
+ txStateAccess,
catalogService
);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 154ec245855..cc0792be325 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -71,7 +71,8 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
-
when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
+
+
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
doAnswer(invocation -> {
OutgoingSnapshot snapshot = invocation.getArgument(1);
@@ -105,12 +106,12 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
when(txStateAccess.lastAppliedTerm()).thenReturn(1L);
lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
- when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
+ lenient().when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
try (var reader = new OutgoingSnapshotReader(snapshotStorage)) {
SnapshotMeta meta = reader.load();
assertEquals(10L, meta.lastIncludedIndex());
- assertEquals(3L, meta.lastIncludedTerm());
+ assertEquals(1L, meta.lastIncludedTerm());
}
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 577d486aa31..80102b5c2bf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
@@ -80,6 +81,7 @@ import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
@@ -168,7 +170,7 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
this.indexMetaStorage = indexMetaStorage;
this.localNodeId = localNodeId;
- onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+ onSnapshotSaveHandler = new
OnSnapshotSaveHandler(txStatePartitionStorage);
// RAFT command handlers initialization.
TablePartitionId tablePartitionId = new
TablePartitionId(storage.tableId(), storage.partitionId());
@@ -302,6 +304,38 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
return result;
}
+ @Override
+ public void initialize(
+ @Nullable RaftGroupConfiguration config,
+ @Nullable LeaseInfo leaseInfo,
+ long lastAppliedIndex,
+ long lastAppliedTerm
+ ) {
+ if (lastAppliedIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
+ storage.runConsistently(locker -> {
+ if (config != null) {
+ setCurrentGroupTopology(config);
+
+ storage.committedGroupConfiguration(config);
+ }
+
+ if (leaseInfo != null) {
+ storage.updateLease(leaseInfo.leaseStartTime(),
leaseInfo.primaryReplicaNodeId(), leaseInfo.primaryReplicaNodeName());
+ }
+
+ storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+
+ return null;
+ });
+
+ // Initiate a flush but do not wait for it. This is needed to save the
initialization information as soon as possible (to make
+ // recovery more efficient), without blocking the caller thread.
+ storage.flush();
+ }
+
/**
* Handler for the {@link UpdateCommand}.
*
@@ -488,8 +522,7 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
long lastAppliedIndex,
long lastAppliedTerm
) {
- currentGroupTopology = new HashSet<>(config.peers());
- currentGroupTopology.addAll(config.learners());
+ setCurrentGroupTopology(config);
// Skips the update because the storage has already recorded it.
if (config.index() <= storage.lastAppliedIndex()) {
@@ -514,9 +547,19 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
}
}
+ private void setCurrentGroupTopology(RaftGroupConfiguration config) {
+ var currentGroupTopology = new HashSet<>(config.peers());
+ currentGroupTopology.addAll(config.learners());
+
+ this.currentGroupTopology = currentGroupTopology;
+ }
+
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
- onSnapshotSaveHandler.onSnapshotSave(List.of(this))
+ long maxAppliedIndex = max(storage.lastAppliedIndex(),
txStatePartitionStorage.lastAppliedIndex());
+ long maxAppliedTerm = max(storage.lastAppliedTerm(),
txStatePartitionStorage.lastAppliedTerm());
+
+ onSnapshotSaveHandler.onSnapshotSave(maxAppliedIndex, maxAppliedTerm,
List.of(this))
.whenComplete((unused, throwable) ->
doneClo.accept(throwable));
}
@@ -542,6 +585,10 @@ public class PartitionListener implements
RaftGroupListener, RaftTableProcessor
@Override
public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
+ if (lastAppliedIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
storage.runConsistently(locker -> {
storage.lastApplied(lastAppliedIndex, lastAppliedTerm);