This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6d402d84e IGNITE-23379 Pass lease information with Raft snapshot of
partition (#4725)
d6d402d84e is described below
commit d6d402d84e4c3f0be35e5415e5ee6d0706acefe4
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 15 17:46:50 2024 +0400
IGNITE-23379 Pass lease information with Raft snapshot of partition (#4725)
---
.../network/raft/PartitionSnapshotMeta.java | 10 +++
.../internal/storage/engine/MvPartitionMeta.java} | 36 ++++++----
.../internal/storage/engine/MvTableStorage.java | 12 +---
.../storage/engine/PrimitivePartitionMeta.java | 69 +++++++++++++++++++
.../engine/ThreadAssertingMvTableStorage.java | 9 +--
.../storage/AbstractMvTableStorageTest.java | 24 +++++--
.../storage/impl/TestMvPartitionStorage.java | 9 +--
.../internal/storage/impl/TestMvTableStorage.java | 20 ++++--
.../pagememory/AbstractPageMemoryTableStorage.java | 21 +++---
.../mv/AbstractPageMemoryMvPartitionStorage.java | 13 ++++
.../mv/PersistentPageMemoryMvPartitionStorage.java | 62 ++++++++++-------
.../mv/VolatilePageMemoryMvPartitionStorage.java | 17 ++++-
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 74 +++++++++++++-------
.../storage/rocksdb/RocksDbTableStorage.java | 10 +--
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 35 +++++++++-
.../distributed/raft/snapshot/PartitionAccess.java | 29 ++++++--
.../raft/snapshot/PartitionAccessImpl.java | 69 ++++++++++++-------
.../snapshot/PartitionSnapshotStorageFactory.java | 56 ++++++---------
.../raft/snapshot/RaftSnapshotPartitionMeta.java | 79 ++++++++++++++++++++++
.../snapshot/incoming/IncomingSnapshotCopier.java | 6 +-
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 14 ++--
.../raft/snapshot/outgoing/SnapshotMetaUtils.java | 15 +++-
.../distributed/storage/NullMvTableStorage.java | 8 +--
.../table/distributed/TableManagerTest.java | 2 +
.../incoming/IncomingSnapshotCopierTest.java | 31 ++++++++-
.../outgoing/OutgoingSnapshotCommonTest.java | 9 ++-
.../snapshot/outgoing/SnapshotMetaUtilsTest.java | 25 ++++++-
27 files changed, 564 insertions(+), 200 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
index 8d0258b767..ccfad0b819 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.partition.replicator.network.raft;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -32,4 +33,13 @@ public interface PartitionSnapshotMeta extends SnapshotMeta {
/** Row ID for which the index needs to be built per building index ID at
the time the snapshot meta was created. */
@Nullable Map<Integer, UUID> nextRowIdToBuildByIndexId();
+
+ /** Lease start time represented as {@link HybridTimestamp#longValue()}. */
+ long leaseStartTime();
+
+ /** ID of primary replica node ({@code null} if there is no primary). */
+ @Nullable UUID primaryReplicaNodeId();
+
+ /** Name of primary replica node ({@code null} if there is no primary). */
+ @Nullable String primaryReplicaNodeName();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
similarity index 51%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
copy to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
index 8d0258b767..006f1b2811 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/PartitionSnapshotMeta.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvPartitionMeta.java
@@ -15,21 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator.network.raft;
+package org.apache.ignite.internal.storage.engine;
-import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.jetbrains.annotations.Nullable;
-/** Partition Raft snapshot meta. */
-@Transferable(PartitionReplicationMessageGroup.PARTITION_SNAPSHOT_META)
-public interface PartitionSnapshotMeta extends SnapshotMeta {
- /** Minimum catalog version that is required for the snapshot to be
accepted by a follower. */
- int requiredCatalogVersion();
+/**
+ * Partition metadata for {@link MvTableStorage#finishRebalancePartition(int,
MvPartitionMeta)}.
+ */
+public class MvPartitionMeta extends PrimitivePartitionMeta {
+ private final byte[] groupConfig;
+
+ /** Constructor. */
+ public MvPartitionMeta(
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ byte[] groupConfig,
+ long leaseStartTime,
+ @Nullable UUID primaryReplicaNodeId,
+ @Nullable String primaryReplicaNodeName
+ ) {
+ super(lastAppliedIndex, lastAppliedTerm, leaseStartTime,
primaryReplicaNodeId, primaryReplicaNodeName);
+
+ this.groupConfig = groupConfig;
+ }
- /** Row ID for which the index needs to be built per building index ID at
the time the snapshot meta was created. */
- @Nullable Map<Integer, UUID> nextRowIdToBuildByIndexId();
+ /** Returns replication group config as bytes. */
+ public byte[] groupConfig() {
+ return groupConfig;
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index bcb4f58b3a..5f149b2c6e 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -206,19 +206,13 @@ public interface MvTableStorage extends ManuallyCloseable
{
*
* <p>If rebalance has not started, then {@link StorageRebalanceException}
will be thrown.
*
- * @param lastAppliedIndex Last applied index.
- * @param lastAppliedTerm Last applied term.
- * @param groupConfig Replication protocol group configuration (byte
representation).
+ * @param partitionId ID of the partition.
+ * @param partitionMeta Metadata of the partition.
* @return Future of the finish rebalance for a multi-version partition
storage and its indexes.
* @throws IllegalArgumentException If Partition ID is out of bounds.
* @throws StorageRebalanceException If there is an error when completing
rebalance.
*/
- CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- );
+ CompletableFuture<Void> finishRebalancePartition(int partitionId,
MvPartitionMeta partitionMeta);
/**
* Clears a partition and all associated indices. After the cleaning is
completed, a partition and all associated indices will be fully
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
new file mode 100644
index 0000000000..c392dc7851
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/PrimitivePartitionMeta.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition meta containing values of 'primitive' types (which are the same
for all representations of partition metadata).
+ */
+public class PrimitivePartitionMeta {
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+ private final long leaseStartTime;
+ private final @Nullable UUID primaryReplicaNodeId;
+ private final @Nullable String primaryReplicaNodeName;
+
+ /** Constructor. */
+ public PrimitivePartitionMeta(
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ long leaseStartTime,
+ @Nullable UUID primaryReplicaNodeId,
+ @Nullable String primaryReplicaNodeName
+ ) {
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ }
+
+ public long lastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long lastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+
+ public long leaseStartTime() {
+ return leaseStartTime;
+ }
+
+ /** Returns primary replica node ID (or {@code null} if no primary replica
node is known). */
+ public @Nullable UUID primaryReplicaNodeId() {
+ return primaryReplicaNodeId;
+ }
+
+ /** Returns primary replica node name (or {@code null} if no primary
replica node is known). */
+ public @Nullable String primaryReplicaNodeName() {
+ return primaryReplicaNodeName;
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
index cc80c45bd1..d48875ea88 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
@@ -123,15 +123,10 @@ public class ThreadAssertingMvTableStorage implements
MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- ) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId,
MvPartitionMeta partitionMeta) {
assertThreadAllowsToWrite();
- return tableStorage.finishRebalancePartition(partitionId,
lastAppliedIndex, lastAppliedTerm, groupConfig);
+ return tableStorage.finishRebalancePartition(partitionId,
partitionMeta);
}
@Override
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 9ee0693184..1c6433da82 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
@@ -659,7 +660,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
// Error because rebalance has not yet started for the partition.
assertThrows(
StorageRebalanceException.class,
- () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100,
500, BYTE_EMPTY_ARRAY)
+ () -> tableStorage.finishRebalancePartition(PARTITION_ID,
saneMvPartitionMeta())
);
List<TestRow> rowsBeforeRebalanceStart = List.of(
@@ -691,18 +692,21 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
// Partition is out of configuration range.
assertThrows(
IllegalArgumentException.class,
- () ->
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500,
BYTE_EMPTY_ARRAY)
+ () ->
tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(),
saneMvPartitionMeta())
);
// Partition does not exist.
assertThrows(
StorageRebalanceException.class,
- () -> tableStorage.finishRebalancePartition(1, 100, 500,
BYTE_EMPTY_ARRAY)
+ () -> tableStorage.finishRebalancePartition(1,
saneMvPartitionMeta())
);
byte[] raftGroupConfig = createRandomRaftGroupConfiguration();
- assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20,
raftGroupConfig), willCompleteSuccessfully());
+ assertThat(
+ tableStorage.finishRebalancePartition(PARTITION_ID,
saneMvPartitionMeta(10, 20, raftGroupConfig)),
+ willCompleteSuccessfully()
+ );
completeBuiltIndexes(PARTITION_ID, hashIndexStorage,
sortedIndexStorage);
@@ -714,6 +718,14 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
checkRaftGroupConfigs(raftGroupConfig,
mvPartitionStorage.committedGroupConfiguration());
}
+ private static MvPartitionMeta saneMvPartitionMeta() {
+ return saneMvPartitionMeta(100, 500, BYTE_EMPTY_ARRAY);
+ }
+
+ private static MvPartitionMeta saneMvPartitionMeta(long lastAppliedIndex,
long lastAppliedTerm, byte[] groupConfig) {
+ return new MvPartitionMeta(lastAppliedIndex, lastAppliedTerm,
groupConfig, 333, new UUID(1, 2), "primary");
+ }
+
@Test
public void testFailRebalance() throws Exception {
MvPartitionStorage mvPartitionStorage =
getOrCreateMvPartition(PARTITION_ID);
@@ -956,7 +968,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
void testNextRowIdToBuildAfterRebalance() throws Exception {
testNextRowIdToBuildAfterOperation(() -> {
assertThat(tableStorage.startRebalancePartition(PARTITION_ID),
willCompleteSuccessfully());
- assertThat(tableStorage.finishRebalancePartition(PARTITION_ID,
100, 100, BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+ assertThat(tableStorage.finishRebalancePartition(PARTITION_ID,
saneMvPartitionMeta()), willCompleteSuccessfully());
});
}
@@ -1425,7 +1437,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
fillStorages(mvPartitionStorage, null, null, rowsAfterRebalance);
- assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 42, 42,
BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
+ assertThat(tableStorage.finishRebalancePartition(PARTITION_ID,
saneMvPartitionMeta()), willCompleteSuccessfully());
assertThat(mvPartitionStorage.estimatedSize(), is(3L));
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index c4ab33988e..dd2194b4f8 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.storage.StorageDestroyedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.storage.util.LocalLocker;
import org.apache.ignite.internal.storage.util.LockByRowId;
@@ -769,16 +770,16 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
clear0();
}
- void finishRebalance(long lastAppliedIndex, long lastAppliedTerm, byte[]
groupConfig) {
+ void finishRebalance(MvPartitionMeta partitionMeta) {
checkStorageClosedForRebalance();
assert rebalance;
rebalance = false;
- this.lastAppliedIndex = lastAppliedIndex;
- this.lastAppliedTerm = lastAppliedTerm;
- this.groupConfig = Arrays.copyOf(groupConfig, groupConfig.length);
+ this.lastAppliedIndex = partitionMeta.lastAppliedIndex();
+ this.lastAppliedTerm = partitionMeta.lastAppliedTerm();
+ this.groupConfig = Arrays.copyOf(partitionMeta.groupConfig(),
partitionMeta.groupConfig().length);
}
private class ScanVersionsCursor implements Cursor<ReadResult> {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index 2fbcbeb80c..5550dfa2b8 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -243,14 +244,19 @@ public class TestMvTableStorage implements MvTableStorage
{
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- ) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId,
MvPartitionMeta partitionMeta) {
return mvPartitionStorages.finishRebalance(partitionId,
mvPartitionStorage -> {
- mvPartitionStorage.finishRebalance(lastAppliedIndex,
lastAppliedTerm, groupConfig);
+ mvPartitionStorage.finishRebalance(partitionMeta);
+
+ if (partitionMeta.primaryReplicaNodeId() != null) {
+ assert partitionMeta.primaryReplicaNodeName() != null;
+
+ mvPartitionStorage.updateLease(
+ partitionMeta.leaseStartTime(),
+ partitionMeta.primaryReplicaNodeId(),
+ partitionMeta.primaryReplicaNodeName()
+ );
+ }
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::finishRebalance);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 53fef1f07b..543e8eb15d 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -260,17 +261,21 @@ public abstract class AbstractPageMemoryTableStorage
implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- ) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId,
MvPartitionMeta partitionMeta) {
return inBusyLock(busyLock, () ->
mvPartitionStorages.finishRebalance(partitionId, mvPartitionStorage -> {
mvPartitionStorage.runConsistently(locker -> {
- mvPartitionStorage.lastAppliedOnRebalance(lastAppliedIndex,
lastAppliedTerm);
+
mvPartitionStorage.lastAppliedOnRebalance(partitionMeta.lastAppliedIndex(),
partitionMeta.lastAppliedTerm());
+
mvPartitionStorage.committedGroupConfigurationOnRebalance(partitionMeta.groupConfig());
+
+ if (partitionMeta.primaryReplicaNodeId() != null) {
+ assert partitionMeta.primaryReplicaNodeName() != null;
-
mvPartitionStorage.committedGroupConfigurationOnRebalance(groupConfig);
+ mvPartitionStorage.updateLeaseOnRebalance(
+ partitionMeta.leaseStartTime(),
+ partitionMeta.primaryReplicaNodeId(),
+ partitionMeta.primaryReplicaNodeName()
+ );
+ }
return null;
});
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 07f4eff195..192c9dc702 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -746,6 +746,19 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
*/
public abstract void committedGroupConfigurationOnRebalance(byte[] config);
+ /**
+ * Updates lease information (start time and primary replica node) in the storage during rebalance.
+ *
+ * @param leaseStartTime Lease start time.
+ * @param primaryReplicaNodeId Primary replica node id.
+ * @param primaryReplicaNodeName Primary replica node name.
+ */
+ public abstract void updateLeaseOnRebalance(
+ long leaseStartTime,
+ UUID primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ );
+
/**
* Prepares the storage and its indexes for cleanup.
*
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index d117831b49..9cd8105ab3 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -321,38 +321,43 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- updateMeta((lastCheckpointId, meta) -> {
- primaryReplicaMetaReadWriteLock.writeLock().lock();
- try {
- if (leaseStartTime <= meta.leaseStartTime()) {
- return;
- }
+ updateLeaseBusy(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
- meta.primaryReplicaNodeId(lastCheckpointId,
primaryReplicaNodeId);
+ return null;
+ });
+ }
- if (meta.primaryReplicaNodeNameFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
- long primaryReplicaNodeNameFirstPageId =
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+ private void updateLeaseBusy(long leaseStartTime, UUID
primaryReplicaNodeId, String primaryReplicaNodeName) {
+ updateMeta((lastCheckpointId, meta) -> {
+ primaryReplicaMetaReadWriteLock.writeLock().lock();
-
meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId,
primaryReplicaNodeNameFirstPageId);
- } else {
-
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(),
stringToBytes(primaryReplicaNodeName));
- }
+ try {
+ if (leaseStartTime <= meta.leaseStartTime()) {
+ return;
+ }
- meta.updateLease(lastCheckpointId, leaseStartTime);
+ meta.primaryReplicaNodeId(lastCheckpointId,
primaryReplicaNodeId);
- this.primaryReplicaNodeName = primaryReplicaNodeName;
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- "Cannot save lease meta: [tableId={},
partitionId={}]",
- e,
- tableStorage.getTableId(), partitionId
- );
- } finally {
- primaryReplicaMetaReadWriteLock.writeLock().unlock();
+ if (meta.primaryReplicaNodeNameFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
+ long primaryReplicaNodeNameFirstPageId =
blobStorage.addBlob(stringToBytes(primaryReplicaNodeName));
+
+ meta.primaryReplicaNodeNameFirstPageId(lastCheckpointId,
primaryReplicaNodeNameFirstPageId);
+ } else {
+
blobStorage.updateBlob(meta.primaryReplicaNodeNameFirstPageId(),
stringToBytes(primaryReplicaNodeName));
}
- });
- return null;
+ meta.updateLease(lastCheckpointId, leaseStartTime);
+
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Cannot save lease meta: [tableId={}, partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ } finally {
+ primaryReplicaMetaReadWriteLock.writeLock().unlock();
+ }
});
}
@@ -535,6 +540,13 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
committedGroupConfigurationBusy(config);
}
+ @Override
+ public void updateLeaseOnRebalance(long leaseStartTime, UUID
primaryReplicaNodeId, String primaryReplicaNodeName) {
+ throwExceptionIfStorageNotInProgressOfRebalance(state.get(),
this::createStorageInfo);
+
+ updateLeaseBusy(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
+ }
+
private void saveFreeListMetadataBusy(RenewablePartitionStorageState
localState) {
try {
localState.freeList().saveMetadata();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 6db571befb..a65ef9f2f8 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -216,14 +216,18 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
return null;
}
- this.leaseStartTime = leaseStartTime;
- this.primaryReplicaNodeId = primaryReplicaNodeId;
- this.primaryReplicaNodeName = primaryReplicaNodeName;
+ updateLeaseBusy(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
return null;
});
}
+ private void updateLeaseBusy(long leaseStartTime, UUID
primaryReplicaNodeId, String primaryReplicaNodeName) {
+ this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ }
+
@Override
public long leaseStartTime() {
return busy(() -> {
@@ -391,6 +395,13 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
this.groupConfig = config;
}
+ @Override
+ public void updateLeaseOnRebalance(long leaseStartTime, UUID
primaryReplicaNodeId, String primaryReplicaNodeName) {
+ throwExceptionIfStorageNotInProgressOfRebalance(state.get(),
this::createStorageInfo);
+
+ updateLeaseBusy(leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
+ }
+
@Override
public long estimatedSize() {
return estimatedSize;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4a56f119a5..60229f23ff 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.storage.rocksdb.GarbageCollector.AddResult;
import org.apache.ignite.internal.storage.util.LocalLocker;
@@ -1117,32 +1118,35 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
return null;
}
- AbstractWriteBatch writeBatch = requireWriteBatch();
+ saveLease(requireWriteBatch(), leaseStartTime,
primaryReplicaNodeId, primaryReplicaNodeName);
- try {
- ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
- outputStream.write(longToBytes(leaseStartTime));
+ return null;
+ });
+ }
- byte[] primaryReplicaNodeIdBytes =
uuidToBytes(primaryReplicaNodeId);
- outputStream.write(primaryReplicaNodeIdBytes);
+ private void saveLease(AbstractWriteBatch writeBatch, long leaseStartTime,
UUID primaryReplicaNodeId, String primaryReplicaNodeName) {
+ // TODO: IGNITE-23683 - switch to VersionedSerialization.
+ try {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ outputStream.write(longToBytes(leaseStartTime));
- byte[] primaryReplicaNodeNameBytes =
stringToBytes(primaryReplicaNodeName);
-
outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
- outputStream.write(primaryReplicaNodeNameBytes);
+ byte[] primaryReplicaNodeIdBytes =
uuidToBytes(primaryReplicaNodeId);
+ outputStream.write(primaryReplicaNodeIdBytes);
- writeBatch.put(meta, leaseKey, outputStream.toByteArray());
+ byte[] primaryReplicaNodeNameBytes =
stringToBytes(primaryReplicaNodeName);
+ outputStream.write(intToBytes(primaryReplicaNodeNameBytes.length));
+ outputStream.write(primaryReplicaNodeNameBytes);
- this.leaseStartTime = leaseStartTime;
- this.primaryReplicaNodeId = primaryReplicaNodeId;
- this.primaryReplicaNodeName = primaryReplicaNodeName;
- } catch (IOException e) {
- throw new StorageException(e);
- } catch (RocksDBException e) {
- throw new IgniteRocksDbException(e);
- }
+ writeBatch.put(meta, leaseKey, outputStream.toByteArray());
- return null;
- });
+ this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ } catch (IOException e) {
+ throw new StorageException(e);
+ } catch (RocksDBException e) {
+ throw new IgniteRocksDbException(e);
+ }
}
@Override
@@ -1653,15 +1657,26 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
*
* @throws StorageRebalanceException If there was an error when finishing
the rebalance.
*/
- void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long
lastAppliedTerm, byte[] groupConfig) {
+ void finishRebalance(WriteBatch writeBatch, MvPartitionMeta partitionMeta)
{
if (!state.compareAndSet(StorageState.REBALANCE,
StorageState.RUNNABLE)) {
throwExceptionDependingOnStorageStateOnRebalance(state.get(),
createStorageInfo());
}
try {
- saveLastApplied(writeBatch, lastAppliedIndex, lastAppliedTerm);
+ saveLastApplied(writeBatch, partitionMeta.lastAppliedIndex(),
partitionMeta.lastAppliedTerm());
+
+ saveGroupConfigurationOnRebalance(writeBatch,
partitionMeta.groupConfig());
+
+ if (partitionMeta.primaryReplicaNodeId() != null) {
+ assert partitionMeta.primaryReplicaNodeName() != null;
- saveGroupConfigurationOnRebalance(writeBatch, groupConfig);
+ updateLeaseOnRebalance(
+ writeBatch,
+ partitionMeta.leaseStartTime(),
+ partitionMeta.primaryReplicaNodeId(),
+ partitionMeta.primaryReplicaNodeName()
+ );
+ }
} catch (RocksDBException e) {
throw new StorageRebalanceException("Error when trying to abort
rebalancing storage: " + createStorageInfo(), e);
}
@@ -1696,6 +1711,19 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
this.lastGroupConfig = config.clone();
}
+ private void updateLeaseOnRebalance(
+ WriteBatch writeBatch,
+ long leaseStartTime,
+ UUID primaryReplicaNodeId,
+ String primaryReplicaNodeName
+ ) {
+ saveLease(writeBatch, leaseStartTime, primaryReplicaNodeId,
primaryReplicaNodeName);
+
+ this.leaseStartTime = leaseStartTime;
+ this.primaryReplicaNodeId = primaryReplicaNodeId;
+ this.primaryReplicaNodeName = primaryReplicaNodeName;
+ }
+
/**
* Prepares the storage for cleanup.
*
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 85d71af2d1..6b67f1f274 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -329,15 +330,10 @@ public class RocksDbTableStorage implements
MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- ) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId,
MvPartitionMeta partitionMeta) {
return inBusyLock(busyLock, () ->
mvPartitionStorages.finishRebalance(partitionId, mvPartitionStorage -> {
try (WriteBatch writeBatch = new WriteBatch()) {
- mvPartitionStorage.finishRebalance(writeBatch,
lastAppliedIndex, lastAppliedTerm, groupConfig);
+ mvPartitionStorage.finishRebalance(writeBatch, partitionMeta);
indexes.finishRebalance(partitionId);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 71b181013f..170f1dc511 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIMEM_
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
@@ -35,7 +36,9 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
@@ -68,9 +71,11 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.table.InternalTable;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
import
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
@@ -118,12 +123,13 @@ import org.junit.jupiter.params.provider.ValueSource;
@SuppressWarnings("resource")
@Timeout(90)
@ExtendWith(WorkDirectoryExtension.class)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-23379")
class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG =
Loggers.forClass(ItTableRaftSnapshotsTest.class);
private static final int AWAIT_PRIMARY_REPLICA_SECONDS = 10;
+ private static final int PARTITION_ID = 0;
+
/**
* Nodes bootstrap configuration pattern.
*
@@ -200,12 +206,32 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
transferLeadershipOnSolePartitionTo(2);
assertThat(getFromNode(2, 1), is("one"));
+ assertThat(estimatedSizeFromNode(2), is(1L));
+
+ MvPartitionStorage partitionAtNode0 = mvPartitionAtNode(0);
+ MvPartitionStorage partitionAtNode2 = mvPartitionAtNode(2);
+
+ assertThat(partitionAtNode2.leaseStartTime(), not(0L));
+ assertEquals(partitionAtNode0.primaryReplicaNodeId(),
partitionAtNode2.primaryReplicaNodeId());
+ assertEquals(partitionAtNode0.primaryReplicaNodeName(),
partitionAtNode2.primaryReplicaNodeName());
}
private @Nullable String getFromNode(int clusterNode, int key) {
return tableViewAt(clusterNode).get(null, key);
}
+ private long estimatedSizeFromNode(int clusterNode) {
+ return mvPartitionAtNode(clusterNode).estimatedSize();
+ }
+
+ private MvPartitionStorage mvPartitionAtNode(int clusterNode) {
+ InternalTable internalTable =
unwrapTableImpl(tableAt(clusterNode)).internalTable();
+ MvPartitionStorage mvPartition =
internalTable.storage().getMvPartition(PARTITION_ID);
+
+ assertNotNull(mvPartition);
+ return mvPartition;
+ }
+
private void feedNode2WithSnapshotOfOneRow() throws InterruptedException {
feedNode2WithSnapshotOfOneRow(DEFAULT_STORAGE_ENGINE);
}
@@ -290,10 +316,14 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
}
private KeyValueView<Integer, String> tableViewAt(int nodeIndex) {
- Table table = cluster.node(nodeIndex).tables().table("test");
+ Table table = tableAt(nodeIndex);
return table.keyValueView(Integer.class, String.class);
}
+ private Table tableAt(int nodeIndex) {
+ return cluster.node(nodeIndex).tables().table("test");
+ }
+
private void knockoutNode(int nodeIndex) {
cluster.stopNode(nodeIndex);
@@ -732,6 +762,7 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
* rejected, and that, when metadata catches up, the snapshot gets
successfully installed.
*/
@Test
+ @Disabled("IGNITE-23677")
void laggingSchemasOnFollowerPreventSnapshotInstallation() throws
Exception {
startAndInitCluster();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
index e86488b3ce..b52a8f99e3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccess.java
@@ -135,6 +135,27 @@ public interface PartitionAccess {
*/
long maxLastAppliedTerm();
+ /**
+ * Returns the start time of the known lease for this replication group.
+ *
+ * @return Lease start time.
+ */
+ long leaseStartTime();
+
+ /**
+ * Return the node ID of the known lease for this replication group.
+ *
+ * @return Primary replica node id or {@code null} if there is no
information about lease in the storage.
+ */
+ @Nullable UUID primaryReplicaNodeId();
+
+ /**
+ * Return the node name of the known lease for this replication group.
+ *
+ * @return Primary replica node name or {@code null} if there is no
information about lease in the storage.
+ */
+ @Nullable String primaryReplicaNodeName();
+
/**
* Prepares partition storages for rebalancing.
* <ul>
@@ -158,7 +179,7 @@ public interface PartitionAccess {
* <p>This method must be called before every rebalance and ends with a
call to one of the methods:
* <ul>
* <li>{@link #abortRebalance()} - in case of errors or cancellation
of rebalance;</li>
- * <li>{@link #finishRebalance(long, long, RaftGroupConfiguration)} -
in case of successful completion of rebalance.</li>
+ * <li>{@link #finishRebalance(RaftSnapshotPartitionMeta)} - in case
of successful completion of rebalance.</li>
* </ul>
*
* @return Future of the operation.
@@ -191,13 +212,11 @@ public interface PartitionAccess {
*
* <p>If rebalance has not started, then {@link StorageRebalanceException}
will be thrown.
*
- * @param lastAppliedIndex Last applied index.
- * @param lastAppliedTerm Last applied term.
- * @param raftGroupConfig RAFT group configuration.
+ * @param partitionMeta Partition metadata.
* @return Future of the operation.
* @throws StorageRebalanceException If there are errors when trying to
finish rebalancing.
*/
- CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long
lastAppliedTerm, RaftGroupConfiguration raftGroupConfig);
+ CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta
partitionMeta);
/**
* Returns the row ID for which the index needs to be built, {@code null}
means that the index building has completed.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index 8f179c6c2f..8672dc1611 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -121,22 +121,22 @@ public class PartitionAccessImpl implements
PartitionAccess {
@Override
public Cursor<IgniteBiTuple<UUID, TxMeta>> getAllTxMeta() {
- return getTxStateStorage(partitionId()).scan();
+ return getTxStateStorage().scan();
}
@Override
public void addTxMeta(UUID txId, TxMeta txMeta) {
- getTxStateStorage(partitionId()).putForRebalance(txId, txMeta);
+ getTxStateStorage().putForRebalance(txId, txMeta);
}
@Override
public @Nullable RowId closestRowId(RowId lowerBound) {
- return getMvPartitionStorage(partitionId()).closestRowId(lowerBound);
+ return getMvPartitionStorage().closestRowId(lowerBound);
}
@Override
public List<ReadResult> getAllRowVersions(RowId rowId) {
- MvPartitionStorage mvPartitionStorage =
getMvPartitionStorage(partitionId());
+ MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
return mvPartitionStorage.runConsistently(locker -> {
locker.lock(rowId);
@@ -149,14 +149,14 @@ public class PartitionAccessImpl implements
PartitionAccess {
@Override
public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
- byte[] configBytes =
getMvPartitionStorage(partitionId()).committedGroupConfiguration();
+ byte[] configBytes =
getMvPartitionStorage().committedGroupConfiguration();
return raftGroupConfigurationConverter.fromBytes(configBytes);
}
@Override
public void addWrite(RowId rowId, @Nullable BinaryRow row, UUID txId, int
commitTableId, int commitPartitionId, int catalogVersion) {
- MvPartitionStorage mvPartitionStorage =
getMvPartitionStorage(partitionId());
+ MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
List<IndexIdAndTableVersion> indexIdAndTableVersionList =
fullStateTransferIndexChooser.chooseForAddWrite(
catalogVersion,
@@ -181,7 +181,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
@Override
public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp, int catalogVersion) {
- MvPartitionStorage mvPartitionStorage =
getMvPartitionStorage(partitionId());
+ MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
List<IndexIdAndTableVersion> indexIdAndTableVersionList =
fullStateTransferIndexChooser.chooseForAddWriteCommitted(
catalogVersion,
@@ -207,38 +207,53 @@ public class PartitionAccessImpl implements
PartitionAccess {
@Override
public long minLastAppliedIndex() {
return Math.min(
- getMvPartitionStorage(partitionId()).lastAppliedIndex(),
- getTxStateStorage(partitionId()).lastAppliedIndex()
+ getMvPartitionStorage().lastAppliedIndex(),
+ getTxStateStorage().lastAppliedIndex()
);
}
@Override
public long minLastAppliedTerm() {
return Math.min(
- getMvPartitionStorage(partitionId()).lastAppliedTerm(),
- getTxStateStorage(partitionId()).lastAppliedTerm()
+ getMvPartitionStorage().lastAppliedTerm(),
+ getTxStateStorage().lastAppliedTerm()
);
}
@Override
public long maxLastAppliedIndex() {
return Math.max(
- getMvPartitionStorage(partitionId()).lastAppliedIndex(),
- getTxStateStorage(partitionId()).lastAppliedIndex()
+ getMvPartitionStorage().lastAppliedIndex(),
+ getTxStateStorage().lastAppliedIndex()
);
}
@Override
public long maxLastAppliedTerm() {
return Math.max(
- getMvPartitionStorage(partitionId()).lastAppliedTerm(),
- getTxStateStorage(partitionId()).lastAppliedTerm()
+ getMvPartitionStorage().lastAppliedTerm(),
+ getTxStateStorage().lastAppliedTerm()
);
}
+ @Override
+ public long leaseStartTime() {
+ return getMvPartitionStorage().leaseStartTime();
+ }
+
+ @Override
+ public @Nullable UUID primaryReplicaNodeId() {
+ return getMvPartitionStorage().primaryReplicaNodeId();
+ }
+
+ @Override
+ public @Nullable String primaryReplicaNodeName() {
+ return getMvPartitionStorage().primaryReplicaNodeName();
+ }
+
@Override
public CompletableFuture<Void> startRebalance() {
- TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+ TxStateStorage txStateStorage = getTxStateStorage();
return mvGc.removeStorage(toTablePartitionId(partitionKey))
.thenCompose(unused -> CompletableFuture.allOf(
@@ -249,7 +264,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
@Override
public CompletableFuture<Void> abortRebalance() {
- TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+ TxStateStorage txStateStorage = getTxStateStorage();
return CompletableFuture.allOf(
mvTableStorage.abortRebalancePartition(partitionId()),
@@ -258,14 +273,14 @@ public class PartitionAccessImpl implements
PartitionAccess {
}
@Override
- public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long
lastAppliedTerm, RaftGroupConfiguration raftGroupConfig) {
- TxStateStorage txStateStorage = getTxStateStorage(partitionId());
+ public CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta
partitionMeta) {
+ TxStateStorage txStateStorage = getTxStateStorage();
- byte[] configBytes =
raftGroupConfigurationConverter.toBytes(raftGroupConfig);
+ byte[] configBytes =
raftGroupConfigurationConverter.toBytes(partitionMeta.raftGroupConfig());
return CompletableFuture.allOf(
- mvTableStorage.finishRebalancePartition(partitionId(),
lastAppliedIndex, lastAppliedTerm, configBytes),
- txStateStorage.finishRebalance(lastAppliedIndex,
lastAppliedTerm)
+ mvTableStorage.finishRebalancePartition(partitionId(),
partitionMeta.toMvPartitionMeta(configBytes)),
+
txStateStorage.finishRebalance(partitionMeta.lastAppliedIndex(),
partitionMeta.lastAppliedTerm())
).thenAccept(unused ->
mvGc.addStorage(toTablePartitionId(partitionKey), gcUpdateHandler));
}
@@ -276,7 +291,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
@Override
public void setNextRowIdToBuildIndex(Map<Integer, RowId>
nextRowIdToBuildByIndexId) {
- MvPartitionStorage mvPartitionStorage =
getMvPartitionStorage(partitionId());
+ MvPartitionStorage mvPartitionStorage = getMvPartitionStorage();
mvPartitionStorage.runConsistently(locker -> {
nextRowIdToBuildByIndexId.forEach(indexUpdateHandler::setNextRowIdToBuildIndex);
@@ -290,7 +305,9 @@ public class PartitionAccessImpl implements PartitionAccess
{
lowWatermark.updateLowWatermark(newLowWatermark);
}
- private MvPartitionStorage getMvPartitionStorage(int partitionId) {
+ private MvPartitionStorage getMvPartitionStorage() {
+ int partitionId = partitionId();
+
MvPartitionStorage mvPartitionStorage =
mvTableStorage.getMvPartition(partitionId);
assert mvPartitionStorage != null :
IgniteStringFormatter.format("tableId={}, partitionId={}", tableId(),
partitionId);
@@ -298,7 +315,9 @@ public class PartitionAccessImpl implements PartitionAccess
{
return mvPartitionStorage;
}
- private TxStateStorage getTxStateStorage(int partitionId) {
+ private TxStateStorage getTxStateStorage() {
+ int partitionId = partitionId();
+
TxStateStorage txStateStorage =
txStateTableStorage.getTxStateStorage(partitionId);
assert txStateStorage != null :
IgniteStringFormatter.format("tableId={}, partitionId={}", tableId(),
partitionId);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 93e02b32af..a60b2d819f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -21,15 +21,14 @@ import static
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoin
import static
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
-import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -58,19 +57,7 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
private final CatalogService catalogService;
- /**
- * RAFT log index, min of {@link MvPartitionStorage#lastAppliedIndex()}
and {@link TxStateStorage#lastAppliedIndex()}
- * at the moment of this factory instantiation.
- */
- private final long lastIncludedRaftIndex;
-
- /** Term corresponding to {@link #lastIncludedRaftIndex}. */
- private final long lastIncludedRaftTerm;
-
- /** RAFT configuration corresponding to {@link #lastIncludedRaftIndex}. */
- private final RaftGroupConfiguration lastIncludedConfiguration;
-
- private final int lastCatalogVersionAtStart;
+ private final @Nullable SnapshotMeta startupSnapshotMeta;
/** Incoming snapshots executor. */
private final Executor incomingSnapshotsExecutor;
@@ -101,12 +88,25 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
// We must choose the minimum applied index for local recovery so that
we don't skip the raft commands for the storage with the
// lowest applied index and thus no data loss occurs.
- lastIncludedRaftIndex = partition.minLastAppliedIndex();
- lastIncludedRaftTerm = partition.minLastAppliedTerm();
+ long lastIncludedRaftIndex = partition.minLastAppliedIndex();
+ long lastIncludedRaftTerm = partition.minLastAppliedTerm();
- lastIncludedConfiguration = partition.committedGroupConfiguration();
+ int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
- lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
+ if (lastIncludedRaftIndex == 0) {
+ startupSnapshotMeta = null;
+ } else {
+ startupSnapshotMeta = snapshotMetaAt(
+ lastIncludedRaftIndex,
+ lastIncludedRaftTerm,
+
Objects.requireNonNull(partition.committedGroupConfiguration()),
+ lastCatalogVersionAtStart,
+
collectNextRowIdToBuildIndexesAtStart(lastCatalogVersionAtStart),
+ partition.leaseStartTime(),
+ partition.primaryReplicaNodeId(),
+ partition.primaryReplicaNodeName()
+ );
+ }
}
@Override
@@ -118,26 +118,12 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
raftOptions,
partition,
catalogService,
- createStartupSnapshotMeta(),
+ startupSnapshotMeta,
incomingSnapshotsExecutor
);
}
- private @Nullable SnapshotMeta createStartupSnapshotMeta() {
- if (lastIncludedRaftIndex == 0) {
- return null;
- }
-
- return snapshotMetaAt(
- lastIncludedRaftIndex,
- lastIncludedRaftTerm,
- lastIncludedConfiguration,
- lastCatalogVersionAtStart,
- collectNextRowIdToBuildIndexesAtStart()
- );
- }
-
- private Map<Integer, UUID> collectNextRowIdToBuildIndexesAtStart() {
+ private Map<Integer, UUID> collectNextRowIdToBuildIndexesAtStart(int
lastCatalogVersionAtStart) {
return collectNextRowIdToBuildIndexes(catalogService, partition,
lastCatalogVersionAtStart);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
new file mode 100644
index 0000000000..13a7162ba2
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/RaftSnapshotPartitionMeta.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.UUID;
+import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
+import org.apache.ignite.internal.storage.engine.PrimitivePartitionMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition metadata for {@link PartitionAccess}.
+ */
+public class RaftSnapshotPartitionMeta extends PrimitivePartitionMeta {
+ private final RaftGroupConfiguration raftGroupConfig;
+
+ /** Constructs an {@link RaftSnapshotPartitionMeta} from a {@link
PartitionSnapshotMeta} . */
+ public static RaftSnapshotPartitionMeta
fromSnapshotMeta(PartitionSnapshotMeta meta, RaftGroupConfiguration
raftGroupConfig) {
+ return new RaftSnapshotPartitionMeta(
+ meta.lastIncludedIndex(),
+ meta.lastIncludedTerm(),
+ raftGroupConfig,
+ meta.leaseStartTime(),
+ meta.primaryReplicaNodeId(),
+ meta.primaryReplicaNodeName()
+ );
+ }
+
+ /** Constructor. */
+ private RaftSnapshotPartitionMeta(
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig,
+ long leaseStartTime,
+ @Nullable UUID primaryReplicaNodeId,
+ @Nullable String primaryReplicaNodeName
+ ) {
+ super(lastAppliedIndex, lastAppliedTerm, leaseStartTime,
primaryReplicaNodeId, primaryReplicaNodeName);
+
+ this.raftGroupConfig = raftGroupConfig;
+ }
+
+ /** Returns replication group config. */
+ public RaftGroupConfiguration raftGroupConfig() {
+ return raftGroupConfig;
+ }
+
+ /**
+ * Converts this meta to {@link MvPartitionMeta}.
+ *
+ * @param configBytes Group config represented as bytes.
+ */
+ public MvPartitionMeta toMvPartitionMeta(byte[] configBytes) {
+ return new MvPartitionMeta(
+ lastAppliedIndex(),
+ lastAppliedTerm(),
+ configBytes,
+ leaseStartTime(),
+ primaryReplicaNodeId(),
+ primaryReplicaNodeName()
+ );
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index a8d3d99084..830ac6ddce 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -62,11 +62,11 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.RaftSnapshotPartitionMeta;
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -470,7 +470,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
return
partitionSnapshotStorage.partition().abortRebalance().thenCompose(unused ->
failedFuture(throwable));
}
- SnapshotMeta meta = snapshotMeta;
+ PartitionSnapshotMeta meta = snapshotMeta;
RaftGroupConfiguration raftGroupConfig = new
RaftGroupConfiguration(
meta.peersList(),
@@ -487,7 +487,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
raftGroupConfig
);
- return
partitionSnapshotStorage.partition().finishRebalance(meta.lastIncludedIndex(),
meta.lastIncludedTerm(), raftGroupConfig);
+ return
partitionSnapshotStorage.partition().finishRebalance(RaftSnapshotPartitionMeta.fromSnapshotMeta(meta,
raftGroupConfig));
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 6b5858d01b..a54385158d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -168,9 +168,6 @@ public class OutgoingSnapshot {
}
private PartitionSnapshotMeta takeSnapshotMeta() {
- long lastAppliedIndex = partition.maxLastAppliedIndex();
- long lastAppliedTerm = partition.maxLastAppliedTerm();
-
RaftGroupConfiguration config =
partition.committedGroupConfiguration();
assert config != null : "Configuration should never be null when
installing a snapshot";
@@ -179,7 +176,16 @@ public class OutgoingSnapshot {
Map<Integer, UUID> nextRowIdToBuildByIndexId =
collectNextRowIdToBuildIndexes(catalogService, partition, catalogVersion);
- return snapshotMetaAt(lastAppliedIndex, lastAppliedTerm, config,
catalogVersion, nextRowIdToBuildByIndexId);
+ return snapshotMetaAt(
+ partition.maxLastAppliedIndex(),
+ partition.maxLastAppliedTerm(),
+ config,
+ catalogVersion,
+ nextRowIdToBuildByIndexId,
+ partition.leaseStartTime(),
+ partition.primaryReplicaNodeId(),
+ partition.primaryReplicaNodeName()
+ );
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
index bbd2db8ef1..de405eb47f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMetaBuilder;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.RowId;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.jetbrains.annotations.Nullable;
/**
* Utils to build {@link SnapshotMeta} instances.
@@ -45,6 +47,9 @@ public class SnapshotMetaUtils {
* @param requiredCatalogVersion Catalog version that a follower/learner
must have to have ability to accept this snapshot.
* @param nextRowIdToBuildByIndexId Row ID for which the index needs to be
built per building index ID at the time the snapshot meta was
* created.
+ * @param leaseStartTime Lease start time (as {@link
HybridTimestamp#longValue()}).
+ * @param primaryReplicaNodeId Primary replica node ID.
+ * @param primaryReplicaNodeName Primary replica node name.
* @return SnapshotMeta corresponding to the given log index.
*/
public static PartitionSnapshotMeta snapshotMetaAt(
@@ -52,7 +57,10 @@ public class SnapshotMetaUtils {
long term,
RaftGroupConfiguration config,
int requiredCatalogVersion,
- Map<Integer, UUID> nextRowIdToBuildByIndexId
+ Map<Integer, UUID> nextRowIdToBuildByIndexId,
+ long leaseStartTime,
+ @Nullable UUID primaryReplicaNodeId,
+ @Nullable String primaryReplicaNodeName
) {
PartitionSnapshotMetaBuilder metaBuilder = new PartitionReplicationMessagesFactory().partitionSnapshotMeta()
.lastIncludedIndex(logIndex)
@@ -60,7 +68,10 @@ public class SnapshotMetaUtils {
.peersList(config.peers())
.learnersList(config.learners())
.requiredCatalogVersion(requiredCatalogVersion)
- .nextRowIdToBuildByIndexId(nextRowIdToBuildByIndexId);
+ .nextRowIdToBuildByIndexId(nextRowIdToBuildByIndexId)
+ .leaseStartTime(leaseStartTime)
+ .primaryReplicaNodeId(primaryReplicaNodeId)
+ .primaryReplicaNodeName(primaryReplicaNodeName);
if (!config.isStable()) {
//noinspection ConstantConditions
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
index 5fc64644d9..e31ec44ec5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/NullMvTableStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
@@ -98,12 +99,7 @@ public class NullMvTableStorage implements MvTableStorage {
}
@Override
- public CompletableFuture<Void> finishRebalancePartition(
- int partitionId,
- long lastAppliedIndex,
- long lastAppliedTerm,
- byte[] groupConfig
- ) {
+ public CompletableFuture<Void> finishRebalancePartition(int partitionId, MvPartitionMeta partitionMeta) {
return throwNoPartitionsException();
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7d73b2d82d..793538723d 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -683,6 +683,8 @@ public class TableManagerTest extends IgniteAbstractTest {
when(mvPartitionStorage.lastAppliedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);
}
+ when(mvPartitionStorage.committedGroupConfiguration()).thenReturn(new byte[0]);
+
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
when(txStateStorage.clear()).thenReturn(nullCompletedFuture());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 5e8d5e5dda..f701f57230 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -172,6 +172,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
private final long expLastAppliedTerm = 100L;
private final RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+ private final long expLeaseStartTime = 3000000;
+ private final UUID expPrimaryReplicaNodeId = new UUID(1, 2);
+ private final String expPrimaryReplicaNodeName = "primary";
+
private final List<RowId> rowIds = generateRowIds();
private final List<UUID> txIds = generateTxIds();
@@ -238,6 +242,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
raftGroupConfigurationConverter.toBytes(expLastGroupConfig),
outgoingMvPartitionStorage.committedGroupConfiguration()
);
+ assertEquals(expLeaseStartTime, outgoingMvPartitionStorage.leaseStartTime());
+ assertEquals(expPrimaryReplicaNodeId, outgoingMvPartitionStorage.primaryReplicaNodeId());
+ assertEquals(expPrimaryReplicaNodeName, outgoingMvPartitionStorage.primaryReplicaNodeName());
+
assertEquals(expLastAppliedIndex, outgoingTxStatePartitionStorage.lastAppliedIndex());
assertEquals(expLastAppliedTerm, outgoingTxStatePartitionStorage.lastAppliedTerm());
@@ -258,7 +266,16 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
}
private void fillOriginalStorages() {
- fillMvPartitionStorage(outgoingMvPartitionStorage, expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+ fillMvPartitionStorage(
+ outgoingMvPartitionStorage,
+ expLastAppliedIndex,
+ expLastAppliedTerm,
+ expLastGroupConfig,
+ rowIds,
+ expLeaseStartTime,
+ expPrimaryReplicaNodeId,
+ expPrimaryReplicaNodeName
+ );
fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, expLastAppliedIndex, expLastAppliedTerm, txIds);
}
@@ -319,7 +336,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
expLastAppliedTerm,
expLastGroupConfig,
requiredCatalogVersion,
- Map.of(indexId, nextRowIdToBuildIndex.uuid())
+ Map.of(indexId, nextRowIdToBuildIndex.uuid()),
+ expLeaseStartTime,
+ expPrimaryReplicaNodeId,
+ expPrimaryReplicaNodeName
))
.build();
}
@@ -366,7 +386,10 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
long lastAppliedIndex,
long lastAppliedTerm,
RaftGroupConfiguration raftGroupConfig,
- List<RowId> rowIds
+ List<RowId> rowIds,
+ long leaseStartTime,
+ UUID primaryReplicaNodeId,
+ String primaryReplicaNodeName
) {
assertEquals(0, rowIds.size() % 2, "size=" + rowIds.size());
@@ -385,6 +408,8 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
storage.committedGroupConfiguration(raftGroupConfigurationConverter.toBytes(raftGroupConfig));
+ storage.updateLease(leaseStartTime, primaryReplicaNodeId, primaryReplicaNodeName);
+
return null;
});
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index fea847ddf1..8dfb27f193 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -71,9 +71,7 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest {
@Test
void sendsSnapshotMeta() {
when(partitionAccess.maxLastAppliedIndex()).thenReturn(100L);
-
when(partitionAccess.maxLastAppliedTerm()).thenReturn(3L);
-
when(partitionAccess.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration(
List.of("peer1:3000", "peer2:3000"),
List.of("learner1:3000", "learner2:3000"),
@@ -83,6 +81,10 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest {
when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION);
+ when(partitionAccess.leaseStartTime()).thenReturn(333L);
+ when(partitionAccess.primaryReplicaNodeId()).thenReturn(new UUID(1, 2));
+ when(partitionAccess.primaryReplicaNodeName()).thenReturn("primary");
+
snapshot.freezeScopeUnderMvLock();
SnapshotMetaResponse response = getSnapshotMetaResponse();
@@ -94,6 +96,9 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest {
assertThat(response.meta().oldPeersList(), is(List.of("peer1:3000")));
assertThat(response.meta().oldLearnersList(), is(List.of("learner1:3000")));
assertThat(response.meta().requiredCatalogVersion(), is(REQUIRED_CATALOG_VERSION));
+ assertThat(response.meta().leaseStartTime(), is(333L));
+ assertThat(response.meta().primaryReplicaNodeId(), is(new UUID(1, 2)));
+ assertThat(response.meta().primaryReplicaNodeName(), is("primary"));
}
private SnapshotMetaResponse getSnapshotMetaResponse() {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
index 97fc4f01ec..411c748242 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
@@ -67,7 +67,16 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
UUID nextRowIdToBuild = UUID.randomUUID();
int indexId = 1;
- PartitionSnapshotMeta meta = snapshotMetaAt(100, 3, config, 42, Map.of(indexId, nextRowIdToBuild));
+ PartitionSnapshotMeta meta = snapshotMetaAt(
+ 100,
+ 3,
+ config,
+ 42,
+ Map.of(indexId, nextRowIdToBuild),
+ 777L,
+ new UUID(1, 2),
+ "primary"
+ );
assertThat(meta.lastIncludedIndex(), is(100L));
assertThat(meta.lastIncludedTerm(), is(3L));
@@ -77,11 +86,23 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
assertThat(meta.oldLearnersList(), is(List.of("learner1:3000")));
assertThat(meta.requiredCatalogVersion(), is(42));
assertThat(meta.nextRowIdToBuildByIndexId(), is(Map.of(indexId, nextRowIdToBuild)));
+ assertThat(meta.leaseStartTime(), is(777L));
+ assertThat(meta.primaryReplicaNodeId(), is(new UUID(1, 2)));
+ assertThat(meta.primaryReplicaNodeName(), is("primary"));
}
@Test
void doesNotIncludeOldConfigWhenItIsNotThere() {
- PartitionSnapshotMeta meta = snapshotMetaAt(100, 3, new RaftGroupConfiguration(List.of(), List.of(), null, null), 42, Map.of());
+ PartitionSnapshotMeta meta = snapshotMetaAt(
+ 100,
+ 3,
+ new RaftGroupConfiguration(List.of(), List.of(), null, null),
+ 42,
+ Map.of(),
+ 777L,
+ null,
+ null
+ );
assertThat(meta.oldPeersList(), is(nullValue()));
assertThat(meta.oldLearnersList(), is(nullValue()));