This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26849
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a88b723872d0dad1b2b26f3abc1589b9a9b2054d
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 18 12:38:16 2025 +0300

    IGNITE-26849 wip
---
 .../snapshot/PartitionSnapshotStorageFactory.java  |  8 +-
 .../ignite/raft/jraft/option/NodeOptions.java      |  3 +-
 .../internal/BaseTruncateRaftLogAbstractTest.java  | 93 ++++++++++++++++++++-
 .../ItTruncateRaftLogAndRebalanceTest.java         | 95 +++++++++++++++++++++-
 .../ItTruncateRaftLogAndRestartNodesTest.java      | 81 +-----------------
 5 files changed, 194 insertions(+), 86 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
index 25706b2409b..dd156b2e893 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -46,7 +46,8 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
         return new PartitionSnapshotStorageAdapter(snapshotStorage, uri);
     }
 
-    private static class PartitionSnapshotStorageAdapter implements 
SnapshotStorage {
+    /** Partition snapshot storage adapter. */
+    public static class PartitionSnapshotStorageAdapter implements 
SnapshotStorage {
         private final PartitionSnapshotStorage snapshotStorage;
 
         /** Flag indicating that startup snapshot has been opened. */
@@ -113,5 +114,10 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
             // Option is not supported.
             return false;
         }
+
+        /** Returns partition snapshot storage. */
+        public PartitionSnapshotStorage partitionSnapshotStorage() {
+            return snapshotStorage;
+        }
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 02070ffec56..4ef8d2594d7 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -88,7 +88,8 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     // If |snapshot_interval_s| <= 0, the time based snapshot would be 
disabled.
     //
     // Default: 3600 (1 hour)
-    private int snapshotIntervalSecs = 3600;
+    // TODO: IGNITE-26849 Restore the one-hour default (3600).
+    private int snapshotIntervalSecs = 5;
 
     // A snapshot saving would be triggered every |snapshot_interval_s| 
seconds,
     // and at this moment when state machine's lastAppliedIndex value
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
index 0e68f89a5bf..4f3543e3373 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseTruncateRaftLogAbstractTest.java
@@ -17,25 +17,42 @@
 
 package org.apache.ignite.internal;
 
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.tx.TransactionOptions;
 
 /** Base class for raft log truncating related integration tests, containing 
useful methods and classes. */
 class BaseTruncateRaftLogAbstractTest extends ClusterPerTestIntegrationTest {
@@ -58,7 +75,7 @@ class BaseTruncateRaftLogAbstractTest extends 
ClusterPerTestIntegrationTest {
         return raftNodeImpl(igniteImpl(nodeIndex), replicationGroupId);
     }
 
-    NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId 
replicationGroupId) {
+    static NodeImpl raftNodeImpl(IgniteImpl ignite, ReplicationGroupId 
replicationGroupId) {
         NodeImpl[] node = {null};
 
         ignite.raftManager().forEach((raftNodeId, raftGroupService) -> {
@@ -104,6 +121,59 @@ class BaseTruncateRaftLogAbstractTest extends 
ClusterPerTestIntegrationTest {
         );
     }
 
+    static String selectPeopleDml(String tableName) {
+        return String.format(
+                "select %s, %s, %s from %s",
+                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, 
Person.SALARY_COLUMN_NAME,
+                tableName
+        );
+    }
+
+    Person[] scanPeopleFromAllPartitions(int nodeIndex, String tableName) {
+        IgniteImpl ignite = igniteImpl(nodeIndex);
+
+        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(ignite.tables().table(tableName));
+
+        InternalTableImpl table = (InternalTableImpl) 
tableViewInternal.internalTable();
+
+        InternalTransaction roTx = (InternalTransaction) 
ignite.transactions().begin(new TransactionOptions().readOnly(true));
+
+        var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>();
+
+        try {
+            for (int partitionId = 0; partitionId < table.partitions(); 
partitionId++) {
+                scanFutures.add(subscribeToList(scan(table, roTx, partitionId, 
ignite.node())));
+            }
+
+            assertThat(allOf(scanFutures), willCompleteSuccessfully());
+
+            SchemaDescriptor schemaDescriptor = 
tableViewInternal.schemaView().lastKnownSchema();
+
+            return scanFutures.stream()
+                    .map(CompletableFuture::join)
+                    .flatMap(Collection::stream)
+                    .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, 
binaryRow))
+                    .toArray(Person[]::new);
+        } finally {
+            roTx.commit();
+        }
+    }
+
+    private static Publisher<BinaryRow> scan(
+            InternalTableImpl internalTableImpl,
+            InternalTransaction roTx,
+            int partitionId,
+            InternalClusterNode recipientNode
+    ) {
+        assertTrue(roTx.isReadOnly(), roTx.toString());
+
+        return internalTableImpl.scan(
+                partitionId,
+                recipientNode,
+                OperationContext.create(TxContext.readOnly(roTx))
+        );
+    }
+
     static Person[] generatePeople(int count) {
         assertThat(count, greaterThanOrEqualTo(0));
 
@@ -127,7 +197,7 @@ class BaseTruncateRaftLogAbstractTest extends 
ClusterPerTestIntegrationTest {
         return new Person((Long) sqlRow.get(0), (String) sqlRow.get(1), (Long) 
sqlRow.get(2));
     }
 
-    static Person toPersonFromBinaryRow(SchemaDescriptor schemaDescriptor, 
BinaryRow binaryRow) {
+    private static Person toPersonFromBinaryRow(SchemaDescriptor 
schemaDescriptor, BinaryRow binaryRow) {
         var binaryTupleReader = new 
BinaryTupleReader(schemaDescriptor.length(), binaryRow.tupleSlice());
 
         Column idColumn = findColumnByName(schemaDescriptor, 
Person.ID_COLUMN_NAME);
@@ -150,6 +220,25 @@ class BaseTruncateRaftLogAbstractTest extends 
ClusterPerTestIntegrationTest {
                 ));
     }
 
+    TableViewInternal tableViewInternal(int nodeIndex, String tableName) {
+        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName));
+
+        assertNotNull(tableViewInternal, String.format("Missing table: 
[nodeIndex=%s, tableName=%s]", nodeIndex, tableName));
+
+        return tableViewInternal;
+    }
+
+    static MvPartitionStorage mvPartitionStorage(TableViewInternal 
tableViewInternal, int partitionId) {
+        MvPartitionStorage mvPartition = 
tableViewInternal.internalTable().storage().getMvPartition(partitionId);
+
+        assertNotNull(
+                mvPartition,
+                String.format("Missing MvPartitionStorage: [tableName=%s, 
partitionId=%s]", tableViewInternal.name(), partitionId)
+        );
+
+        return mvPartition;
+    }
+
     static class Person {
         static final String ID_COLUMN_NAME = "ID";
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
index a79f5f95e92..8eb6971a207 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRebalanceTest.java
@@ -17,13 +17,32 @@
 
 package org.apache.ignite.internal;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import com.typesafe.config.parser.ConfigDocumentFactory;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory.PartitionSnapshotStorageAdapter;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.junit.jupiter.api.Test;
 
 /** Class for testing various raft log truncation and rebalancing scenarios. */
@@ -50,14 +69,84 @@ public class ItTruncateRaftLogAndRebalanceTest extends 
BaseTruncateRaftLogAbstra
 
         cluster.transferLeadershipTo(0, replicationGroupId);
 
-        insertPeopleAndAwaitTruncateRaftLogOnAllNodes(replicationGroupId);
+        insertPeopleAndAwaitTruncateRaftLogOnAllNodes(1_000, TABLE_NAME, 
replicationGroupId);
+
+        // Let's restart the node with aborted rebalance.
+        startAndAbortRebalance(1, TABLE_NAME, replicationGroupId);
+
+        cluster.stopNode(1);
+        cluster.startNode(1);
+
+        // Let's check that the replication was successful.
+        for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
+            assertThat(
+                    "nodeIndex=" + nodeIndex,
+                    scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),
+                    arrayWithSize(1_000)
+            );
+        }
+    }
+
+    private void startAndAbortRebalance(int nodeIndex, String tableName, 
ReplicationGroupId replicationGroupId) {
+        NodeImpl raftNodeImpl = raftNodeImpl(nodeIndex, replicationGroupId);
+
+        PartitionSnapshotStorageAdapter snapshotStorageAdapter = 
partitionSnapshotStorageAdapter(raftNodeImpl);
+
+        PartitionMvStorageAccess mvStorageAccess = 
partitionMvStorageAccess(snapshotStorageAdapter);
+        PartitionTxStateAccess txStateAccess = 
partitionTxStateAccess(snapshotStorageAdapter);
+
+        assertThat(runAsync(() -> allOf(mvStorageAccess.startRebalance(), 
txStateAccess.startRebalance())), willCompleteSuccessfully());
+
+        flushMvPartitionStorage(nodeIndex, tableName, replicationGroupId);
+
+        assertThat(runAsync(() -> allOf(mvStorageAccess.abortRebalance(), 
txStateAccess.abortRebalance())), willCompleteSuccessfully());
+    }
+
+    private void flushMvPartitionStorage(int nodeIndex, String tableName, 
ReplicationGroupId replicationGroupId) {
+        assertThat(replicationGroupId, instanceOf(PartitionGroupId.class));
+
+        TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, 
tableName);
+
+        MvPartitionStorage mvPartitionStorage = mvPartitionStorage(
+                tableViewInternal,
+                ((PartitionGroupId) replicationGroupId).partitionId()
+        );
+
+        assertThat(
+                Wrappers.unwrap(mvPartitionStorage, 
MvPartitionStorage.class).flush(true),
+                willCompleteSuccessfully()
+        );
+    }
+
+    private static PartitionSnapshotStorageAdapter 
partitionSnapshotStorageAdapter(NodeImpl raftNodeImpl) {
+        return (PartitionSnapshotStorageAdapter) 
raftNodeImpl.getServiceFactory().createSnapshotStorage(
+                "test",
+                raftNodeImpl.getRaftOptions()
+        );
+    }
+
+    private static PartitionMvStorageAccess 
partitionMvStorageAccess(PartitionSnapshotStorageAdapter 
partitionSnapshotStorageAdapter) {
+        PartitionSnapshotStorage partitionSnapshotStorage = 
partitionSnapshotStorageAdapter.partitionSnapshotStorage();
+
+        Collection<PartitionMvStorageAccess> mvStorageAccesses = 
partitionSnapshotStorage.partitionsByTableId().values();
+        assertThat(mvStorageAccesses, hasSize(1));
+
+        PartitionMvStorageAccess first = first(mvStorageAccesses);
+
+        assertNotNull(first);
+
+        return first;
+    }
+
+    private static PartitionTxStateAccess 
partitionTxStateAccess(PartitionSnapshotStorageAdapter 
partitionSnapshotStorageAdapter) {
+        return 
partitionSnapshotStorageAdapter.partitionSnapshotStorage().txState();
     }
 
