This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26849 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 2357c913edbabec67e499debe60d114c57a11d45 Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Nov 20 08:43:05 2025 +0300 IGNITE-26849 Revert some changes --- .../snapshot/PartitionSnapshotStorageFactory.java | 8 +- .../internal/BaseTruncateRaftLogAbstractTest.java | 291 --------------------- .../ItTruncateRaftLogAndRebalanceTest.java | 167 ------------ .../ItTruncateRaftLogAndRestartNodesTest.java | 271 ++++++++++++++++++- 4 files changed, 260 insertions(+), 477 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java index dd156b2e893..25706b2409b 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java @@ -46,8 +46,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { return new PartitionSnapshotStorageAdapter(snapshotStorage, uri); } - /** Partition snapshot storage adapter. */ - public static class PartitionSnapshotStorageAdapter implements SnapshotStorage { + private static class PartitionSnapshotStorageAdapter implements SnapshotStorage { private final PartitionSnapshotStorage snapshotStorage; /** Flag indicating that startup snapshot has been opened. */ @@ -114,10 +113,5 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { // Option is not supported. return false; } - - /** Returns partition snapshot storage. */ - public PartitionSnapshotStorage partitionSnapshotStorage() { - return snapshotStorage; - } } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java deleted file mode 100644 index 4f3543e3373..00000000000 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal; -import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; -import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.internal.util.CompletableFutures.allOf; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Flow.Publisher; -import java.util.stream.IntStream; -import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.binarytuple.BinaryTupleReader; -import org.apache.ignite.internal.network.InternalClusterNode; -import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.SchemaDescriptor; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.table.OperationContext; -import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.table.TxContext; -import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; -import org.apache.ignite.internal.tostring.IgniteToStringInclude; -import org.apache.ignite.internal.tostring.S; -import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.raft.jraft.core.NodeImpl; -import org.apache.ignite.tx.TransactionOptions; - -/** Base class for raft log truncating related integration tests, containing useful methods, classes, and methods. */ -class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest { - static final String ZONE_NAME = "TEST_ZONE"; - - static final String TABLE_NAME = "TEST_TABLE"; - - void createZoneAndTablePerson(String zoneName, String tableName, int replicas, int partitions) { - executeSql(createZoneDdl(zoneName, replicas, partitions)); - executeSql(createTablePersonDdl(zoneName, tableName)); - } - - void insertPeople(String tableName, Person... people) { - for (Person person : people) { - executeSql(insertPersonDml(tableName, person)); - } - } - - NodeImpl raftNodeImpl(int nodeIndex, ReplicationGroupId replicationGroupId) { - return raftNodeImpl(igniteImpl(nodeIndex), replicationGroupId); - } - - static NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId replicationGroupId) { - NodeImpl[] node = {null}; - - ignite.raftManager().forEach((raftNodeId, raftGroupService) -> { - if (replicationGroupId.equals(raftNodeId.groupId())) { - assertNull( - node[0], - String.format("NodeImpl already found: [node=%s, replicationGroupId=%s]", ignite.name(), replicationGroupId) - ); - - node[0] = (NodeImpl) raftGroupService.getRaftNode(); - } - }); - - NodeImpl res = node[0]; - - assertNotNull(res, String.format("Can't find NodeImpl: [node=%s, replicationGroupId=%s]", ignite.name(), replicationGroupId)); - - return res; - } - - private static String createTablePersonDdl(String zoneName, String tableName) { - return String.format( - "create table if not exists %s (%s bigint primary key, %s varchar, %s bigint) zone %s", - tableName, - Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, - zoneName - ); - } - - private static String createZoneDdl(String zoneName, int replicas, int partitions) { - return String.format( - "create zone %s with replicas=%s, partitions=%s, storage_profiles='%s'", - zoneName, replicas, partitions, DEFAULT_STORAGE_PROFILE - ); - } - - private static String insertPersonDml(String tableName, Person person) { - return String.format( - "insert into %s(%s, %s, %s) values(%s, '%s', %s)", - tableName, - Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, - person.id, person.name, person.salary - ); - } - - static String selectPeopleDml(String tableName) { - return String.format( - "select %s, %s, %s from %s", - Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, - tableName - ); - } - - Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) { - IgniteImpl ignite = igniteImpl(nodeIndex); - - TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName)); - - InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable(); - - InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true)); - - var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>(); - - try { - for (int partitionId = 0; partitionId < table.partitions(); partitionId++) { - scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node()))); - } - - assertThat(allOf(scanFutures), willCompleteSuccessfully()); - - SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema(); - - return scanFutures.stream() - .map(CompletableFuture::join) - .flatMap(Collection::stream) - .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow)) - .toArray(Person[]::new); - } finally { - roTx.commit(); - } - } - - private static Publisher<BinaryRow> scan( - InternalTableImpl internalTableImpl, - InternalTransaction roTx, - int partitionId, - InternalClusterNode recipientNode - ) { - assertTrue(roTx.isReadOnly(), roTx.toString()); - - return internalTableImpl.scan( - partitionId, - recipientNode, - OperationContext.create(TxContext.readOnly(roTx)) - ); - } - - static Person[] generatePeople(int count) { - assertThat(count, greaterThanOrEqualTo(0)); - - return IntStream.range(0, count) - .mapToObj(i -> new Person(i, "name-" + i, i + 1_000)) - .toArray(Person[]::new); - } - - static Person[] toPeopleFromSqlRows(List<List<Object>> sqlResult) { - return sqlResult.stream() - .map(BaseTruncateRaftLogAbstractTest::toPersonFromSqlRow) - .toArray(Person[]::new); - } - - private static Person toPersonFromSqlRow(List<Object> sqlRow) { - assertThat(sqlRow, hasSize(3)); - assertThat(sqlRow.get(0), instanceOf(Long.class)); - assertThat(sqlRow.get(1), instanceOf(String.class)); - assertThat(sqlRow.get(2), instanceOf(Long.class)); - - return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) sqlRow.get(2)); - } - - private static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) { - var binaryTupleReader = new BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice()); - - Column idColumn = findColumnByName(schemaDescriptor, Person.ID_COLUMN_NAME); - Column nameColumn = findColumnByName(schemaDescriptor, Person.NAME_COLUMN_NAME); - Column salaryColumn = findColumnByName(schemaDescriptor, Person.SALARY_COLUMN_NAME); - - return new Person( - binaryTupleReader.longValue(idColumn.positionInRow()), - binaryTupleReader.stringValue(nameColumn.positionInRow()), - binaryTupleReader.longValue(salaryColumn.positionInRow()) - ); - } - - private static Column findColumnByName(SchemaDescriptor schemaDescriptor, String columnName) { - return schemaDescriptor.columns().stream() - .filter(column -> columnName.equalsIgnoreCase(column.name())) - .findFirst() - .orElseThrow(() -> new AssertionError( - String.format("Can't find column by name: [columnName=%s, schema=%s]", columnName, schemaDescriptor) - )); - } - - TableViewInternal tableViewInternal(int nodeIndex, String tableName) { - TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName)); - - assertNotNull(tableViewInternal, String.format("Missing table: [nodeIndex=%s, tableName=%s]", nodeIndex, tableName)); - - return tableViewInternal; - } - - static MvPartitionStorage mvPartitionStorage(TableViewInternal tableViewInternal, int partitionId) { - MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId); - - assertNotNull( - mvPartition, - String.format("Missing MvPartitionStorage: [tableName=%s, partitionId=%s]", tableViewInternal.name(), partitionId) - ); - - return mvPartition; - } - - static class Person { - static final String ID_COLUMN_NAME = "ID"; - - static final String NAME_COLUMN_NAME = "NAME"; - - static final String SALARY_COLUMN_NAME = "SALARY"; - - @IgniteToStringInclude - final long id; - - @IgniteToStringInclude - final String name; - - @IgniteToStringInclude - final long salary; - - private Person(long id, String name, long salary) { - this.id = id; - this.name = name; - this.salary = salary; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Person p = (Person) o; - - return id == p.id && salary == p.salary && name.equals(p.name); - } - - @Override - public int hashCode() { - int result = (int) (id ^ (id >>> 32)); - result = 31 * result + (int) (salary ^ (salary >>> 32)); - result = 31 * result + name.hashCode(); - return result; - } - - @Override - public String toString() { - return S.toString(Person.class, this); - } - } -} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java deleted file mode 100644 index f6038de768f..00000000000 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import static java.util.concurrent.CompletableFuture.allOf; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.internal.util.CollectionUtils.first; -import static org.awaitility.Awaitility.await; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.arrayWithSize; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import com.typesafe.config.parser.ConfigDocumentFactory; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory.PartitionSnapshotStorageAdapter; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; -import org.apache.ignite.internal.replicator.PartitionGroupId; -import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.table.TableViewInternal; -import org.apache.ignite.internal.wrapper.Wrappers; -import org.apache.ignite.raft.jraft.core.NodeImpl; -import org.junit.jupiter.api.Test; - -/** Class for testing various raft log truncation and rebalancing scenarios. */ -public class ItTruncateRaftLogAndRebalanceTest extends BaseTruncateRaftLogAbstractTest { - private static final int RAFT_SNAPSHOT_INTERVAL_SECS = 5; - - @Override - protected int initialNodes() { - return 3; - } - - @Override - protected String getNodeBootstrapConfigTemplate() { - return ConfigDocumentFactory.parseString(super.getNodeBootstrapConfigTemplate()) - .withValueText("ignite.system.properties.raftSnapshotIntervalSecs", Integer.toString(RAFT_SNAPSHOT_INTERVAL_SECS)) - .render(); - } - - @Test - void testRestartNodeAfterAbortRebalanceAndTruncateRaftLog() throws Exception { - createZoneAndTablePerson(ZONE_NAME, TABLE_NAME, 3, 1); - - ReplicationGroupId replicationGroupId = cluster.solePartitionId(ZONE_NAME, TABLE_NAME); - - cluster.transferLeadershipTo(0, replicationGroupId); - - insertPeopleAndAwaitTruncateRaftLogOnAllNodes(1_000, TABLE_NAME, replicationGroupId); - - startAndAbortRebalance(1, TABLE_NAME, replicationGroupId); - - // Let's restart the node with aborted rebalance. - cluster.stopNode(1); - cluster.startNode(1); - - // Let's check that the replication was successful. - for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) { - assertThat( - "nodeIndex=" + nodeIndex, - scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME), - arrayWithSize(1_000) - ); - } - } - - private void startAndAbortRebalance(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) { - NodeImpl raftNodeImpl = raftNodeImpl(nodeIndex, replicationGroupId); - - PartitionSnapshotStorageAdapter snapshotStorageAdapter = partitionSnapshotStorageAdapter(raftNodeImpl); - - PartitionMvStorageAccess mvStorageAccess = partitionMvStorageAccess(snapshotStorageAdapter); - PartitionTxStateAccess txStateAccess = partitionTxStateAccess(snapshotStorageAdapter); - - assertThat(runAsync(() -> allOf(mvStorageAccess.startRebalance(), txStateAccess.startRebalance())), willCompleteSuccessfully()); - - // Let's flush only MvPartitionStorage to reproduce the situation as with a real user. - flushMvPartitionStorage(nodeIndex, tableName, replicationGroupId); - - assertThat(runAsync(() -> allOf(mvStorageAccess.abortRebalance(), txStateAccess.abortRebalance())), willCompleteSuccessfully()); - } - - private void flushMvPartitionStorage(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) { - assertThat(replicationGroupId, instanceOf(PartitionGroupId.class)); - - TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName); - - MvPartitionStorage mvPartitionStorage = mvPartitionStorage( - tableViewInternal, - ((PartitionGroupId) replicationGroupId).partitionId() - ); - - assertThat( - Wrappers.unwrap(mvPartitionStorage, MvPartitionStorage.class).flush(true), - willCompleteSuccessfully() - ); - } - - private static PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter(NodeImpl raftNodeImpl) { - return (PartitionSnapshotStorageAdapter) raftNodeImpl.getServiceFactory().createSnapshotStorage( - "test", - raftNodeImpl.getRaftOptions() - ); - } - - private static PartitionMvStorageAccess partitionMvStorageAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) { - PartitionSnapshotStorage partitionSnapshotStorage = partitionSnapshotStorageAdapter.partitionSnapshotStorage(); - - Collection<PartitionMvStorageAccess> mvStorageAccesses = partitionSnapshotStorage.partitionsByTableId().values(); - assertThat(mvStorageAccesses, hasSize(1)); - - PartitionMvStorageAccess first = first(mvStorageAccesses); - - assertNotNull(first); - - return first; - } - - private static PartitionTxStateAccess partitionTxStateAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) { - return partitionSnapshotStorageAdapter.partitionSnapshotStorage().txState(); - } - - private void insertPeopleAndAwaitTruncateRaftLogOnAllNodes(int count, String tableName, ReplicationGroupId replicationGroupId) { - long[] beforeInsertPeopleRaftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId); - assertEquals(initialNodes(), beforeInsertPeopleRaftFirstLogIndexes.length); - - insertPeople(tableName, generatePeople(count)); - - await().atMost(RAFT_SNAPSHOT_INTERVAL_SECS * 2, TimeUnit.SECONDS).until(() -> { - long[] raftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId); - assertEquals(initialNodes(), raftFirstLogIndexes.length); - - return !Arrays.equals(beforeInsertPeopleRaftFirstLogIndexes, raftFirstLogIndexes); - }); - } - - private long[] collectRaftFirstLogIndexes(ReplicationGroupId replicationGroupId) { - return cluster.runningNodes() - .map(TestWrappers::unwrapIgniteImpl) - .map(igniteImpl -> raftNodeImpl(igniteImpl, replicationGroupId)) - .mapToLong(raftNodeImpl -> raftNodeImpl.logStorage().getFirstLogIndex()) - .toArray(); - } -} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java index ace3e0011cd..955b83a8c36 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java @@ -18,30 +18,56 @@ package org.apache.ignite.internal; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.CompletableFutures.allOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.IntStream; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils; import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TxContext; +import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.raft.jraft.conf.ConfigurationManager; import org.apache.ignite.raft.jraft.core.NodeImpl; @@ -49,6 +75,7 @@ import org.apache.ignite.raft.jraft.option.LogStorageOptions; import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.tx.TransactionOptions; import org.hamcrest.Matchers; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -58,7 +85,13 @@ import org.junit.jupiter.api.Test; * groups associated with tables. */ // TODO: IGNITE-25501 Fix partition state after snapshot -public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbstractTest { +public class ItTruncateRaftLogAndRestartNodesTest extends ClusterPerTestIntegrationTest { + private static final IgniteLogger LOG = Loggers.forClass(ItTruncateRaftLogAndRestartNodesTest.class); + + private static final String ZONE_NAME = "TEST_ZONE"; + + private static final String TABLE_NAME = "TEST_TABLE"; + @Override protected int initialNodes() { return 0; @@ -131,6 +164,38 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs } } + private void createZoneAndTablePerson(String zoneName, String tableName, int replicas, int partitions) { + executeSql(createZoneDdl(zoneName, replicas, partitions)); + executeSql(createTablePersonDdl(zoneName, tableName)); + } + + private void insertPeople(String tableName, Person... people) { + for (Person person : people) { + executeSql(insertPersonDml(tableName, person)); + } + } + + private NodeImpl raftNodeImpl(int nodeIndex, ReplicationGroupId replicationGroupId) { + NodeImpl[] node = {null}; + + igniteImpl(nodeIndex).raftManager().forEach((raftNodeId, raftGroupService) -> { + if (replicationGroupId.equals(raftNodeId.groupId())) { + assertNull( + node[0], + String.format("NodeImpl already found: [nodeIndex=%s, replicationGroupId=%s]", nodeIndex, replicationGroupId) + ); + + node[0] = (NodeImpl) raftGroupService.getRaftNode(); + } + }); + + NodeImpl res = node[0]; + + assertNotNull(res, String.format("Can't find NodeImpl: [nodeIndex=%s, replicationGroupId=%s]", nodeIndex, replicationGroupId)); + + return res; + } + /** * Creates and prepares {@link TestLogStorageFactory} for {@link TestLogStorageFactory#createLogStorage} creation after the * corresponding node is stopped, so that there are no errors. @@ -145,7 +210,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs NodeImpl nodeImpl = raftNodeImpl(nodeIndex, replicationGroupId); - return new TestLogStorageFactory(logStorageFactory, nodeImpl.getOptions(), nodeImpl.getRaftOptions(), log); + return new TestLogStorageFactory(logStorageFactory, nodeImpl.getOptions(), nodeImpl.getRaftOptions()); } private void awaitMajority(ReplicationGroupId replicationGroupId) { @@ -158,9 +223,17 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs } private void flushMvPartitionStorage(int nodeIndex, String tableName, int partitionId) { - TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName); + TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName)); - MvPartitionStorage mvPartition = mvPartitionStorage(tableViewInternal, partitionId); + MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId); + + assertNotNull( + mvPartition, + String.format( + "Missing MvPartitionStorage: [nodeIndex=%s, tableName=%s, partitionId=%s]", + nodeIndex, tableName, partitionId + ) + ); assertThat( IgniteTestUtils.runAsync(() -> mvPartition.flush(true)).thenCompose(Function.identity()), @@ -168,7 +241,135 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs ); } - private void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, long startRaftLogIndex) { + private static String selectPeopleDml(String tableName) { + return String.format( + "select %s, %s, %s from %s", + Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, + tableName + ); + } + + private static String insertPersonDml(String tableName, Person person) { + return String.format( + "insert into %s(%s, %s, %s) values(%s, '%s', %s)", + tableName, + Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, + person.id, person.name, person.salary + ); + } + + private static String createTablePersonDdl(String zoneName, String tableName) { + return String.format( + "create table if not exists %s (%s bigint primary key, %s varchar, %s bigint) zone %s", + tableName, + Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME, + zoneName + ); + } + + private static String createZoneDdl(String zoneName, int replicas, int partitions) { + return String.format( + "create zone %s with replicas=%s, partitions=%s, storage_profiles='%s'", + zoneName, replicas, partitions, DEFAULT_STORAGE_PROFILE + ); + } + + private static Person[] generatePeople(int count) { + assertThat(count, greaterThanOrEqualTo(0)); + + return IntStream.range(0, count) + .mapToObj(i -> new Person(i, "name-" + i, i + 1_000)) + .toArray(Person[]::new); + } + + private static Person[] toPeopleFromSqlRows(List<List<Object>> sqlResult) { + return sqlResult.stream() + .map(ItTruncateRaftLogAndRestartNodesTest::toPersonFromSqlRow) + .toArray(Person[]::new); + } + + private static Person toPersonFromSqlRow(List<Object> sqlRow) { + assertThat(sqlRow, hasSize(3)); + assertThat(sqlRow.get(0), instanceOf(Long.class)); + assertThat(sqlRow.get(1), instanceOf(String.class)); + assertThat(sqlRow.get(2), instanceOf(Long.class)); + + return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) sqlRow.get(2)); + } + + private Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) { + IgniteImpl ignite = igniteImpl(nodeIndex); + + TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName)); + + InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable(); + + InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true)); + + var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>(); + + try { + for (int partitionId = 0; partitionId < table.partitions(); partitionId++) { + scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node()))); + } + + assertThat(allOf(scanFutures), willCompleteSuccessfully()); + + SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema(); + + return scanFutures.stream() + .map(CompletableFuture::join) + .flatMap(Collection::stream) + .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow)) + .toArray(Person[]::new); + } finally { + roTx.commit(); + } + } + + private static Person[] half(Person... people) { + return Arrays.copyOfRange(people, 0, people.length / 2); + } + + private static Publisher<BinaryRow> scan( + InternalTableImpl internalTableImpl, + InternalTransaction roTx, + int partitionId, + InternalClusterNode recipientNode + ) { + assertTrue(roTx.isReadOnly(), roTx.toString()); + + return internalTableImpl.scan( + partitionId, + recipientNode, + OperationContext.create(TxContext.readOnly(roTx)) + ); + } + + private static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) { + var binaryTupleReader = new BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice()); + + Column idColumn = findColumnByName(schemaDescriptor, Person.ID_COLUMN_NAME); + Column nameColumn = findColumnByName(schemaDescriptor, Person.NAME_COLUMN_NAME); + Column salaryColumn = findColumnByName(schemaDescriptor, Person.SALARY_COLUMN_NAME); + + return new Person( + binaryTupleReader.longValue(idColumn.positionInRow()), + binaryTupleReader.stringValue(nameColumn.positionInRow()), + binaryTupleReader.longValue(salaryColumn.positionInRow()) + ); + } + + private static Column findColumnByName(SchemaDescriptor schemaDescriptor, String columnName) { + return schemaDescriptor.columns().stream() + .filter(column -> columnName.equalsIgnoreCase(column.name())) + .findFirst() + .orElseThrow(() -> new AssertionError( + String.format("Can't find column by name: [columnName=%s, schema=%s]", columnName, schemaDescriptor) + )); + } + + private static void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, long startRaftLogIndex) { long lastLogIndex = logStorage.getLastLogIndex(); long lastIndexKept = lastLogIndex - (lastLogIndex - startRaftLogIndex) / 2; @@ -180,7 +381,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs ) ); - log.info( + LOG.info( "Successfully truncated raft log suffix: [startRaftLogIndex={}, oldLastLogIndex={}, lastIndexKept={}, term={}]", startRaftLogIndex, lastLogIndex, lastIndexKept, logStorage.getEntry(lastIndexKept).getId().getTerm() ); @@ -199,20 +400,16 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs private final RaftOptions raftOptions; - private final IgniteLogger log; - private final AtomicBoolean closeGuard = new AtomicBoolean(); private TestLogStorageFactory( LogStorageFactory logStorageFactory, NodeOptions nodeOptions, - RaftOptions raftOptions, - IgniteLogger log + RaftOptions raftOptions ) { this.logStorageFactory = logStorageFactory; this.nodeOptions = nodeOptions; this.raftOptions = raftOptions; - this.log = log; } /** @@ -232,7 +429,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs storage.init(logStorageOptions); - log.info( + LOG.info( "Successfully created LogStorage: [firstLogIndex={}, lastLogIndex={}, term={}]", storage.getFirstLogIndex(), storage.getLastLogIndex(), storage.getEntry(storage.getLastLogIndex()).getId().getTerm() ); @@ -249,4 +446,54 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs assertThat(logStorageFactory.stopAsync(), willCompleteSuccessfully()); } } + + private static class Person { + static final String ID_COLUMN_NAME = "ID"; + + static final String NAME_COLUMN_NAME = "NAME"; + + static final String SALARY_COLUMN_NAME = "SALARY"; + + @IgniteToStringInclude + final long id; + + @IgniteToStringInclude + final String name; + + @IgniteToStringInclude + final long salary; + + private Person(long id, String name, long salary) { + this.id = id; + this.name = name; + this.salary = salary; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Person p = (Person) o; + + return id == p.id && salary == p.salary && name.equals(p.name); + } + + @Override + public int hashCode() { + int result = (int) (id ^ (id >>> 32)); + result = 31 * result + (int) (salary ^ (salary >>> 32)); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return S.toString(Person.class, this); + } + } }
