This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26849
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 2357c913edbabec67e499debe60d114c57a11d45
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 20 08:43:05 2025 +0300

    IGNITE-26849 Revert some changes
---
 .../snapshot/PartitionSnapshotStorageFactory.java  |   8 +-
 .../internal/BaseTruncateRaftLogAbstractTest.java  | 291 ---------------------
 .../ItTruncateRaftLogAndRebalanceTest.java         | 167 ------------
 .../ItTruncateRaftLogAndRestartNodesTest.java      | 271 ++++++++++++++++++-
 4 files changed, 260 insertions(+), 477 deletions(-)
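
For context, a minimal sketch of the access pattern this revert removes. It is reconstructed from the deleted ItTruncateRaftLogAndRebalanceTest in the diff below; the method name and variables are illustrative and not part of the commit. With PartitionSnapshotStorageAdapter made private again and its partitionSnapshotStorage() accessor dropped, test code can no longer reach the partition rebalance internals this way:

    import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
    import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory.PartitionSnapshotStorageAdapter;
    import org.apache.ignite.raft.jraft.core.NodeImpl;

    // Illustrative only: how the deleted rebalance test reached the snapshot storage
    // internals through the raft node. After this commit the adapter class is private
    // and the accessor below no longer exists, so this path no longer compiles.
    static PartitionSnapshotStorage snapshotStorageOf(NodeImpl raftNode) {
        PartitionSnapshotStorageAdapter adapter = (PartitionSnapshotStorageAdapter) raftNode
                .getServiceFactory()
                .createSnapshotStorage("test", raftNode.getRaftOptions());

        return adapter.partitionSnapshotStorage(); // accessor removed by this commit
    }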

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
index dd156b2e893..25706b2409b 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -46,8 +46,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
         return new PartitionSnapshotStorageAdapter(snapshotStorage, uri);
     }
 
-    /** Partition snapshot storage adapter. */
-    public static class PartitionSnapshotStorageAdapter implements SnapshotStorage {
+    private static class PartitionSnapshotStorageAdapter implements SnapshotStorage {
         private final PartitionSnapshotStorage snapshotStorage;
 
         /** Flag indicating that startup snapshot has been opened. */
@@ -114,10 +113,5 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
             // Option is not supported.
             return false;
         }
-
-        /** Returns partition snapshot storage. */
-        public PartitionSnapshotStorage partitionSnapshotStorage() {
-            return snapshotStorage;
-        }
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
deleted file mode 100644
index 4f3543e3373..00000000000
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
-import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.CompletableFutures.allOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Publisher;
-import java.util.stream.IntStream;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.OperationContext;
-import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.TxContext;
-import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.tostring.IgniteToStringInclude;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.apache.ignite.tx.TransactionOptions;
-
-/** Base class for raft log truncation related integration tests, containing useful helper methods and classes. */
-class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest {
-    static final String ZONE_NAME = "TEST_ZONE";
-
-    static final String TABLE_NAME = "TEST_TABLE";
-
-    void createZoneAndTablePerson(String zoneName, String tableName, int replicas, int partitions) {
-        executeSql(createZoneDdl(zoneName, replicas, partitions));
-        executeSql(createTablePersonDdl(zoneName, tableName));
-    }
-
-    void insertPeople(String tableName, Person... people) {
-        for (Person person : people) {
-            executeSql(insertPersonDml(tableName, person));
-        }
-    }
-
-    NodeImpl raftNodeImpl(int nodeIndex, ReplicationGroupId replicationGroupId) {
-        return raftNodeImpl(igniteImpl(nodeIndex), replicationGroupId);
-    }
-
-    static NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId replicationGroupId) {
-        NodeImpl[] node = {null};
-
-        ignite.raftManager().forEach((raftNodeId, raftGroupService) -> {
-            if (replicationGroupId.equals(raftNodeId.groupId())) {
-                assertNull(
-                        node[0],
-                        String.format("NodeImpl already found: [node=%s, replicationGroupId=%s]", ignite.name(), replicationGroupId)
-                );
-
-                node[0] = (NodeImpl) raftGroupService.getRaftNode();
-            }
-        });
-
-        NodeImpl res = node[0];
-
-        assertNotNull(res, String.format("Can't find NodeImpl: [node=%s, replicationGroupId=%s]", ignite.name(), replicationGroupId));
-
-        return res;
-    }
-
-    private static String createTablePersonDdl(String zoneName, String tableName) {
-        return String.format(
-                "create table if not exists %s (%s bigint primary key, %s varchar, %s bigint) zone %s",
-                tableName,
-                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
-                zoneName
-        );
-    }
-
-    private static String createZoneDdl(String zoneName, int replicas, int partitions) {
-        return String.format(
-                "create zone %s with replicas=%s, partitions=%s, storage_profiles='%s'",
-                zoneName, replicas, partitions, DEFAULT_STORAGE_PROFILE
-        );
-    }
-
-    private static String insertPersonDml(String tableName, Person person) {
-        return String.format(
-                "insert into %s(%s, %s, %s) values(%s, '%s', %s)",
-                tableName,
-                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
-                person.id, person.name, person.salary
-        );
-    }
-
-    static String selectPeopleDml(String tableName) {
-        return String.format(
-                "select %s, %s, %s from %s",
-                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
-                tableName
-        );
-    }
-
-    Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) {
-        IgniteImpl ignite = igniteImpl(nodeIndex);
-
-        TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName));
-
-        InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable();
-
-        InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true));
-
-        var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>();
-
-        try {
-            for (int partitionId = 0; partitionId < table.partitions(); partitionId++) {
-                scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node())));
-            }
-
-            assertThat(allOf(scanFutures), willCompleteSuccessfully());
-
-            SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema();
-
-            return scanFutures.stream()
-                    .map(CompletableFuture::join)
-                    .flatMap(Collection::stream)
-                    .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow))
-                    .toArray(Person[]::new);
-        } finally {
-            roTx.commit();
-        }
-    }
-
-    private static Publisher<BinaryRow> scan(
-            InternalTableImpl internalTableImpl,
-            InternalTransaction roTx,
-            int partitionId,
-            InternalClusterNode recipientNode
-    ) {
-        assertTrue(roTx.isReadOnly(), roTx.toString());
-
-        return internalTableImpl.scan(
-                partitionId,
-                recipientNode,
-                OperationContext.create(TxContext.readOnly(roTx))
-        );
-    }
-
-    static Person[] generatePeople(int count) {
-        assertThat(count, greaterThanOrEqualTo(0));
-
-        return IntStream.range(0, count)
-                .mapToObj(i -> new Person(i, "name-" + i, i + 1_000))
-                .toArray(Person[]::new);
-    }
-
-    static Person[] toPeopleFromSqlRows(List<List<Object>> sqlResult) {
-        return sqlResult.stream()
-                .map(BaseTruncateRaftLogAbstractTest::toPersonFromSqlRow)
-                .toArray(Person[]::new);
-    }
-
-    private static Person toPersonFromSqlRow(List<Object> sqlRow) {
-        assertThat(sqlRow, hasSize(3));
-        assertThat(sqlRow.get(0), instanceOf(Long.class));
-        assertThat(sqlRow.get(1), instanceOf(String.class));
-        assertThat(sqlRow.get(2), instanceOf(Long.class));
-
-        return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) sqlRow.get(2));
-    }
-
-    private static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) {
-        var binaryTupleReader = new BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice());
-
-        Column idColumn = findColumnByName(schemaDescriptor, Person.ID_COLUMN_NAME);
-        Column nameColumn = findColumnByName(schemaDescriptor, Person.NAME_COLUMN_NAME);
-        Column salaryColumn = findColumnByName(schemaDescriptor, Person.SALARY_COLUMN_NAME);
-
-        return new Person(
-                binaryTupleReader.longValue(idColumn.positionInRow()),
-                binaryTupleReader.stringValue(nameColumn.positionInRow()),
-                binaryTupleReader.longValue(salaryColumn.positionInRow())
-        );
-    }
-
-    private static Column findColumnByName(SchemaDescriptor schemaDescriptor, String columnName) {
-        return schemaDescriptor.columns().stream()
-                .filter(column -> columnName.equalsIgnoreCase(column.name()))
-                .findFirst()
-                .orElseThrow(() -> new AssertionError(
-                        String.format("Can't find column by name: [columnName=%s, schema=%s]", columnName, schemaDescriptor)
-                ));
-    }
-
-    TableViewInternal tableViewInternal(int nodeIndex, String tableName) {
-        TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName));
-
-        assertNotNull(tableViewInternal, String.format("Missing table: [nodeIndex=%s, tableName=%s]", nodeIndex, tableName));
-
-        return tableViewInternal;
-    }
-
-    static MvPartitionStorage mvPartitionStorage(TableViewInternal tableViewInternal, int partitionId) {
-        MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId);
-
-        assertNotNull(
-                mvPartition,
-                String.format("Missing MvPartitionStorage: [tableName=%s, partitionId=%s]", tableViewInternal.name(), partitionId)
-        );
-
-        return mvPartition;
-    }
-
-    static class Person {
-        static final String ID_COLUMN_NAME = "ID";
-
-        static final String NAME_COLUMN_NAME = "NAME";
-
-        static final String SALARY_COLUMN_NAME = "SALARY";
-
-        @IgniteToStringInclude
-        final long id;
-
-        @IgniteToStringInclude
-        final String name;
-
-        @IgniteToStringInclude
-        final long salary;
-
-        private Person(long id, String name, long salary) {
-            this.id = id;
-            this.name = name;
-            this.salary = salary;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            Person p = (Person) o;
-
-            return id == p.id && salary == p.salary && name.equals(p.name);
-        }
-
-        @Override
-        public int hashCode() {
-            int result = (int) (id ^ (id >>> 32));
-            result = 31 * result + (int) (salary ^ (salary >>> 32));
-            result = 31 * result + name.hashCode();
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return S.toString(Person.class, this);
-        }
-    }
-}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
deleted file mode 100644
index f6038de768f..00000000000
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import static java.util.concurrent.CompletableFuture.allOf;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.CollectionUtils.first;
-import static org.awaitility.Awaitility.await;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.arrayWithSize;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-import com.typesafe.config.parser.ConfigDocumentFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
-import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
-import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory.PartitionSnapshotStorageAdapter;
-import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
-import org.apache.ignite.internal.replicator.PartitionGroupId;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.wrapper.Wrappers;
-import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.junit.jupiter.api.Test;
-
-/** Class for testing various raft log truncation and rebalancing scenarios. */
-public class ItTruncateRaftLogAndRebalanceTest extends BaseTruncateRaftLogAbstractTest {
-    private static final int RAFT_SNAPSHOT_INTERVAL_SECS = 5;
-
-    @Override
-    protected int initialNodes() {
-        return 3;
-    }
-
-    @Override
-    protected String getNodeBootstrapConfigTemplate() {
-        return ConfigDocumentFactory.parseString(super.getNodeBootstrapConfigTemplate())
-                .withValueText("ignite.system.properties.raftSnapshotIntervalSecs", Integer.toString(RAFT_SNAPSHOT_INTERVAL_SECS))
-                .render();
-    }
-
-    @Test
-    void testRestartNodeAfterAbortRebalanceAndTruncateRaftLog() throws Exception {
-        createZoneAndTablePerson(ZONE_NAME, TABLE_NAME, 3, 1);
-
-        ReplicationGroupId replicationGroupId = cluster.solePartitionId(ZONE_NAME, TABLE_NAME);
-
-        cluster.transferLeadershipTo(0, replicationGroupId);
-
-        insertPeopleAndAwaitTruncateRaftLogOnAllNodes(1_000, TABLE_NAME, replicationGroupId);
-
-        startAndAbortRebalance(1, TABLE_NAME, replicationGroupId);
-
-        // Let's restart the node with aborted rebalance.
-        cluster.stopNode(1);
-        cluster.startNode(1);
-
-        // Let's check that the replication was successful.
-        for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
-            assertThat(
-                    "nodeIndex=" + nodeIndex,
-                    scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),
-                    arrayWithSize(1_000)
-            );
-        }
-    }
-
-    private void startAndAbortRebalance(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) {
-        NodeImpl raftNodeImpl = raftNodeImpl(nodeIndex, replicationGroupId);
-
-        PartitionSnapshotStorageAdapter snapshotStorageAdapter = partitionSnapshotStorageAdapter(raftNodeImpl);
-
-        PartitionMvStorageAccess mvStorageAccess = partitionMvStorageAccess(snapshotStorageAdapter);
-        PartitionTxStateAccess txStateAccess = partitionTxStateAccess(snapshotStorageAdapter);
-
-        assertThat(runAsync(() -> allOf(mvStorageAccess.startRebalance(), txStateAccess.startRebalance())), willCompleteSuccessfully());
-
-        // Let's flush only MvPartitionStorage to reproduce the situation as with a real user.
-        flushMvPartitionStorage(nodeIndex, tableName, replicationGroupId);
-
-        assertThat(runAsync(() -> allOf(mvStorageAccess.abortRebalance(), txStateAccess.abortRebalance())), willCompleteSuccessfully());
-    }
-
-    private void flushMvPartitionStorage(int nodeIndex, String tableName, ReplicationGroupId replicationGroupId) {
-        assertThat(replicationGroupId, instanceOf(PartitionGroupId.class));
-
-        TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName);
-
-        MvPartitionStorage mvPartitionStorage = mvPartitionStorage(
-                tableViewInternal,
-                ((PartitionGroupId) replicationGroupId).partitionId()
-        );
-
-        assertThat(
-                Wrappers.unwrap(mvPartitionStorage, MvPartitionStorage.class).flush(true),
-                willCompleteSuccessfully()
-        );
-    }
-
-    private static PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter(NodeImpl raftNodeImpl) {
-        return (PartitionSnapshotStorageAdapter) raftNodeImpl.getServiceFactory().createSnapshotStorage(
-                "test",
-                raftNodeImpl.getRaftOptions()
-        );
-    }
-
-    private static PartitionMvStorageAccess partitionMvStorageAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) {
-        PartitionSnapshotStorage partitionSnapshotStorage = partitionSnapshotStorageAdapter.partitionSnapshotStorage();
-
-        Collection<PartitionMvStorageAccess> mvStorageAccesses = partitionSnapshotStorage.partitionsByTableId().values();
-        assertThat(mvStorageAccesses, hasSize(1));
-
-        PartitionMvStorageAccess first = first(mvStorageAccesses);
-
-        assertNotNull(first);
-
-        return first;
-    }
-
-    private static PartitionTxStateAccess partitionTxStateAccess(PartitionSnapshotStorageAdapter partitionSnapshotStorageAdapter) {
-        return partitionSnapshotStorageAdapter.partitionSnapshotStorage().txState();
-    }
-
-    private void insertPeopleAndAwaitTruncateRaftLogOnAllNodes(int count, String tableName, ReplicationGroupId replicationGroupId) {
-        long[] beforeInsertPeopleRaftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId);
-        assertEquals(initialNodes(), beforeInsertPeopleRaftFirstLogIndexes.length);
-
-        insertPeople(tableName, generatePeople(count));
-
-        await().atMost(RAFT_SNAPSHOT_INTERVAL_SECS * 2, TimeUnit.SECONDS).until(() -> {
-            long[] raftFirstLogIndexes = collectRaftFirstLogIndexes(replicationGroupId);
-            assertEquals(initialNodes(), raftFirstLogIndexes.length);
-
-            return !Arrays.equals(beforeInsertPeopleRaftFirstLogIndexes, raftFirstLogIndexes);
-        });
-    }
-
-    private long[] collectRaftFirstLogIndexes(ReplicationGroupId replicationGroupId) {
-        return cluster.runningNodes()
-                .map(TestWrappers::unwrapIgniteImpl)
-                .map(igniteImpl -> raftNodeImpl(igniteImpl, replicationGroupId))
-                .mapToLong(raftNodeImpl -> raftNodeImpl.logStorage().getFirstLogIndex())
-                .toArray();
-    }
-}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index ace3e0011cd..955b83a8c36 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -18,30 +18,56 @@
 package org.apache.ignite.internal;
 
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -49,6 +75,7 @@ import org.apache.ignite.raft.jraft.option.LogStorageOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -58,7 +85,13 @@ import org.junit.jupiter.api.Test;
  * groups associated with tables.
  */
 // TODO: IGNITE-25501 Fix partition state after snapshot
-public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbstractTest {
+public class ItTruncateRaftLogAndRestartNodesTest extends ClusterPerTestIntegrationTest {
+    private static final IgniteLogger LOG = Loggers.forClass(ItTruncateRaftLogAndRestartNodesTest.class);
+
+    private static final String ZONE_NAME = "TEST_ZONE";
+
+    private static final String TABLE_NAME = "TEST_TABLE";
+
     @Override
     protected int initialNodes() {
         return 0;
@@ -131,6 +164,38 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
         }
     }
 
+    private void createZoneAndTablePerson(String zoneName, String tableName, int replicas, int partitions) {
+        executeSql(createZoneDdl(zoneName, replicas, partitions));
+        executeSql(createTablePersonDdl(zoneName, tableName));
+    }
+
+    private void insertPeople(String tableName, Person... people) {
+        for (Person person : people) {
+            executeSql(insertPersonDml(tableName, person));
+        }
+    }
+
+    private NodeImpl raftNodeImpl(int nodeIndex, ReplicationGroupId replicationGroupId) {
+        NodeImpl[] node = {null};
+
+        igniteImpl(nodeIndex).raftManager().forEach((raftNodeId, raftGroupService) -> {
+            if (replicationGroupId.equals(raftNodeId.groupId())) {
+                assertNull(
+                        node[0],
+                        String.format("NodeImpl already found: [nodeIndex=%s, replicationGroupId=%s]", nodeIndex, replicationGroupId)
+                );
+
+                node[0] = (NodeImpl) raftGroupService.getRaftNode();
+            }
+        });
+
+        NodeImpl res = node[0];
+
+        assertNotNull(res, String.format("Can't find NodeImpl: [nodeIndex=%s, replicationGroupId=%s]", nodeIndex, replicationGroupId));
+
+        return res;
+    }
+
     /**
      * Creates and prepares {@link TestLogStorageFactory} for {@link TestLogStorageFactory#createLogStorage} creation after the
      * corresponding node is stopped, so that there are no errors.
@@ -145,7 +210,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
 
         NodeImpl nodeImpl = raftNodeImpl(nodeIndex, replicationGroupId);
 
-        return new TestLogStorageFactory(logStorageFactory, nodeImpl.getOptions(), nodeImpl.getRaftOptions(), log);
+        return new TestLogStorageFactory(logStorageFactory, nodeImpl.getOptions(), nodeImpl.getRaftOptions());
     }
 
     private void awaitMajority(ReplicationGroupId replicationGroupId) {
@@ -158,9 +223,17 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
     }
 
     private void flushMvPartitionStorage(int nodeIndex, String tableName, int partitionId) {
-        TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, tableName);
+        TableViewInternal tableViewInternal = unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName));
 
-        MvPartitionStorage mvPartition = mvPartitionStorage(tableViewInternal, partitionId);
+        MvPartitionStorage mvPartition = tableViewInternal.internalTable().storage().getMvPartition(partitionId);
+
+        assertNotNull(
+                mvPartition,
+                String.format(
+                        "Missing MvPartitionStorage: [nodeIndex=%s, tableName=%s, partitionId=%s]",
+                        nodeIndex, tableName, partitionId
+                )
+        );
 
         assertThat(
                 IgniteTestUtils.runAsync(() -> mvPartition.flush(true)).thenCompose(Function.identity()),
@@ -168,7 +241,135 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
         );
     }
 
-    private void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, long startRaftLogIndex) {
+    private static String selectPeopleDml(String tableName) {
+        return String.format(
+                "select %s, %s, %s from %s",
+                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
+                tableName
+        );
+    }
+
+    private static String insertPersonDml(String tableName, Person person) {
+        return String.format(
+                "insert into %s(%s, %s, %s) values(%s, '%s', %s)",
+                tableName,
+                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
+                person.id, person.name, person.salary
+        );
+    }
+
+    private static String createTablePersonDdl(String zoneName, String tableName) {
+        return String.format(
+                "create table if not exists %s (%s bigint primary key, %s varchar, %s bigint) zone %s",
+                tableName,
+                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, Person.SALARY_COLUMN_NAME,
+                zoneName
+        );
+    }
+
+    private static String createZoneDdl(String zoneName, int replicas, int partitions) {
+        return String.format(
+                "create zone %s with replicas=%s, partitions=%s, storage_profiles='%s'",
+                zoneName, replicas, partitions, DEFAULT_STORAGE_PROFILE
+        );
+    }
+
+    private static Person[] generatePeople(int count) {
+        assertThat(count, greaterThanOrEqualTo(0));
+
+        return IntStream.range(0, count)
+                .mapToObj(i -> new Person(i, "name-" + i, i + 1_000))
+                .toArray(Person[]::new);
+    }
+
+    private static Person[] toPeopleFromSqlRows(List<List<Object>> sqlResult) {
+        return sqlResult.stream()
+                .map(ItTruncateRaftLogAndRestartNodesTest::toPersonFromSqlRow)
+                .toArray(Person[]::new);
+    }
+
+    private static Person toPersonFromSqlRow(List<Object> sqlRow) {
+        assertThat(sqlRow, hasSize(3));
+        assertThat(sqlRow.get(0), instanceOf(Long.class));
+        assertThat(sqlRow.get(1), instanceOf(String.class));
+        assertThat(sqlRow.get(2), instanceOf(Long.class));
+
+        return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) sqlRow.get(2));
+    }
+
+    private Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) {
+        IgniteImpl ignite = igniteImpl(nodeIndex);
+
+        TableViewInternal tableViewInternal = unwrapTableViewInternal(ignite.tables().table(tableName));
+
+        InternalTableImpl table = (InternalTableImpl) tableViewInternal.internalTable();
+
+        InternalTransaction roTx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(true));
+
+        var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>();
+
+        try {
+            for (int partitionId = 0; partitionId < table.partitions(); partitionId++) {
+                scanFutures.add(subscribeToList(scan(table, roTx, partitionId, ignite.node())));
+            }
+
+            assertThat(allOf(scanFutures), willCompleteSuccessfully());
+
+            SchemaDescriptor schemaDescriptor = tableViewInternal.schemaView().lastKnownSchema();
+
+            return scanFutures.stream()
+                    .map(CompletableFuture::join)
+                    .flatMap(Collection::stream)
+                    .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, binaryRow))
+                    .toArray(Person[]::new);
+        } finally {
+            roTx.commit();
+        }
+    }
+
+    private static Person[] half(Person... people) {
+        return Arrays.copyOfRange(people, 0, people.length / 2);
+    }
+
+    private static Publisher<BinaryRow> scan(
+            InternalTableImpl internalTableImpl,
+            InternalTransaction roTx,
+            int partitionId,
+            InternalClusterNode recipientNode
+    ) {
+        assertTrue(roTx.isReadOnly(), roTx.toString());
+
+        return internalTableImpl.scan(
+                partitionId,
+                recipientNode,
+                OperationContext.create(TxContext.readOnly(roTx))
+        );
+    }
+
+    private static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, BinaryRow binaryRow) {
+        var binaryTupleReader = new BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice());
+
+        Column idColumn = findColumnByName(schemaDescriptor, Person.ID_COLUMN_NAME);
+        Column nameColumn = findColumnByName(schemaDescriptor, Person.NAME_COLUMN_NAME);
+        Column salaryColumn = findColumnByName(schemaDescriptor, Person.SALARY_COLUMN_NAME);
+
+        return new Person(
+                binaryTupleReader.longValue(idColumn.positionInRow()),
+                binaryTupleReader.stringValue(nameColumn.positionInRow()),
+                binaryTupleReader.longValue(salaryColumn.positionInRow())
+        );
+    }
+
+    private static Column findColumnByName(SchemaDescriptor schemaDescriptor, String columnName) {
+        return schemaDescriptor.columns().stream()
+                .filter(column -> columnName.equalsIgnoreCase(column.name()))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError(
+                        String.format("Can't find column by name: [columnName=%s, schema=%s]", columnName, schemaDescriptor)
+                ));
+    }
+
+    private static void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, long startRaftLogIndex) {
         long lastLogIndex = logStorage.getLastLogIndex();
         long lastIndexKept = lastLogIndex - (lastLogIndex - startRaftLogIndex) / 2;
 
@@ -180,7 +381,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
                 )
         );
 
-        log.info(
+        LOG.info(
                 "Successfully truncated raft log suffix: 
[startRaftLogIndex={}, oldLastLogIndex={}, lastIndexKept={}, term={}]",
                 startRaftLogIndex, lastLogIndex, lastIndexKept, 
logStorage.getEntry(lastIndexKept).getId().getTerm()
         );
@@ -199,20 +400,16 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
 
         private final RaftOptions raftOptions;
 
-        private final IgniteLogger log;
-
         private final AtomicBoolean closeGuard = new AtomicBoolean();
 
         private TestLogStorageFactory(
                 LogStorageFactory logStorageFactory,
                 NodeOptions nodeOptions,
-                RaftOptions raftOptions,
-                IgniteLogger log
+                RaftOptions raftOptions
         ) {
             this.logStorageFactory = logStorageFactory;
             this.nodeOptions = nodeOptions;
             this.raftOptions = raftOptions;
-            this.log = log;
         }
 
         /**
@@ -232,7 +429,7 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
 
             storage.init(logStorageOptions);
 
-            log.info(
+            LOG.info(
                     "Successfully created LogStorage: [firstLogIndex={}, 
lastLogIndex={}, term={}]",
                     storage.getFirstLogIndex(), storage.getLastLogIndex(), 
storage.getEntry(storage.getLastLogIndex()).getId().getTerm()
             );
@@ -249,4 +446,54 @@ public class ItTruncateRaftLogAndRestartNodesTest extends BaseTruncateRaftLogAbs
             assertThat(logStorageFactory.stopAsync(), willCompleteSuccessfully());
         }
     }
+
+    private static class Person {
+        static final String ID_COLUMN_NAME = "ID";
+
+        static final String NAME_COLUMN_NAME = "NAME";
+
+        static final String SALARY_COLUMN_NAME = "SALARY";
+
+        @IgniteToStringInclude
+        final long id;
+
+        @IgniteToStringInclude
+        final String name;
+
+        @IgniteToStringInclude
+        final long salary;
+
+        private Person(long id, String name, long salary) {
+            this.id = id;
+            this.name = name;
+            this.salary = salary;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            Person p = (Person) o;
+
+            return id == p.id && salary == p.salary && name.equals(p.name);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (id ^ (id >>> 32));
+            result = 31 * result + (int) (salary ^ (salary >>> 32));
+            result = 31 * result + name.hashCode();
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
 }

