This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d89cdbcdda IGNITE-24517: Persist Raft meta for empty zones (#5237)
7d89cdbcdda is described below

commit 7d89cdbcddab4262e6f163968c09d49caea9c25f
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri Feb 28 08:30:47 2025 +0200

    IGNITE-24517: Persist Raft meta for empty zones (#5237)
---
 .../replicator/raft/OnSnapshotSaveHandler.java     |  49 ++-------
 .../replicator/raft/RaftTableProcessor.java        |  16 +++
 .../replicator/raft/ZonePartitionRaftListener.java | 116 ++++++++++++++-------
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |  60 +++++++----
 .../raft/ZonePartitionRaftListenerTest.java        | 104 +++++++++++++++++-
 .../OutgoingSnapshotMvDataStreamingTest.java       |  11 +-
 .../outgoing/OutgoingSnapshotReaderTest.java       |   7 +-
 .../table/distributed/raft/PartitionListener.java  |  55 +++++++++-
 8 files changed, 305 insertions(+), 113 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
index b383e2894e8..6962bc9c7ae 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.partition.replicator.raft;
 
-import static java.lang.Math.max;
 import static java.util.concurrent.CompletableFuture.allOf;
 
 import java.util.Collection;
@@ -25,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.util.TrackerClosedException;
 
 /**
  * Handler for the {@link RaftGroupListener#onSnapshotSave} event.
@@ -34,20 +31,18 @@ import 
org.apache.ignite.internal.util.TrackerClosedException;
 public class OnSnapshotSaveHandler {
     private final TxStatePartitionStorage txStatePartitionStorage;
 
-    private final PendingComparableValuesTracker<Long, Void> 
storageIndexTracker;
-
-    public OnSnapshotSaveHandler(
-            TxStatePartitionStorage txStatePartitionStorage,
-            PendingComparableValuesTracker<Long, Void> storageIndexTracker
-    ) {
+    public OnSnapshotSaveHandler(TxStatePartitionStorage 
txStatePartitionStorage) {
         this.txStatePartitionStorage = txStatePartitionStorage;
-        this.storageIndexTracker = storageIndexTracker;
     }
 
     /**
      * Called when {@link RaftGroupListener#onSnapshotSave} is triggered.
      */
-    public CompletableFuture<Void> 
onSnapshotSave(Collection<RaftTableProcessor> tableProcessors) {
+    public CompletableFuture<Void> onSnapshotSave(
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            Collection<RaftTableProcessor> tableProcessors
+    ) {
         // The max index here is required for local recovery and a possible 
scenario
         // of false node failure when we actually have all required data. This 
might happen because we use the minimal index
         // among storages on a node restart.
@@ -60,26 +55,9 @@ public class OnSnapshotSaveHandler {
         //      4) When we try to restore data starting from the minimal 
lastAppliedIndex, we come to the situation
         //         that a raft node doesn't have such data, because the 
truncation until the maximal lastAppliedIndex from 1) has happened.
         //      5) Node cannot finish local recovery.
+        tableProcessors.forEach(processor -> 
processor.lastApplied(lastAppliedIndex, lastAppliedTerm));
 
-        long maxPartitionLastAppliedIndex = tableProcessors.stream()
-                .mapToLong(RaftTableProcessor::lastAppliedIndex)
-                .max()
-                .orElse(0);
-
-        long maxPartitionLastAppliedTerm = tableProcessors.stream()
-                .mapToLong(RaftTableProcessor::lastAppliedTerm)
-                .max()
-                .orElse(0);
-
-        long maxLastAppliedIndex = max(maxPartitionLastAppliedIndex, 
txStatePartitionStorage.lastAppliedIndex());
-
-        long maxLastAppliedTerm = max(maxPartitionLastAppliedTerm, 
txStatePartitionStorage.lastAppliedTerm());
-
-        tableProcessors.forEach(processor -> 
processor.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm));
-
-        txStatePartitionStorage.lastApplied(maxLastAppliedIndex, 
maxLastAppliedTerm);
-
-        updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
maxLastAppliedIndex);
+        txStatePartitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
 
         Stream<CompletableFuture<?>> flushFutures = Stream.concat(
                 tableProcessors.stream().map(RaftTableProcessor::flushStorage),
@@ -88,15 +66,4 @@ public class OnSnapshotSaveHandler {
 
         return allOf(flushFutures.toArray(CompletableFuture[]::new));
     }
-
-    private static <T extends Comparable<T>> void 
updateTrackerIgnoringTrackerClosedException(
-            PendingComparableValuesTracker<T, Void> tracker,
-            T newValue
-    ) {
-        try {
-            tracker.update(newValue, null);
-        } catch (TrackerClosedException ignored) {
-            // No-op.
-        }
-    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
index 4f2f20844cf..5f4544966b0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -54,6 +55,21 @@ public interface RaftTableProcessor {
             long lastAppliedTerm
     );
 
+    /**
+     * Sets the initial state of this processor when it gets added to a zone 
partition.
+     *
+     * @param config Initial Raft configuration.
+     * @param leaseInfo Initial lease information.
+     * @param lastAppliedIndex Current last applied index.
+     * @param lastAppliedTerm Current last applied term.
+     */
+    void initialize(
+            @Nullable RaftGroupConfiguration config,
+            @Nullable LeaseInfo leaseInfo,
+            long lastAppliedIndex,
+            long lastAppliedTerm
+    );
+
     /**
      * Returns the last applied Raft log index.
      */
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 116003e8e78..5e0cc97e72d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.raft;
 
+import static java.lang.Math.max;
 import static 
org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_COMMAND;
 
 import java.io.Serializable;
@@ -44,6 +45,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.Pa
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
 import org.apache.ignite.internal.raft.ReadCommand;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -52,6 +54,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
@@ -73,24 +76,35 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
     /** Mapping table ID to table request processor. */
     private final Map<Integer, RaftTableProcessor> tableProcessors = new 
ConcurrentHashMap<>();
 
+    private final TxStatePartitionStorage txStateStorage;
+
     private final PartitionsSnapshots partitionsSnapshots;
 
     private final PartitionKey partitionKey;
 
     /**
-     * Latest committed configuration of the zone-wide Raft group.
+     * Last applied index across all table processors and {@link 
#txStateStorage}.
+     *
+     * <p>Multi-threaded access is guarded by {@link 
#tableProcessorsStateLock}.
+     */
+    private long lastAppliedIndex;
+
+    /**
+     * Last applied term across all table processors and {@link 
#txStateStorage}.
      *
-     * <p>Multi-threaded access is guarded by {@link 
#commitedConfigurationLock}.
+     * <p>Multi-threaded access is guarded by {@link 
#tableProcessorsStateLock}.
      */
-    private CommittedConfiguration currentCommitedConfiguration;
+    private long lastAppliedTerm;
 
-    private final Object commitedConfigurationLock = new Object();
+    private final Object tableProcessorsStateLock = new Object();
 
     private final OnSnapshotSaveHandler onSnapshotSaveHandler;
 
     // Raft command handlers.
     private final CommandHandlers commandHandlers;
 
+    private final RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+
     /** Constructor. */
     public ZonePartitionRaftListener(
             ZonePartitionId zonePartitionId,
@@ -103,9 +117,10 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
         this.safeTimeTracker = safeTimeTracker;
         this.storageIndexTracker = storageIndexTracker;
         this.partitionsSnapshots = partitionsSnapshots;
+        this.txStateStorage = txStatePartitionStorage;
         this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(), 
zonePartitionId.partitionId());
 
-        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage);
 
         // RAFT command handlers initialization.
         this.commandHandlers = new CommandHandlers.Builder()
@@ -174,10 +189,27 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
         partitionSnapshots().acquireReadLock();
 
         try {
-            boolean crossTableCommand = command instanceof 
PrimaryReplicaChangeCommand
-                    || command instanceof 
UpdateMinimumActiveTxBeginTimeCommand;
+            boolean crossTableCommand = command instanceof 
UpdateMinimumActiveTxBeginTimeCommand;
+
+            if (command instanceof PrimaryReplicaChangeCommand) {
+                // This is a hack for tests, this command is not issued in 
production because no zone-wide placement driver exists yet.
+                // FIXME: https://issues.apache.org/jira/browse/IGNITE-24374
+                result = processCrossTableProcessorsCommand(command, 
commandIndex, commandTerm, safeTimestamp);
 
-            if (command instanceof TableAwareCommand) {
+                if (commandIndex > txStateStorage.lastAppliedIndex()) {
+                    var primaryReplicaChangeCommand = 
(PrimaryReplicaChangeCommand) command;
+
+                    var leaseInfo = new LeaseInfo(
+                            primaryReplicaChangeCommand.leaseStartTime(),
+                            primaryReplicaChangeCommand.primaryReplicaNodeId(),
+                            
primaryReplicaChangeCommand.primaryReplicaNodeName()
+                    );
+
+                    txStateStorage.leaseInfo(leaseInfo, commandIndex, 
commandTerm);
+
+                    result = new IgniteBiTuple<>(null, true);
+                }
+            } else if (command instanceof TableAwareCommand) {
                 result = processTableAwareCommand(
                         ((TableAwareCommand) command).tableId(),
                         command,
@@ -186,9 +218,6 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                         safeTimestamp
                 );
             } else if (crossTableCommand) {
-                // PrimaryReplicaChangeCommand
-                // This is a hack for tests, this command is not issued in 
production because no zone-wide placement driver exists yet.
-                // FIXME: https://issues.apache.org/jira/browse/IGNITE-24374
                 result = processCrossTableProcessorsCommand(command, 
commandIndex, commandTerm, safeTimestamp);
             } else {
                 AbstractCommandHandler<?> commandHandler =
@@ -211,6 +240,11 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
                 
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, clo.index());
             }
+
+            synchronized (tableProcessorsStateLock) {
+                lastAppliedIndex = max(lastAppliedIndex, commandIndex);
+                lastAppliedTerm = max(lastAppliedTerm, commandTerm);
+            }
         } finally {
             partitionSnapshots().releaseReadLock();
         }
@@ -270,22 +304,40 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
     @Override
     public void onConfigurationCommitted(RaftGroupConfiguration config, long 
lastAppliedIndex, long lastAppliedTerm) {
-        synchronized (commitedConfigurationLock) {
-            currentCommitedConfiguration = new CommittedConfiguration(config, 
lastAppliedIndex, lastAppliedTerm);
-
+        synchronized (tableProcessorsStateLock) {
             tableProcessors.values()
                     .forEach(listener -> 
listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm));
+
+            byte[] configBytes = 
raftGroupConfigurationConverter.toBytes(config);
+
+            txStateStorage.committedGroupConfiguration(configBytes, 
lastAppliedIndex, lastAppliedTerm);
+
+            this.lastAppliedIndex = max(this.lastAppliedIndex, 
lastAppliedIndex);
+            this.lastAppliedTerm = max(this.lastAppliedTerm, lastAppliedTerm);
         }
     }
 
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        onSnapshotSaveHandler.onSnapshotSave(tableProcessors.values())
+        long lastAppliedIndex;
+        long lastAppliedTerm;
+
+        synchronized (tableProcessorsStateLock) {
+            lastAppliedIndex = this.lastAppliedIndex;
+            lastAppliedTerm = this.lastAppliedTerm;
+        }
+
+        onSnapshotSaveHandler.onSnapshotSave(lastAppliedIndex, 
lastAppliedTerm, tableProcessors.values())
                 .whenComplete((unused, throwable) -> 
doneClo.accept(throwable));
     }
 
     @Override
     public boolean onSnapshotLoad(Path path) {
+        synchronized (tableProcessorsStateLock) {
+            lastAppliedIndex = max(lastAppliedIndex, 
txStateStorage.lastAppliedIndex());
+            lastAppliedTerm = max(lastAppliedTerm, 
txStateStorage.lastAppliedTerm());
+        }
+
         return true;
     }
 
@@ -298,16 +350,20 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
     /**
      * Adds a given Table Partition-level Raft processor to the set of managed 
processor.
+     *
+     * <p>Callers of this method must ensure that no commands are issued to 
this processor before this method returns.
+     *
+     * <p>During table creation this is achieved by executing this method 
while processing a Catalog event, which blocks bumping the Catalog
+     * version. Until the Catalog version is updated, commands targeting the 
table being added will be rejected by an interceptor that
+     * requires the Catalog version to be equal to a particular value.
      */
     public void addTableProcessor(TablePartitionId tablePartitionId, 
RaftTableProcessor processor) {
-        synchronized (commitedConfigurationLock) {
-            if (currentCommitedConfiguration != null) {
-                processor.onConfigurationCommitted(
-                        currentCommitedConfiguration.configuration,
-                        currentCommitedConfiguration.lastAppliedIndex,
-                        currentCommitedConfiguration.lastAppliedTerm
-                );
-            }
+        synchronized (tableProcessorsStateLock) {
+            RaftGroupConfiguration configuration = 
raftGroupConfigurationConverter.fromBytes(txStateStorage.committedGroupConfiguration());
+
+            LeaseInfo leaseInfo = txStateStorage.leaseInfo();
+
+            processor.initialize(configuration, leaseInfo, lastAppliedIndex, 
lastAppliedTerm);
 
             RaftTableProcessor prev = 
tableProcessors.put(tablePartitionId.tableId(), processor);
 
@@ -333,18 +389,4 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
     private PartitionSnapshots partitionSnapshots() {
         return partitionsSnapshots.partitionSnapshots(partitionKey);
     }
-
-    private static class CommittedConfiguration {
-        final RaftGroupConfiguration configuration;
-
-        final long lastAppliedIndex;
-
-        final long lastAppliedTerm;
-
-        CommittedConfiguration(RaftGroupConfiguration configuration, long 
lastAppliedIndex, long lastAppliedTerm) {
-            this.configuration = configuration;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index f9fc8b9334f..f87aa24aa4c 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
 
-import static java.lang.Math.max;
 import static java.util.Comparator.comparingInt;
 import static java.util.Comparator.comparingLong;
 import static java.util.stream.Collectors.toList;
@@ -62,6 +61,7 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxMetaMessage;
@@ -201,32 +201,50 @@ public class OutgoingSnapshot {
     }
 
     private PartitionSnapshotMeta takeSnapshotMeta(int catalogVersion, 
Collection<PartitionMvStorageAccess> partitionStorages) {
-        // TODO: partitionsByTableId will be empty for zones without tables, 
need another way to get meta in that case,
-        //  see https://issues.apache.org/jira/browse/IGNITE-24517
-        PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = 
partitionStorages.stream()
-                .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
-                .orElseThrow();
-
-        RaftGroupConfiguration config = 
partitionStorageWithMaxAppliedIndex.committedGroupConfiguration();
-
-        assert config != null : "Configuration should never be null when 
installing a snapshot";
-
         Map<Integer, UUID> nextRowIdToBuildByIndexId = 
collectNextRowIdToBuildIndexes(
                 catalogService,
                 partitionStorages,
                 catalogVersion
         );
 
-        return snapshotMetaAt(
-                max(partitionStorageWithMaxAppliedIndex.lastAppliedIndex(), 
txState.lastAppliedIndex()),
-                max(partitionStorageWithMaxAppliedIndex.lastAppliedTerm(), 
txState.lastAppliedTerm()),
-                config,
-                catalogVersion,
-                nextRowIdToBuildByIndexId,
-                partitionStorageWithMaxAppliedIndex.leaseStartTime(),
-                partitionStorageWithMaxAppliedIndex.primaryReplicaNodeId(),
-                partitionStorageWithMaxAppliedIndex.primaryReplicaNodeName()
-        );
+        PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = 
partitionStorages.stream()
+                .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
+                .orElse(null);
+
+        if (partitionStorageWithMaxAppliedIndex == null
+                || txState.lastAppliedIndex() > 
partitionStorageWithMaxAppliedIndex.lastAppliedIndex()) {
+            RaftGroupConfiguration config = 
txState.committedGroupConfiguration();
+
+            assert config != null : "Configuration should never be null when 
installing a snapshot";
+
+            LeaseInfo leaseInfo = txState.leaseInfo();
+
+            return snapshotMetaAt(
+                    txState.lastAppliedIndex(),
+                    txState.lastAppliedTerm(),
+                    config,
+                    catalogVersion,
+                    nextRowIdToBuildByIndexId,
+                    leaseInfo == null ? 0 : leaseInfo.leaseStartTime(),
+                    leaseInfo == null ? null : 
leaseInfo.primaryReplicaNodeId(),
+                    leaseInfo == null ? null : 
leaseInfo.primaryReplicaNodeName()
+            );
+        } else {
+            RaftGroupConfiguration config = 
partitionStorageWithMaxAppliedIndex.committedGroupConfiguration();
+
+            assert config != null : "Configuration should never be null when 
installing a snapshot";
+
+            return snapshotMetaAt(
+                    partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
+                    partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
+                    config,
+                    catalogVersion,
+                    nextRowIdToBuildByIndexId,
+                    partitionStorageWithMaxAppliedIndex.leaseStartTime(),
+                    partitionStorageWithMaxAppliedIndex.primaryReplicaNodeId(),
+                    
partitionStorageWithMaxAppliedIndex.primaryReplicaNodeName()
+            );
+        }
     }
 
     private List<PartitionMvStorageAccess> freezePartitionStorages(int 
catalogVersion) {
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 205eac57627..1c58f526041 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -17,19 +17,34 @@
 
 package org.apache.ignite.internal.partition.replicator.raft;
 
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.nio.file.Path;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -38,12 +53,15 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwareP
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
@@ -56,14 +74,17 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
 
     private ZonePartitionRaftListener listener;
 
-    @Mock
+    @Mock(answer = RETURNS_DEEP_STUBS)
     private OutgoingSnapshotsManager outgoingSnapshotsManager;
 
     @Mock
     private TxManager txManager;
 
+    @Spy
+    private TxStatePartitionStorage txStatePartitionStorage = new 
TestTxStatePartitionStorage();
+
     @Mock
-    private TxStatePartitionStorage txStatePartitionStorage;
+    private MvPartitionStorage mvPartitionStorage;
 
     @BeforeEach
     void setUp() {
@@ -77,6 +98,11 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         );
     }
 
+    @AfterEach
+    void tearDown() {
+        txStatePartitionStorage.close();
+    }
+
     @Test
     void closesOngoingSnapshots(@Mock PartitionSnapshots partitionSnapshots) {
         var tablePartition1 = new TablePartitionId(1, PARTITION_ID);
@@ -90,12 +116,84 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
         
verify(outgoingSnapshotsManager).cleanupOutgoingSnapshots(ZONE_PARTITION_KEY);
     }
 
+    @Test
+    void savesIndexAndTermOnSnapshotSave(
+            @Mock CommandClosure<WriteCommand> writeCommandClosure,
+            @Mock UpdateMinimumActiveTxBeginTimeCommand command
+    ) {
+        when(writeCommandClosure.command()).thenReturn(command);
+        when(writeCommandClosure.index()).thenReturn(25L);
+        when(writeCommandClosure.term()).thenReturn(42L);
+
+        listener.onWrite(List.of(writeCommandClosure).iterator());
+
+        var future = new CompletableFuture<Void>();
+
+        listener.onSnapshotSave(Path.of("foo"), throwable -> {
+            if (throwable == null) {
+                future.complete(null);
+            } else {
+                future.completeExceptionally(throwable);
+            }
+        });
+
+        assertThat(future, willCompleteSuccessfully());
+
+        verify(txStatePartitionStorage).lastApplied(25L, 42L);
+    }
+
+    @Test
+    void loadsIndexAndTermOnSnapshotLoad() {
+        listener.onSnapshotLoad(Path.of("foo"));
+
+        verify(txStatePartitionStorage).lastAppliedIndex();
+        verify(txStatePartitionStorage).lastAppliedTerm();
+    }
+
+    @Test
+    void propagatesRaftMetaToPartitionListeners(
+            @Mock CommandClosure<WriteCommand> writeCommandClosure,
+            @Mock PrimaryReplicaChangeCommand command,
+            @Mock RaftGroupConfiguration raftGroupConfiguration,
+            @Mock Locker locker
+    ) {
+        when(writeCommandClosure.command()).thenReturn(command);
+        when(writeCommandClosure.index()).thenReturn(1L);
+        when(writeCommandClosure.term()).thenReturn(1L);
+
+        UUID id = UUID.randomUUID();
+
+        when(command.leaseStartTime()).thenReturn(123L);
+        when(command.primaryReplicaNodeId()).thenReturn(id);
+        when(command.primaryReplicaNodeName()).thenReturn("foo");
+
+        when(mvPartitionStorage.runConsistently(any())).thenAnswer(invocation 
-> {
+            WriteClosure<?> closure = invocation.getArgument(0);
+
+            return closure.execute(locker);
+        });
+
+        listener.onWrite(List.of(writeCommandClosure).iterator());
+
+        listener.onConfigurationCommitted(raftGroupConfiguration, 2L, 3L);
+
+        var tablePartitionId = new TablePartitionId(1, PARTITION_ID);
+
+        PartitionListener partitionListener = 
partitionListener(tablePartitionId);
+
+        listener.addTableProcessor(tablePartitionId, partitionListener);
+
+        verify(mvPartitionStorage).lastApplied(2L, 3L);
+        verify(mvPartitionStorage).committedGroupConfiguration(any());
+        verify(mvPartitionStorage).updateLease(123L, id, "foo");
+    }
+
     private PartitionListener partitionListener(TablePartitionId 
tablePartitionId) {
         return new PartitionListener(
                 txManager,
                 new SnapshotAwarePartitionDataStorage(
                         tablePartitionId.tableId(),
-                        mock(MvPartitionStorage.class),
+                        mvPartitionStorage,
                         outgoingSnapshotsManager,
                         ZONE_PARTITION_KEY
                 ),
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 4f4ce455d19..d750fb1e16f 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -59,7 +59,6 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -155,14 +154,18 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         rowIdOutOfOrder = id;
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24517";)
     @Test
-    void sendsEmptyResponseForEmptyMvData() {
+    void sendsEmptyResponseForEmptyMvData(
+            @Mock PartitionTxStateAccess txStateAccess,
+            @Mock RaftGroupConfiguration raftGroupConfiguration
+    ) {
+        
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
+
         snapshot = new OutgoingSnapshot(
                 UUID.randomUUID(),
                 partitionKey,
                 Int2ObjectMaps.emptyMap(),
-                mock(PartitionTxStateAccess.class),
+                txStateAccess,
                 catalogService
         );
 
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 154ec245855..cc0792be325 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -71,7 +71,8 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
 
         when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
         when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
-        
when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
+
+        
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
 
         doAnswer(invocation -> {
             OutgoingSnapshot snapshot = invocation.getArgument(1);
@@ -105,12 +106,12 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
 
         when(txStateAccess.lastAppliedTerm()).thenReturn(1L);
         lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
-        when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
+        lenient().when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
 
         try (var reader = new OutgoingSnapshotReader(snapshotStorage)) {
             SnapshotMeta meta = reader.load();
             assertEquals(10L, meta.lastIncludedIndex());
-            assertEquals(3L, meta.lastIncludedTerm());
+            assertEquals(1L, meta.lastIncludedTerm());
         }
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 577d486aa31..80102b5c2bf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static java.lang.Math.max;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation;
@@ -80,6 +81,7 @@ import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
@@ -168,7 +170,7 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         this.indexMetaStorage = indexMetaStorage;
         this.localNodeId = localNodeId;
 
-        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage);
 
         // RAFT command handlers initialization.
         TablePartitionId tablePartitionId = new 
TablePartitionId(storage.tableId(), storage.partitionId());
@@ -302,6 +304,38 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         return result;
     }
 
+    /**
+     * Writes the given Raft metadata into the partition storage, unless the storage already holds state
+     * for the same or a later Raft log position.
+     *
+     * @param config Committed Raft group configuration to persist and to derive the current group topology
+     *         from, or {@code null} to leave both unchanged.
+     * @param leaseInfo Lease to persist, or {@code null} to leave the stored lease unchanged.
+     * @param lastAppliedIndex Raft log index to record as last applied; also compared with the storage's
+     *         current last applied index to decide whether anything needs to be written.
+     * @param lastAppliedTerm Raft term to record together with {@code lastAppliedIndex}.
+     */
+    @Override
+    public void initialize(
+            @Nullable RaftGroupConfiguration config,
+            @Nullable LeaseInfo leaseInfo,
+            long lastAppliedIndex,
+            long lastAppliedTerm
+    ) {
+        // Skip if the storage is already at this or a newer log position: writing would either be redundant
+        // or roll lastApplied back.
+        // NOTE(review): on this path currentGroupTopology is not refreshed from config, unlike the
+        // committed-configuration handler in this class, which calls setCurrentGroupTopology before its own
+        // skip check. Presumably the topology is restored elsewhere on this path; confirm.
+        if (lastAppliedIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
+        // All metadata updates are performed inside a single runConsistently closure, presumably so they are
+        // persisted as one consistent unit (exact guarantees are defined by the storage implementation).
+        storage.runConsistently(locker -> {
+            if (config != null) {
+                setCurrentGroupTopology(config);
+
+                storage.committedGroupConfiguration(config);
+            }
+
+            if (leaseInfo != null) {
+                storage.updateLease(leaseInfo.leaseStartTime(), 
leaseInfo.primaryReplicaNodeId(), leaseInfo.primaryReplicaNodeName());
+            }
+
+            storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+
+            return null;
+        });
+
+        // Initiate a flush but do not wait for it: this saves the initialization information as soon as
+        // possible (making recovery more efficient) without blocking the caller thread.
+        storage.flush();
+    }
+
     /**
      * Handler for the {@link UpdateCommand}.
      *
@@ -488,8 +522,7 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
             long lastAppliedIndex,
             long lastAppliedTerm
     ) {
-        currentGroupTopology = new HashSet<>(config.peers());
-        currentGroupTopology.addAll(config.learners());
+        setCurrentGroupTopology(config);
 
         // Skips the update because the storage has already recorded it.
         if (config.index() <= storage.lastAppliedIndex()) {
@@ -514,9 +547,19 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         }
     }
 
+    /**
+     * Replaces {@code currentGroupTopology} with a new set containing all peers and learners of the given
+     * configuration.
+     *
+     * <p>The new set is fully populated in a local variable before being assigned, so the field is never
+     * assigned a set that is still being filled.
+     *
+     * @param config Raft group configuration to take peers and learners from.
+     */
+    private void setCurrentGroupTopology(RaftGroupConfiguration config) {
+        var currentGroupTopology = new HashSet<>(config.peers());
+        currentGroupTopology.addAll(config.learners());
+
+        this.currentGroupTopology = currentGroupTopology;
+    }
+
     @Override
     public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        onSnapshotSaveHandler.onSnapshotSave(List.of(this))
+        // The snapshot is described by the most recent log position recorded by either `storage` or
+        // `txStatePartitionStorage`. Index and term are maximized independently. This presumably relies on both
+        // storages tracking the same Raft log, where the term never decreases as the index grows.
+        // NOTE(review): confirm.
+        long maxAppliedIndex = max(storage.lastAppliedIndex(), 
txStatePartitionStorage.lastAppliedIndex());
+        long maxAppliedTerm = max(storage.lastAppliedTerm(), 
txStatePartitionStorage.lastAppliedTerm());
+
+        onSnapshotSaveHandler.onSnapshotSave(maxAppliedIndex, maxAppliedTerm, 
List.of(this))
                 .whenComplete((unused, throwable) -> 
doneClo.accept(throwable));
     }
 
@@ -542,6 +585,10 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
 
     @Override
     public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
+        if (lastAppliedIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
         storage.runConsistently(locker -> {
             storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
 


Reply via email to