This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b3f7c7d53 IGNITE-17817 Update ItTablePersistenceTest to use Replica 
layer with new transaction protocol. Fixes #1612
4b3f7c7d53 is described below

commit 4b3f7c7d53d54d66490597f577cc18891282e85e
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Feb 10 11:03:08 2023 +0200

    IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new 
transaction protocol. Fixes #1612
    
    Signed-off-by: Slava Koptilin <[email protected]>
---
 .../impl/ItMetaStorageServicePersistenceTest.java  |   4 +-
 .../service/ItAbstractListenerSnapshotTest.java    |  21 +-
 modules/table/build.gradle                         |   1 +
 .../ignite/distributed/ItTablePersistenceTest.java | 299 +++++++++++++--------
 .../replicator/PartitionReplicaListener.java       |  22 +-
 5 files changed, 218 insertions(+), 129 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 4e8696e262..4dd6bd51a5 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -79,7 +79,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
 
     /** {@inheritDoc} */
     @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) 
throws Exception {
+    public void afterFollowerStop(RaftGroupService service, RaftServer server, 
int stoppedNodeIndex) throws Exception {
         ClusterNode followerNode = getNode(server);
 
         KeyValueStorage storage = storageByName.remove(followerNode.name());
@@ -133,7 +133,7 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
 
     /** {@inheritDoc} */
     @Override
-    public RaftGroupListener createListener(ClusterService service, Path 
listenerPersistencePath) {
+    public RaftGroupListener createListener(ClusterService service, Path 
listenerPersistencePath, int index) {
         String nodeName = service.localConfiguration().getName();
 
         KeyValueStorage storage = storageByName.computeIfAbsent(nodeName, name 
-> {
diff --git 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 649a9fde05..812dae0ffb 100644
--- 
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++ 
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -110,7 +110,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
     public void beforeTest(TestInfo testInfo) {
         executor = new ScheduledThreadPoolExecutor(20, new 
NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
 
-        initialMemberConf = IntStream.rangeClosed(0, 2)
+        initialMemberConf = IntStream.range(0, nodes())
                 .mapToObj(i -> testNodeName(testInfo, PORT + i))
                 .collect(collectingAndThen(toSet(), 
PeersAndLearners::fromConsistentIds));
     }
@@ -137,6 +137,15 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
         );
     }
 
+    /**
+     * Nodes count.
+     *
+     * @return Nodes count.
+     */
+    protected int nodes() {
+        return 3;
+    }
+
     /**
      * Test parameters for {@link #testSnapshot}.
      */
@@ -236,7 +245,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
         // Create a snapshot of the raft group
         service.snapshot(service.leader()).get();
 
-        afterFollowerStop(service, toStop);
+        afterFollowerStop(service, toStop, stopIdx);
 
         // Create another raft snapshot
         service.snapshot(service.leader()).get();
@@ -280,9 +289,10 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
      *
      * @param service Raft group service.
      * @param server Raft server that has been stopped.
+     * @param stoppedNodeIndex index of the stopped node.
      * @throws Exception If failed.
      */
-    public abstract void afterFollowerStop(RaftGroupService service, 
RaftServer server) throws Exception;
+    public abstract void afterFollowerStop(RaftGroupService service, 
RaftServer server, int stoppedNodeIndex) throws Exception;
 
     /**
      * Interacts with a raft group after the leader has captured a snapshot.
@@ -316,9 +326,10 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
      *
      * @param service                 The cluster service.
      * @param listenerPersistencePath Path to storage persistent data.
+     * @param index                   Index of node for which the listener is 
created.
      * @return Raft group listener.
      */
-    public abstract RaftGroupListener createListener(ClusterService service, 
Path listenerPersistencePath);
+    public abstract RaftGroupListener createListener(ClusterService service, 
Path listenerPersistencePath, int index);
 
     /**
      * Returns raft group id for tests.
@@ -415,7 +426,7 @@ public abstract class ItAbstractListenerSnapshotTest<T 
extends RaftGroupListener
         server.startRaftNode(
                 new RaftNodeId(raftGroupId(), 
initialMemberConf.peer(service.topologyService().localMember().name())),
                 initialMemberConf,
-                createListener(service, listenerPersistencePath),
+                createListener(service, listenerPersistencePath, idx),
                 defaults()
         );
 
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 54a04a315e..31ee5e07cc 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -90,6 +90,7 @@ dependencies {
     integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-raft-api')
     integrationTestImplementation project(':ignite-affinity')
+    integrationTestImplementation project(':ignite-storage-rocksdb')
     integrationTestImplementation(testFixtures(project))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 1d4b33575c..edf9d6d758 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -17,24 +17,29 @@
 
 package org.apache.ignite.distributed;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.hybridTimestamp;
+import static 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -44,72 +49,108 @@ import 
org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import 
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent partitions raft group snapshots tests.
  */
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported 
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
 public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<PartitionListener> {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+    @InjectConfiguration("mock.tables.foo = {}")
+    private TablesConfiguration tablesCfg;
+
+    @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size = 
16777216, writeBufferSize = 16777216}}")
+    private RocksDbStorageEngineConfiguration engineConfig;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT64, false)},
             new Column[]{new Column("value", NativeTypes.INT64, false)}
     );
 
-    private static final Function<BinaryRow, BinaryTuple> KEY_EXTRACTOR = 
BinaryRowConverter.keyExtractor(SCHEMA);
+    private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
 
-    private static final Row FIRST_KEY = createKeyRow(0);
+    private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
 
-    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+    /** Paths for created partition listeners. */
+    private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
 
-    private static final Row SECOND_KEY = createKeyRow(1);
+    /** Map of node indexes to partition listeners. */
+    private final Map<Integer, PartitionListener> partListeners = new 
ConcurrentHashMap<>();
 
-    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+    /** Map of node indexes to table storages. */
+    private final Map<Integer, MvTableStorage> mvTableStorages = new 
ConcurrentHashMap<>();
 
-    /**
-     * Paths for created partition listeners.
-     */
-    private final Map<PartitionListener, Path> paths = new 
ConcurrentHashMap<>();
+    /** Map of node indexes to partition storages. */
+    private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new 
ConcurrentHashMap<>();
 
-    private final List<TxManager> managers = new ArrayList<>();
+    /** Map of node indexes to transaction managers. */
+    private final Map<Integer, TxManager> txManagers = new 
ConcurrentHashMap<>();
+
+    private ReplicaService replicaService;
 
     private final Function<String, ClusterNode> consistentIdToNode = addr
             -> new ClusterNode("node1", "node1", new NetworkAddress(addr, 
3333));
 
-    private final ReplicaService replicaService = mock(ReplicaService.class);
+    private final HybridClock hybridClock = new HybridClockImpl();
+
+    private int stoppedNodeIndex;
+
+    private InternalTable table;
+
+    private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
     @BeforeEach
-    void initMocks() {
-        
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
 any());
+    @Override
+    public void beforeTest(TestInfo testInfo) {
+        super.beforeTest(testInfo);
+
+        closeables.clear();
     }
 
     @AfterEach
@@ -117,22 +158,29 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
     public void afterTest() throws Exception {
         super.afterTest();
 
-        for (TxManager txManager : managers) {
-            txManager.stop();
-        }
+        closeAll(closeables);
     }
 
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service, RaftServer 
server) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
+        PartitionReplicaListener partitionReplicaListener = 
mockPartitionReplicaListener(service);
 
