This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4b3f7c7d53 IGNITE-17817 Update ItTablePersistenceTest to use Replica
layer with new transaction protocol. Fixes #1612
4b3f7c7d53 is described below
commit 4b3f7c7d53d54d66490597f577cc18891282e85e
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Feb 10 11:03:08 2023 +0200
IGNITE-17817 Update ItTablePersistenceTest to use Replica layer with new
transaction protocol. Fixes #1612
Signed-off-by: Slava Koptilin <[email protected]>
---
.../impl/ItMetaStorageServicePersistenceTest.java | 4 +-
.../service/ItAbstractListenerSnapshotTest.java | 21 +-
modules/table/build.gradle | 1 +
.../ignite/distributed/ItTablePersistenceTest.java | 299 +++++++++++++--------
.../replicator/PartitionReplicaListener.java | 22 +-
5 files changed, 218 insertions(+), 129 deletions(-)
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 4e8696e262..4dd6bd51a5 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -79,7 +79,7 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
/** {@inheritDoc} */
@Override
- public void afterFollowerStop(RaftGroupService service, RaftServer server)
throws Exception {
+ public void afterFollowerStop(RaftGroupService service, RaftServer server,
int stoppedNodeIndex) throws Exception {
ClusterNode followerNode = getNode(server);
KeyValueStorage storage = storageByName.remove(followerNode.name());
@@ -133,7 +133,7 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
/** {@inheritDoc} */
@Override
- public RaftGroupListener createListener(ClusterService service, Path
listenerPersistencePath) {
+ public RaftGroupListener createListener(ClusterService service, Path
listenerPersistencePath, int index) {
String nodeName = service.localConfiguration().getName();
KeyValueStorage storage = storageByName.computeIfAbsent(nodeName, name
-> {
diff --git
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
index 649a9fde05..812dae0ffb 100644
---
a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
+++
b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java
@@ -110,7 +110,7 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
public void beforeTest(TestInfo testInfo) {
executor = new ScheduledThreadPoolExecutor(20, new
NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
- initialMemberConf = IntStream.rangeClosed(0, 2)
+ initialMemberConf = IntStream.range(0, nodes())
.mapToObj(i -> testNodeName(testInfo, PORT + i))
.collect(collectingAndThen(toSet(),
PeersAndLearners::fromConsistentIds));
}
@@ -137,6 +137,15 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
);
}
+ /**
+ * Nodes count.
+ *
+ * @return Nodes count.
+ */
+ protected int nodes() {
+ return 3;
+ }
+
/**
* Test parameters for {@link #testSnapshot}.
*/
@@ -236,7 +245,7 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
// Create a snapshot of the raft group
service.snapshot(service.leader()).get();
- afterFollowerStop(service, toStop);
+ afterFollowerStop(service, toStop, stopIdx);
// Create another raft snapshot
service.snapshot(service.leader()).get();
@@ -280,9 +289,10 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
*
* @param service Raft group service.
* @param server Raft server that has been stopped.
+ * @param stoppedNodeIndex Index of the stopped node.
* @throws Exception If failed.
*/
- public abstract void afterFollowerStop(RaftGroupService service,
RaftServer server) throws Exception;
+ public abstract void afterFollowerStop(RaftGroupService service,
RaftServer server, int stoppedNodeIndex) throws Exception;
/**
* Interacts with a raft group after the leader has captured a snapshot.
@@ -316,9 +326,10 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
*
* @param service The cluster service.
* @param listenerPersistencePath Path to storage persistent data.
+ * @param index Index of the node for which the listener is created.
* @return Raft group listener.
*/
- public abstract RaftGroupListener createListener(ClusterService service,
Path listenerPersistencePath);
+ public abstract RaftGroupListener createListener(ClusterService service,
Path listenerPersistencePath, int index);
/**
* Returns raft group id for tests.
@@ -415,7 +426,7 @@ public abstract class ItAbstractListenerSnapshotTest<T
extends RaftGroupListener
server.startRaftNode(
new RaftNodeId(raftGroupId(),
initialMemberConf.peer(service.topologyService().localMember().name())),
initialMemberConf,
- createListener(service, listenerPersistencePath),
+ createListener(service, listenerPersistencePath, idx),
defaults()
);
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 54a04a315e..31ee5e07cc 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -90,6 +90,7 @@ dependencies {
integrationTestImplementation project(':ignite-replicator')
integrationTestImplementation project(':ignite-raft-api')
integrationTestImplementation project(':ignite-affinity')
+ integrationTestImplementation project(':ignite-storage-rocksdb')
integrationTestImplementation(testFixtures(project))
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 1d4b33575c..edf9d6d758 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -17,24 +17,29 @@
package org.apache.ignite.distributed;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.hybridTimestamp;
+import static
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId;
+import static org.apache.ignite.internal.util.ArrayUtils.asList;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -44,72 +49,108 @@ import
org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Persistent partitions raft group snapshots tests.
*/
-@Disabled("IGNITE-16644, IGNITE-17817 MvPartitionStorage hasn't supported
snapshots yet")
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<PartitionListener> {
+ /** Factory to create RAFT command messages. */
+ private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
+ @InjectConfiguration("mock.tables.foo = {}")
+ private TablesConfiguration tablesCfg;
+
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size =
16777216, writeBufferSize = 16777216}}")
+ private RocksDbStorageEngineConfiguration engineConfig;
+
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("key", NativeTypes.INT64, false)},
new Column[]{new Column("value", NativeTypes.INT64, false)}
);
- private static final Function<BinaryRow, BinaryTuple> KEY_EXTRACTOR =
BinaryRowConverter.keyExtractor(SCHEMA);
+ private static final Row FIRST_VALUE = createKeyValueRow(1, 1);
- private static final Row FIRST_KEY = createKeyRow(0);
+ private static final Row SECOND_VALUE = createKeyValueRow(2, 2);
- private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+ /** Paths for created partition listeners. */
+ private final Map<PartitionListener, Path> paths = new
ConcurrentHashMap<>();
- private static final Row SECOND_KEY = createKeyRow(1);
+ /** Map of node indexes to partition listeners. */
+ private final Map<Integer, PartitionListener> partListeners = new
ConcurrentHashMap<>();
- private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+ /** Map of node indexes to table storages. */
+ private final Map<Integer, MvTableStorage> mvTableStorages = new
ConcurrentHashMap<>();
- /**
- * Paths for created partition listeners.
- */
- private final Map<PartitionListener, Path> paths = new
ConcurrentHashMap<>();
+ /** Map of node indexes to partition storages. */
+ private final Map<Integer, MvPartitionStorage> mvPartitionStorages = new
ConcurrentHashMap<>();
- private final List<TxManager> managers = new ArrayList<>();
+ /** Map of node indexes to transaction managers. */
+ private final Map<Integer, TxManager> txManagers = new
ConcurrentHashMap<>();
+
+ private ReplicaService replicaService;
private final Function<String, ClusterNode> consistentIdToNode = addr
-> new ClusterNode("node1", "node1", new NetworkAddress(addr,
3333));
- private final ReplicaService replicaService = mock(ReplicaService.class);
+ private final HybridClock hybridClock = new HybridClockImpl();
+
+ private int stoppedNodeIndex;
+
+ private InternalTable table;
+
+ private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
@BeforeEach
- void initMocks() {
-
doReturn(CompletableFuture.completedFuture(null)).when(replicaService).invoke(any(),
any());
+ @Override
+ public void beforeTest(TestInfo testInfo) {
+ super.beforeTest(testInfo);
+
+ closeables.clear();
}
@AfterEach
@@ -117,22 +158,29 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
public void afterTest() throws Exception {
super.afterTest();
- for (TxManager txManager : managers) {
- txManager.stop();
- }
+ closeAll(closeables);
}
/** {@inheritDoc} */
@Override
public void beforeFollowerStop(RaftGroupService service, RaftServer
server) throws Exception {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use
Replica layer with new transaction protocol.
- TxManagerImpl txManager = new TxManagerImpl(replicaService, new
HeapLockManager(), new HybridClockImpl());
+ PartitionReplicaListener partitionReplicaListener =
mockPartitionReplicaListener(service);
- managers.add(txManager);
+ replicaService = mock(ReplicaService.class);
- txManager.start();
+ when(replicaService.invoke(any(), any()))
+ .thenAnswer(invocationOnMock ->
partitionReplicaListener.invoke(invocationOnMock.getArgument(1)));
- var table = new InternalTableImpl(
+ for (int i = 0; i < nodes(); i++) {
+ TxManager txManager = new TxManagerImpl(replicaService, new
HeapLockManager(), hybridClock);
+ txManagers.put(i, txManager);
+ closeables.add(txManager::stop);
+ }
+
+ TxManager txManager = new TxManagerImpl(replicaService, new
HeapLockManager(), hybridClock);
+ closeables.add(txManager::stop);
+
+ table = new InternalTableImpl(
"table",
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, service),
@@ -140,89 +188,117 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
consistentIdToNode,
txManager,
mock(MvTableStorage.class),
- mock(TxStateTableStorage.class),
+ new TestTxStateTableStorage(),
replicaService,
- mock(HybridClock.class)
+ hybridClock
);
+ closeables.add(() -> table.close());
+
table.upsert(FIRST_VALUE, null).get();
}
- /** {@inheritDoc} */
- @Override
- public void afterFollowerStop(RaftGroupService service, RaftServer server)
throws Exception {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use
Replica layer with new transaction protocol.
- TxManagerImpl txManager = new TxManagerImpl(replicaService, new
HeapLockManager(), new HybridClockImpl());
-
- managers.add(txManager);
+ private PartitionReplicaListener
mockPartitionReplicaListener(RaftGroupService service) {
+ PartitionReplicaListener partitionReplicaListener =
mock(PartitionReplicaListener.class);
+
+
when(partitionReplicaListener.invoke(any())).thenAnswer(invocationOnMock -> {
+ ReplicaRequest req = invocationOnMock.getArgument(0);
+
+ if (req instanceof ReadWriteSingleRowReplicaRequest) {
+ ReadWriteSingleRowReplicaRequest req0 =
(ReadWriteSingleRowReplicaRequest) req;
+
+ if (req0.requestType() == RequestType.RW_GET) {
+ int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
+ MvPartitionStorage partitionStorage =
mvPartitionStorages.get(storageIndex);
+
+ Map<ByteBuffer, RowId> primaryIndex =
rowsToRowIds(partitionStorage);
+ RowId rowId =
primaryIndex.get(req0.binaryRow().byteBuffer());
+ BinaryRow row = partitionStorage.read(rowId,
HybridTimestamp.MAX_VALUE).binaryRow();
+
+ return completedFuture(row);
+ }
+
+ // Non-null binary row if UPSERT, otherwise it's implied that
request type is DELETE.
+ BinaryRow binaryRow = req0.requestType() ==
RequestType.RW_UPSERT ? req0.binaryRow() : null;
+
+ UpdateCommand cmd = msgFactory.updateCommand()
+ .txId(req0.transactionId())
+ .tablePartitionId(tablePartitionId(new
TablePartitionId(UUID.randomUUID(), 0)))
+ .rowUuid(new RowId(0).uuid())
+ .rowBuffer(binaryRow == null ? null :
binaryRow.byteBuffer())
+ .safeTime(hybridTimestamp(hybridClock.now()))
+ .build();
+
+ return service.run(cmd);
+ } else if (req instanceof TxFinishReplicaRequest) {
+ TxFinishReplicaRequest req0 = (TxFinishReplicaRequest) req;
+
+ FinishTxCommand cmd = msgFactory.finishTxCommand()
+ .txId(req0.txId())
+ .commit(req0.commit())
+
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+ .tablePartitionIds(asList(tablePartitionId(new
TablePartitionId(UUID.randomUUID(), 0))))
+ .safeTime(hybridTimestamp(hybridClock.now()))
+ .build();
+
+ return service.run(cmd)
+ .thenCompose(ignored -> {
+ TxCleanupCommand cleanupCmd =
msgFactory.txCleanupCommand()
+ .txId(req0.txId())
+ .commit(req0.commit())
+
.commitTimestamp(hybridTimestamp(req0.commitTimestamp()))
+
.safeTime(hybridTimestamp(hybridClock.now()))
+ .build();
+
+ return service.run(cleanupCmd);
+ });
+ }
- txManager.start();
+ throw new AssertionError("Unexpected request: " + req);
+ });
- var table = new InternalTableImpl(
- "table",
- UUID.randomUUID(),
- Int2ObjectMaps.singleton(0, service),
- 1,
- consistentIdToNode,
- txManager,
- mock(MvTableStorage.class),
- mock(TxStateTableStorage.class),
- replicaService,
- mock(HybridClock.class)
- );
+ return partitionReplicaListener;
+ }
+ /** {@inheritDoc} */
+ @Override
+ public void afterFollowerStop(RaftGroupService service, RaftServer server,
int stoppedNodeIndex) throws Exception {
// Remove the first key
- table.delete(FIRST_KEY, null).get();
+ table.delete(FIRST_VALUE, null).get();
// Put deleted data again
table.upsert(FIRST_VALUE, null).get();
- txManager.stop();
+ this.stoppedNodeIndex = stoppedNodeIndex;
+
+ mvTableStorages.get(stoppedNodeIndex).stop();
+
+ paths.remove(partListeners.get(stoppedNodeIndex));
}
/** {@inheritDoc} */
@Override
public void afterSnapshot(RaftGroupService service) throws Exception {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use
Replica layer with new transaction protocol.
- TxManager txManager = new TxManagerImpl(replicaService, new
HeapLockManager(), new HybridClockImpl());
-
- managers.add(txManager);
-
- txManager.start();
-
- var table = new InternalTableImpl(
- "table",
- UUID.randomUUID(),
- Int2ObjectMaps.singleton(0, service),
- 1,
- consistentIdToNode,
- txManager,
- mock(MvTableStorage.class),
- mock(TxStateTableStorage.class),
- replicaService,
- mock(HybridClock.class)
- );
-
table.upsert(SECOND_VALUE, null).get();
- assertNotNull(table.get(SECOND_KEY, null).join());
-
- txManager.stop();
+ assertNotNull(table.get(SECOND_VALUE, null).join());
}
/** {@inheritDoc} */
@Override
public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted,
boolean interactedAfterSnapshot) {
MvPartitionStorage storage = getListener(restarted,
raftGroupId()).getMvStorage();
- Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
-
- Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
- Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
return () -> {
- RowId rowId = primaryIndex.get(key.tupleSlice());
+ Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
- assertNotNull(rowId, "No rowId in storage");
+ Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
+
+ RowId rowId = primaryIndex.get(value.byteBuffer());
+
+ if (rowId == null) {
+ return false;
+ }
ReadResult read = storage.read(rowId, HybridTimestamp.MAX_VALUE);
@@ -230,7 +306,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
return false;
}
- return Arrays.equals(value.bytes(), read.binaryRow().bytes());
+ return Arrays.equals(value.bytes(),
read.binaryRow().tupleSlice().array());
};
}
@@ -240,9 +316,10 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
RowId rowId = storage.closestRowId(RowId.lowestRowId(0));
while (rowId != null) {
- BinaryTuple binaryTuple = KEY_EXTRACTOR.apply(storage.read(rowId,
HybridTimestamp.MAX_VALUE).binaryRow());
- if (binaryTuple != null) {
- result.put(binaryTuple.byteBuffer(), rowId);
+ BinaryRow binaryRow = storage.read(rowId,
HybridTimestamp.MAX_VALUE).binaryRow();
+
+ if (binaryRow != null) {
+ result.put(binaryRow.byteBuffer(), rowId);
}
RowId incremented = rowId.increment();
@@ -264,31 +341,45 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
/** {@inheritDoc} */
@Override
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17817 Use Replica
layer with new transaction protocol.
- public RaftGroupListener createListener(ClusterService service, Path
workDir) {
+ public RaftGroupListener createListener(ClusterService service, Path path,
int index) {
return paths.entrySet().stream()
- .filter(entry -> entry.getValue().equals(workDir))
+ .filter(entry -> entry.getValue().equals(path))
.map(Map.Entry::getKey)
.findAny()
.orElseGet(() -> {
- TxManagerImpl txManager = new
TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl());
+ TableConfiguration tableCfg =
tablesCfg.tables().get("foo");
+
+ tableCfg.change(t -> t.changePartitions(1)).join();
+
+ RocksDbStorageEngine storageEngine = new
RocksDbStorageEngine(engineConfig, path);
+ storageEngine.start();
- txManager.start(); // Init listener.
+ closeables.add(storageEngine::stop);
- var testMpPartStorage = new TestMvPartitionStorage(0);
+ tableCfg.dataStorage().change(ds ->
ds.convert(storageEngine.name())).join();
- PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
+ MvTableStorage mvTableStorage =
storageEngine.createMvTable(tableCfg, tablesCfg);
+ mvTableStorage.start();
+ mvTableStorages.put(index, mvTableStorage);
+ closeables.add(mvTableStorage::close);
+
+ MvPartitionStorage mvPartitionStorage =
mvTableStorage.getOrCreateMvPartition(0);
+ mvPartitionStorages.put(index, mvPartitionStorage);
+ closeables.add(mvPartitionStorage::close);
+
+ PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartitionStorage);
StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(0, partitionDataStorage, Map::of);
PartitionListener listener = new PartitionListener(
- new TestPartitionDataStorage(testMpPartStorage),
+ partitionDataStorage,
storageUpdateHandler,
new TestTxStateStorage(),
new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0))
);
- paths.put(listener, workDir);
+ paths.put(listener, path);
+ partListeners.put(index, listener);
return listener;
});
@@ -300,20 +391,6 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
return new TestReplicationGroupId("partitions");
}
- /**
- * Creates a {@link Row} with the supplied key.
- *
- * @param id Key.
- * @return Row.
- */
- private static Row createKeyRow(long id) {
- RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA);
-
- rowBuilder.appendLong(id);
-
- return new Row(SCHEMA, rowBuilder.build());
- }
-
/**
* Creates a {@link Row} with the supplied key and value.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9c0954b0e8..26db7e70e1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -129,10 +129,10 @@ import org.jetbrains.annotations.Nullable;
/** Partition replication listener. */
public class PartitionReplicaListener implements ReplicaListener {
/** Factory to create RAFT command messages. */
- private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+ private static final TableMessagesFactory MSG_FACTORY = new
TableMessagesFactory();
/** Factory for creating replica command messages. */
- private final ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
/** Tx messages factory. */
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
@@ -532,7 +532,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future.
*/
private CompletionStage<Void>
processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
- return
raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
+ return
raftClient.run(REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(hybridTimestamp(hybridClock.now())).build());
}
/**
@@ -1013,7 +1013,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridTimestamp currentTimestamp = hybridClock.now();
HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
- FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand()
+ FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
.txId(txId)
.commit(commit)
.safeTime(hybridTimestamp(currentTimestamp))
@@ -1077,7 +1077,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return allOf(txUpdateFutures.toArray(new
CompletableFuture<?>[txUpdateFutures.size()])).thenCompose(v -> {
HybridTimestampMessage timestampMsg =
hybridTimestamp(request.commitTimestamp());
- TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
+ TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
.txId(request.txId())
.commit(request.commit())
.commitTimestamp(timestampMsg)
@@ -2105,8 +2105,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param tmstmp {@link HybridTimestamp} object to convert to {@link
HybridTimestampMessage}.
* @return {@link HybridTimestampMessage} object obtained from {@link
HybridTimestamp}.
*/
- private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
- return tmstmp != null ? replicaMessagesFactory.hybridTimestampMessage()
+ public static HybridTimestampMessage hybridTimestamp(HybridTimestamp
tmstmp) {
+ return tmstmp != null ?
REPLICA_MESSAGES_FACTORY.hybridTimestampMessage()
.physical(tmstmp.getPhysical())
.logical(tmstmp.getLogical())
.build()
@@ -2123,7 +2123,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Constructed {@link UpdateCommand} object.
*/
private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID
rowUuid, ByteBuffer rowBuf, UUID txId) {
- UpdateCommandBuilder bldr = msgFactory.updateCommand()
+ UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowUuid(rowUuid)
.txId(txId)
@@ -2145,7 +2145,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Constructed {@link UpdateAllCommand} object.
*/
private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId,
Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) {
- return msgFactory.updateAllCommand()
+ return MSG_FACTORY.updateAllCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowsToUpdate(rowsToUpdate)
.txId(txId)
@@ -2159,8 +2159,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param tablePartId {@link TablePartitionId} object to convert to {@link
TablePartitionIdMessage}.
* @return {@link TablePartitionIdMessage} object converted from argument.
*/
- private TablePartitionIdMessage tablePartitionId(TablePartitionId
tablePartId) {
- return msgFactory.tablePartitionIdMessage()
+ public static TablePartitionIdMessage tablePartitionId(TablePartitionId
tablePartId) {
+ return MSG_FACTORY.tablePartitionIdMessage()
.tableId(tablePartId.tableId())
.partitionId(tablePartId.partitionId())
.build();