This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26849 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a88b723872d0dad1b2b26f3abc1589b9a9b2054d Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Nov 18 12:38:16 2025 +0300 IGNITE-26849 wip --- .../snapshot/PartitionSnapshotStorageFactory.java | 8 +- .../ignite/raft/jraft/option/NodeOptions.java | 3 +- .../internal/BaseTruncateRaftLogAbstractTest.java | 93 ++++++++++++++++++++- .../ItTruncateRaftLogAndRebalanceTest.java | 95 +++++++++++++++++++++- .../ItTruncateRaftLogAndRestartNodesTest.java | 81 +----------------- 5 files changed, 194 insertions(+), 86 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java index 25706b2409b..dd156b2e893 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java @@ -46,7 +46,8 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { return new PartitionSnapshotStorageAdapter(snapshotStorage, uri); } - private static class PartitionSnapshotStorageAdapter implements SnapshotStorage { + /** Partition snapshot storage adapter. */ + public static class PartitionSnapshotStorageAdapter implements SnapshotStorage { private final PartitionSnapshotStorage snapshotStorage; /** Flag indicating that startup snapshot has been opened. */ @@ -113,5 +114,10 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { // Option is not supported. return false; } + + /** Returns partition snapshot storage. */ + public PartitionSnapshotStorage partitionSnapshotStorage() { + return snapshotStorage; + } } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java index 02070ffec56..4ef8d2594d7 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java @@ -88,7 +88,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> { // If |snapshot_interval_s| <= 0, the time based snapshot would be disabled. // // Default: 3600 (1 hour) - private int snapshotIntervalSecs = 3600; + // TODO: IGNITE-26849 Вернуть один час 3600 + private int snapshotIntervalSecs = 5; // A snapshot saving would be triggered every |snapshot_interval_s| seconds, // and at this moment when state machine's lastAppliedIndex value diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java index 0e68f89a5bf..4f3543e3373 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java @@ -17,25 +17,42 @@ package org.apache.ignite.internal; +import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.CompletableFutures.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; import java.util.stream.IntStream; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; +import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.OperationContext; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TxContext; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.tx.TransactionOptions; /** Base class for raft log truncating related integration tests, containing useful methods, classes, and methods. */ class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { @@ -58,7 +75,7 @@ class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { return raftNodeImpl(igniteImpl(nodeIndex), replicationGroupId); } - NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId replicationGroupId) { + static NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId replicationGroupId) { NodeImpl[] node = {null}; ignite.raftManager().forEach((raftNodeId, raftGroupService) -> { @@ -104,6 +121,59 @@ class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { ); } + static String selectPeopleDml(String tableName) { + return String.format( + "select %s, %s, %s from %s", + Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, + tableName + ); + } + + Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) { + IgniteImpl ignite = igniteImpl(nodeIndex); + + TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName)); + + InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable(); + + InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true)); + + var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>(); + + try { + for (int partitionId = 0; partitionId < table.partitions(); partitionId++) { + scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node()))); + } + + assertThat(allOf(scanFutures), willCompleteSuccessfully()); + + SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema(); + + return scanFutures.stream() + .map(CompletableFuture::join) + .flatMap(Collection::stream) + .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow)) + .toArray(Person[]::new); + } finally { + roTx.commit(); + } + } + + private static Publisher<BinaryRow> scan( + InternalTableImpl internalTableImpl, + InternalTransaction roTx, + int partitionId, + InternalClusterNode recipientNode + ) { + assertTrue(roTx.isReadOnly(), roTx.toString()); + + return internalTableImpl.scan( + partitionId, + recipientNode, + OperationContext.create(TxContext.readOnly(roTx)) + ); + } + static Person[] generatePeople(int count) { assertThat(count, greaterThanOrEqualTo(0)); @@ -127,7 +197,7 @@ class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) sqlRow.get(2)); } - static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) { + private static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) { var binaryTupleReader = new BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice()); Column idColumn = findColumnByName(schemaDescriptor, Person.ID_COLUMN_NAME); @@ -150,6 +220,25 @@ class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { )); } + TableViewInternal tableViewInternal(int nodeIndex, String tableName) { + TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName)); + + assertNotNull(tableViewInternal, String.format("Missing table: [nodeIndex=%s, tableName=%s]", nodeIndex, tableName)); + + return tableViewInternal; + } + + static MvPartitionStorage mvPartitionStorage(TableViewInternal tableViewInternal, int partitionId) { + MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId); + + assertNotNull( + mvPartition, + String.format("Missing MvPartitionStorage: [tableName=%s, partitionId=%s]", tableViewInternal.name(), partitionId) + ); + + return mvPartition; + } + static class Person { static final String ID_COLUMN_NAME = "ID"; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java index a79f5f95e92..8eb6971a207 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java @@ -17,13 +17,32 @@ package org.apache.ignite.internal; +import static java.util.concurrent.CompletableFuture.allOf; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.CollectionUtils.first; import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import com.typesafe.config.parser.ConfigDocumentFactory; import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory.PartitionSnapshotStorageAdapter; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; +import org.apache.ignite.internal.replicator.PartitionGroupId; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.raft.jraft.core.NodeImpl; import org.junit.jupiter.api.Test; /** Class for testing various raft log truncation and rebalancing scenarios. */ @@ -50,14 +69,84 @@ public class ItTruncateRaftLogAndRebalanceTest extends BaseTruncateRaftLogAbstra cluster.transferLeadershipTo(0, replicationGroupId); - insertPeopleAndAwaitTruncateRaftLogOnAllNodes(replicationGroupId); + insertPeopleAndAwaitTruncateRaftLogOnAllNodes(1_000, TABLE_NAME, replicationGroupId); + + // Let's restart the node with aborted rebalance. + startAndAbortRebalance(1, TABLE_NAME, replicationGroupId); + + cluster.stopNode(1); + cluster.startNode(1); + + // Let's check that the replication was successful. + for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) { + assertThat( + "nodeIndex=" + nodeIndex, + scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME), + arrayWithSize(1_000) + ); + } + } + + private void startAndAbortRebalance(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) { + NodeImpl raftNodeImpl = raftNodeImpl(nodeIndex, replicationGroupId); + + PartitionSnapshotStorageAdapter snapshotStorageAdapter = partitionSnapshotStorageAdapter(raftNodeImpl); + + PartitionMvStorageAccess mvStorageAccess = partitionMvStorageAccess(snapshotStorageAdapter); + PartitionTxStateAccess txStateAccess = partitionTxStateAccess(snapshotStorageAdapter); + + assertThat(runAsync(() -> allOf(mvStorageAccess.startRebalance(), txStateAccess.startRebalance())), willCompleteSuccessfully()); + + flushMvPartitionStorage(nodeIndex, tableName, replicationGroupId); + + assertThat(runAsync(() -> allOf(mvStorageAccess.abortRebalance(), txStateAccess.abortRebalance())), willCompleteSuccessfully()); + } + + private void flushMvPartitionStorage(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) { + assertThat(replicationGroupId, instanceOf(PartitionGroupId.class)); + + TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName); + + MvPartitionStorage mvPartitionStorage = mvPartitionStorage( + tableViewInternal, + ((PartitionGroupId) replicationGroupId).partitionId() + ); + + assertThat( + Wrappers.unwrap(mvPartitionStorage, MvPartitionStorage.class).flush(true), + willCompleteSuccessfully() + ); + } + + private static PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter(NodeImpl raftNodeImpl) { + return (PartitionSnapshotStorageAdapter) raftNodeImpl.getServiceFactory().createSnapshotStorage( + "test", + raftNodeImpl.getRaftOptions() + ); + } + + private static PartitionMvStorageAccess partitionMvStorageAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) { + PartitionSnapshotStorage partitionSnapshotStorage = partitionSnapshotStorageAdapter.partitionSnapshotStorage(); + + Collection<PartitionMvStorageAccess> mvStorageAccesses = partitionSnapshotStorage.partitionsByTableId().values(); + assertThat(mvStorageAccesses, hasSize(1)); + + PartitionMvStorageAccess first = first(mvStorageAccesses); + + assertNotNull(first); + + return first; + } + + private static PartitionTxStateAccess partitionTxStateAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) { + return partitionSnapshotStorageAdapter.partitionSnapshotStorage().txState(); } - private void insertPeopleAndAwaitTruncateRaftLogOnAllNodes(ReplicationGroupId replicationGroupId) { + private void insertPeopleAndAwaitTruncateRaftLogOnAllNodes(int count, String tableName, ReplicationGroupId replicationGroupId) { long[] beforeInsertPeopleRaftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId); assertEquals(initialNodes(), beforeInsertPeopleRaftFirstLogIndexes.length); - insertPeople(TABLE_NAME, generatePeople(1_000)); + insertPeople(tableName, generatePeople(count)); await().atMost(RAFT_SNAPSHOT_INTERVAL_SECS * 2, TimeUnit.SECONDS).until(() -> { long[] raftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java index b01e040e812..ace3e0011cd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java @@ -18,24 +18,16 @@ package org.apache.ignite.internal; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; -import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal; -import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.internal.util.CompletableFutures.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Flow.Publisher; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -43,20 +35,13 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.manager.ComponentContext; -import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.TxContext; -import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.IgniteTestUtils; -import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.raft.jraft.conf.ConfigurationManager; import org.apache.ignite.raft.jraft.core.NodeImpl; @@ -64,7 +49,6 @@ import org.apache.ignite.raft.jraft.option.LogStorageOptions; import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.storage.LogStorage; -import org.apache.ignite.tx.TransactionOptions; import org.hamcrest.Matchers; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -174,17 +158,9 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs } private void flushMvPartitionStorage(int nodeIndex, String tableName, int partitionId) { - TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName)); + TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName); - MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId); - - assertNotNull( - mvPartition, - String.format( - "Missing MvPartitionStorage: [nodeIndex=%s, tableName=%s, partitionId=%s]", - nodeIndex, tableName, partitionId - ) - ); + MvPartitionStorage mvPartition = mvPartitionStorage(tableViewInternal, partitionId); assertThat( IgniteTestUtils.runAsync(() -> mvPartition.flush(true)).thenCompose(Function.identity()), @@ -192,59 +168,6 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs ); } - private static String selectPeopleDml(String tableName) { - return String.format( - "select %s, %s, %s from %s", - Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, - tableName - ); - } - - private Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) { - IgniteImpl ignite = igniteImpl(nodeIndex); - - TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName)); - - InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable(); - - InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true)); - - var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>(); - - try { - for (int partitionId = 0; partitionId < table.partitions(); partitionId++) { - scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node()))); - } - - assertThat(allOf(scanFutures), willCompleteSuccessfully()); - - SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema(); - - return scanFutures.stream() - .map(CompletableFuture::join) - .flatMap(Collection::stream) - .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow)) - .toArray(Person[]::new); - } finally { - roTx.commit(); - } - } - - private static Publisher<BinaryRow> scan( - InternalTableImpl internalTableImpl, - InternalTransaction roTx, - int partitionId, - InternalClusterNode recipientNode - ) { - assertTrue(roTx.isReadOnly(), roTx.toString()); - - return internalTableImpl.scan( - partitionId, - recipientNode, - OperationContext.create(TxContext.readOnly(roTx)) - ); - } - private void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, long startRaftLogIndex) { long lastLogIndex = logStorage.getLastLogIndex(); long lastIndexKept = lastLogIndex - (lastLogIndex - startRaftLogIndex) / 2;