-        managers.add(txManager);
+        replicaService = mock(ReplicaService.class);
 
-        txManager.start();
+        when(replicaService.invoke(any(), any()))
+                .thenAnswer(invocationOnMock -> 
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
 
-        var table = new InternalTableImpl(
+        for (int i = 0; i < nodes(); i++) {
+            TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+            txManagers.put(i, txManager);
+            closeables.add(txManager::stop);
+        }
+
+        TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), hybridClock);
+        closeables.add(txManager::stop);
+
+        table = new InternalTableImpl(
                 "table",
                 UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, service),
@@ -140,89 +188,117 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                 consistentIdToNode,
                 txManager,
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestTxStateTableStorage(),
                 replicaService,
-                mock(HybridClock.class)
+                hybridClock
         );
 
+        closeables.add(() -> table.close());
+
         table.upsert(FIRST_VALUE, null).get();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void afterFollowerStop(RaftGroupService service, RaftServer server) 
throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManagerImpl txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
-
-        managers.add(txManager);
+    private PartitionReplicaListener 
mockPartitionReplicaListener(RaftGroupService service) {
+        PartitionReplicaListener partitionReplicaListener = 
mock(PartitionReplicaListener.class);
+
+        
when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+            ReplicaRequest req = invocationOnMock.getArgument(0);
+
+            if (req instanceof ReadWriteSingleRowReplicaRequest) {
+                ReadWriteSingleRowReplicaRequest req0 = 
(ReadWriteSingleRowReplicaRequest) req;
+
+                if (req0.requestType() == RequestType.RW_GET) {
+                    int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+                    MvPartitionStorage partitionStorage = 
mvPartitionStorages.get(storageIndex);
+
+                    Map<ByteBuffer, RowId> primaryIndex = 
rowsToRowIds(partitionStorage);
+                    RowId rowId = 
primaryIndex.get(req0.binaryRow().byteBuffer());
+                    BinaryRow row = partitionStorage.read(rowId, 
HybridTimestamp.MAX_VALUE).binaryRow();
+
+                    return completedFuture(row);
+                }
+
+                // Non-null binary row if UPSERT, otherwise it's implied that 
request type is DELETE.
+                BinaryRow binaryRow = req0.requestType() == 
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+
+                UpdateCommand cmd = msgFactory.updateCommand()
+                        .txId(req0.transactionId())
+                        .tablePartitionId(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0)))
+                        .rowUuid(new RowId(0).uuid())
+                        .rowBuffer(binaryRow == null ? null : 
binaryRow.byteBuffer())
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd);
+            } else if (req instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+                FinishTxCommand cmd = msgFactory.finishTxCommand()
+                        .txId(req0.txId())
+                        .commit(req0.commit())
+                        
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                        .tablePartitionIds(asList(tablePartitionId(new 
TablePartitionId(UUID.randomUUID(), 0))))
+                        .safeTime(hybridTimestamp(hybridClock.now()))
+                        .build();
+
+                return service.run(cmd)
+                        .thenCompose(ignored -> {
+                            TxCleanupCommand cleanupCmd = 
msgFactory.txCleanupCommand()
+                                    .txId(req0.txId())
+                                    .commit(req0.commit())
+                                    
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+                                    
.safeTime(hybridTimestamp(hybridClock.now()))
+                                    .build();
+
+                            return service.run(cleanupCmd);
+                        });
+            }
 