-    private void 
insertPeopleAndAwaitTruncateRaftLogOnAllNodes(ReplicationGroupId 
replicationGroupId) {
+    private void insertPeopleAndAwaitTruncateRaftLogOnAllNodes(int count, 
String tableName, ReplicationGroupId replicationGroupId) {
         long[] beforeInsertPeopleRaftFirstLogIndexes = 
collectRaftFirstLogIndexes(replicationGroupId);
         assertEquals(initialNodes(), 
beforeInsertPeopleRaftFirstLogIndexes.length);
 
-        insertPeople(TABLE_NAME, generatePeople(1_000));
+        insertPeople(tableName, generatePeople(count));
 
         await().atMost(RAFT_SNAPSHOT_INTERVAL_SECS * 2, 
TimeUnit.SECONDS).until(() -> {
             long[] raftFirstLogIndexes = 
collectRaftFirstLogIndexes(replicationGroupId);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index b01e040e812..ace3e0011cd 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -18,24 +18,16 @@
 package org.apache.ignite.internal;
 
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
-import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -43,20 +35,13 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.table.TxContext;
-import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -64,7 +49,6 @@ import org.apache.ignite.raft.jraft.option.LogStorageOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -174,17 +158,9 @@ public class ItTruncateRaftLogAndRestartNodesTest extends 
BaseTruncateRaftLogAbs
     }
 
     private void flushMvPartitionStorage(int nodeIndex, String tableName, int 
partitionId) {
-        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(igniteImpl(nodeIndex).tables().table(tableName));
+        TableViewInternal tableViewInternal = tableViewInternal(nodeIndex, 
tableName);
 
-        MvPartitionStorage mvPartition = 
tableViewInternal.internalTable().storage().getMvPartition(partitionId);
-
-        assertNotNull(
-                mvPartition,
-                String.format(
-                        "Missing MvPartitionStorage: [nodeIndex=%s, 
tableName=%s, partitionId=%s]",
-                        nodeIndex, tableName, partitionId
-                )
-        );
+        MvPartitionStorage mvPartition = mvPartitionStorage(tableViewInternal, 
partitionId);
 
         assertThat(
                 IgniteTestUtils.runAsync(() -> 
mvPartition.flush(true)).thenCompose(Function.identity()),
@@ -192,59 +168,6 @@ public class ItTruncateRaftLogAndRestartNodesTest extends 
BaseTruncateRaftLogAbs
         );
     }
 
-    private static String selectPeopleDml(String tableName) {
-        return String.format(
-                "select %s, %s, %s from %s",
-                Person.ID_COLUMN_NAME, Person.NAME_COLUMN_NAME, 
Person.SALARY_COLUMN_NAME,
-                tableName
-        );
-    }
-
-    private Person[] scanPeopleFromAllPartitions(int nodeIndex, String 
tableName) {
-        IgniteImpl ignite = igniteImpl(nodeIndex);
-
-        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(ignite.tables().table(tableName));
-
-        InternalTableImpl table = (InternalTableImpl) 
tableViewInternal.internalTable();
-
-        InternalTransaction roTx = (InternalTransaction) 
ignite.transactions().begin(new TransactionOptions().readOnly(true));
-
-        var scanFutures = new ArrayList<CompletableFuture<List<BinaryRow>>>();
-
-        try {
-            for (int partitionId = 0; partitionId < table.partitions(); 
partitionId++) {
-                scanFutures.add(subscribeToList(scan(table, roTx, partitionId, 
ignite.node())));
-            }
-
-            assertThat(allOf(scanFutures), willCompleteSuccessfully());
-
-            SchemaDescriptor schemaDescriptor = 
tableViewInternal.schemaView().lastKnownSchema();
-
-            return scanFutures.stream()
-                    .map(CompletableFuture::join)
-                    .flatMap(Collection::stream)
-                    .map(binaryRow -> toPersonFromBinaryRow(schemaDescriptor, 
binaryRow))
-                    .toArray(Person[]::new);
-        } finally {
-            roTx.commit();
-        }
-    }
-
-    private static Publisher<BinaryRow> scan(
-            InternalTableImpl internalTableImpl,
-            InternalTransaction roTx,
-            int partitionId,
-            InternalClusterNode recipientNode
-    ) {
-        assertTrue(roTx.isReadOnly(), roTx.toString());
-
-        return internalTableImpl.scan(
-                partitionId,
-                recipientNode,
-                OperationContext.create(TxContext.readOnly(roTx))
-        );
-    }
-
     private void truncateRaftLogSuffixHalfOfChanges(LogStorage logStorage, 
long startRaftLogIndex) {
         long lastLogIndex = logStorage.getLastLogIndex();
         long lastIndexKept = lastLogIndex - (lastLogIndex - startRaftLogIndex) 
/ 2;

Reply via email to