-        txManager.start();
+            throw new AssertionError("Unexpected request: " + req);
+        });
 
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
+        return partitionReplicaListener;
+    }
 
+    /** {@inheritDoc} */
+    @Override
+    public void afterFollowerStop(RaftGroupService service, RaftServer server, 
int stoppedNodeIndex) throws Exception {
         // Remove the first key
-        table.delete(FIRST_KEY, null).get();
+        table.delete(FIRST_VALUE, null).get();
 
         // Put deleted data again
         table.upsert(FIRST_VALUE, null).get();
 
-        txManager.stop();
+        this.stoppedNodeIndex = stoppedNodeIndex;
+
+        mvTableStorages.get(stoppedNodeIndex).stop();
+
+        paths.remove(partListeners.get(stoppedNodeIndex));
     }
 
     /** {@inheritDoc} */
     @Override
     public void afterSnapshot(RaftGroupService service) throws Exception {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use 
Replica layer with new transaction protocol.
-        TxManager txManager = new TxManagerImpl(replicaService, new 
HeapLockManager(), new HybridClockImpl());
-
-        managers.add(txManager);
-
-        txManager.start();
-
-        var table = new InternalTableImpl(
-                "table",
-                UUID.randomUUID(),
-                Int2ObjectMaps.singleton(0, service),
-                1,
-                consistentIdToNode,
-                txManager,
-                mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
-                replicaService,
-                mock(HybridClock.class)
-        );
-
         table.upsert(SECOND_VALUE, null).get();
 
-        assertNotNull(table.get(SECOND_KEY, null).join());
-
-        txManager.stop();
+        assertNotNull(table.get(SECOND_VALUE, null).join());
     }
 
     /** {@inheritDoc} */
     @Override
     public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, 
boolean interactedAfterSnapshot) {
         MvPartitionStorage storage = getListener(restarted, 
raftGroupId()).getMvStorage();
-        Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
-
-        Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
-        Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
 
         return () -> {
-            RowId rowId = primaryIndex.get(key.tupleSlice());
+            Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
 
-            assertNotNull(rowId, "No rowId in storage");
+            Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
+
+            RowId rowId = primaryIndex.get(value.byteBuffer());
+
+            if (rowId == null) {
+                return false;
+            }
 
             ReadResult read = storage.read(rowId, HybridTimestamp.MAX_VALUE);
 
@@ -230,7 +306,7 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                 return false;
             }
 
-            return Arrays.equals(value.bytes(), read.binaryRow().bytes());
+            return Arrays.equals(value.bytes(), 
read.binaryRow().tupleSlice().array());
         };
     }
 
@@ -240,9 +316,10 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         RowId rowId = storage.closestRowId(RowId.lowestRowId(0));
 
         while (rowId != null) {
-            BinaryTuple binaryTuple = KEY_EXTRACTOR.apply(storage.read(rowId, 
HybridTimestamp.MAX_VALUE).binaryRow());
-            if (binaryTuple != null) {
-                result.put(binaryTuple.byteBuffer(), rowId);
+            BinaryRow binaryRow = storage.read(rowId, 
HybridTimestamp.MAX_VALUE).binaryRow();
+
+            if (binaryRow != null) {
+                result.put(binaryRow.byteBuffer(), rowId);
             }
 
             RowId incremented = rowId.increment();
@@ -264,31 +341,45 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
     /** {@inheritDoc} */
     @Override
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica 
layer with new transaction protocol.
-    public RaftGroupListener createListener(ClusterService service, Path 
workDir) {
+    public RaftGroupListener createListener(ClusterService service, Path path, 
int index) {
         return paths.entrySet().stream()
-                .filter(entry -> entry.getValue().equals(workDir))
+                .filter(entry -> entry.getValue().equals(path))
                 .map(Map.Entry::getKey)
                 .findAny()
                 .orElseGet(() -> {
-                    TxManagerImpl txManager = new 
TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+                    TableConfiguration tableCfg = 
tablesCfg.tables().get("foo");
+
+                    tableCfg.change(t -> t.changePartitions(1)).join();
+
+                    RocksDbStorageEngine storageEngine = new 
RocksDbStorageEngine(engineConfig, path);
+                    storageEngine.start();
 
-                    txManager.start(); // Init listener.
+                    closeables.add(storageEngine::stop);
 
-                    var testMpPartStorage = new TestMvPartitionStorage(0);
+                    tableCfg.dataStorage().change(ds -> 
ds.convert(storageEngine.name())).join();
 
-                    PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
+                    MvTableStorage mvTableStorage = 
storageEngine.createMvTable(tableCfg, tablesCfg);
+                    mvTableStorage.start();
+                    mvTableStorages.put(index, mvTableStorage);
+                    closeables.add(mvTableStorage::close);
+
+                    MvPartitionStorage mvPartitionStorage = 
mvTableStorage.getOrCreateMvPartition(0);
+                    mvPartitionStorages.put(index, mvPartitionStorage);
+                    closeables.add(mvPartitionStorage::close);
+
+                    PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
 
                     StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, Map::of);
 
                     PartitionListener listener = new PartitionListener(
-                            new TestPartitionDataStorage(testMpPartStorage),
+                            partitionDataStorage,
                             storageUpdateHandler,
                             new TestTxStateStorage(),
                             new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0))
                     );
 
-                    paths.put(listener, workDir);
+                    paths.put(listener, path);
+                    partListeners.put(index, listener);
 
                     return listener;
                 });
@@ -300,20 +391,6 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         return new TestReplicationGroupId("partitions");
     }
 
-    /**
-     * Creates a {@link Row} with the supplied key.
-     *
-     * @param id Key.
-     * @return Row.
-     */
-    private static Row createKeyRow(long id) {
-        RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA);
-
-        rowBuilder.appendLong(id);
-
-        return new Row(SCHEMA, rowBuilder.build());
-    }
-
     /**
      * Creates a {@link Row} with the supplied key and value.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9c0954b0e8..26db7e70e1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -129,10 +129,10 @@ import org.jetbrains.annotations.Nullable;
 /** Partition replication listener. */
 public class PartitionReplicaListener implements ReplicaListener {
     /** Factory to create RAFT command messages. */
-    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+    private static final TableMessagesFactory MSG_FACTORY = new 
TableMessagesFactory();
 
     /** Factory for creating replica command messages. */
-    private final ReplicaMessagesFactory replicaMessagesFactory = new 
ReplicaMessagesFactory();
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
@@ -532,7 +532,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future.
      */
     private CompletionStage<Void> 
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
-        return 
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
+        return 
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
     }
 
     /**
@@ -1013,7 +1013,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         HybridTimestamp currentTimestamp = hybridClock.now();
         HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
 
-        FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand()
+        FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
                 .txId(txId)
                 .commit(commit)
                 .safeTime(hybridTimestamp(currentTimestamp))
@@ -1077,7 +1077,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return allOf(txUpdateFutures.toArray(new 
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
             HybridTimestampMessage timestampMsg = 
hybridTimestamp(request.commitTimestamp());
 
-            TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
+            TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
                     .txId(request.txId())
                     .commit(request.commit())
                     .commitTimestamp(timestampMsg)
@@ -2105,8 +2105,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
      * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
      */
-    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
-        return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+    public static HybridTimestampMessage hybridTimestamp(HybridTimestamp 
tmstmp) {
+        return tmstmp != null ? 
REPLICA_MESSAGES_FACTORY.hybridTimestampMessage()
                 .physical(tmstmp.getPhysical())
                 .logical(tmstmp.getLogical())
                 .build()
@@ -2123,7 +2123,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Constructed {@link UpdateCommand} object.
      */
     private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, ByteBuffer rowBuf, UUID txId) {
-        UpdateCommandBuilder bldr = msgFactory.updateCommand()
+        UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowUuid(rowUuid)
                 .txId(txId)
@@ -2145,7 +2145,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Constructed {@link UpdateAllCommand} object.
      */
     private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, 
Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) {
-        return msgFactory.updateAllCommand()
+        return MSG_FACTORY.updateAllCommand()
                 .tablePartitionId(tablePartitionId(tablePartId))
                 .rowsToUpdate(rowsToUpdate)
                 .txId(txId)
@@ -2159,8 +2159,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @param tablePartId {@link TablePartitionId} object to convert to {@link 
TablePartitionIdMessage}.
      * @return {@link TablePartitionIdMessage} object converted from argument.
      */
-    private TablePartitionIdMessage tablePartitionId(TablePartitionId 
tablePartId) {
-        return msgFactory.tablePartitionIdMessage()
+    public static TablePartitionIdMessage tablePartitionId(TablePartitionId 
tablePartId) {
+        return MSG_FACTORY.tablePartitionIdMessage()
                 .tableId(tablePartId.tableId())
                 .partitionId(tablePartId.partitionId())
                 .build();

Reply via email to