This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 554f4ac6bb2 IGNITE-28216 Advance last applied index for each partition command (#7770)
554f4ac6bb2 is described below
commit 554f4ac6bb28da2b7493b7002673681c1d4eda77
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 16 16:37:31 2026 +0400
IGNITE-28216 Advance last applied index for each partition command (#7770)
---
.../replicator/raft/ZonePartitionRaftListener.java | 55 ++++-
.../handlers/WriteIntentSwitchCommandHandler.java | 18 +-
.../raft/ZonePartitionRaftListenerTest.java | 237 +++++++++++++++++++--
.../ItAbstractInternalTableScanTest.java | 7 +-
.../ItInternalTableReadOnlyOperationsTest.java | 6 +-
.../distributed/raft/TablePartitionProcessor.java | 20 +-
.../MinimumActiveTxTimeCommandHandler.java | 8 +
.../raft/PartitionCommandListenerTest.java | 77 +++++--
8 files changed, 386 insertions(+), 42 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index cb8c2bf087b..9eabd1db070 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -138,7 +138,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
.addHandler(
PartitionReplicationMessageGroup.GROUP_TYPE,
Commands.WRITE_INTENT_SWITCH_V2,
- new
WriteIntentSwitchCommandHandler(tableProcessors::get, txManager))
+ new
WriteIntentSwitchCommandHandler(tableProcessors::get, txManager,
txStatePartitionStorage))
.addHandler(
TxMessageGroup.GROUP_TYPE,
VACUUM_TX_STATE_COMMAND,
@@ -206,7 +206,19 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
safeTimestamp
);
} else if (command instanceof
UpdateMinimumActiveTxBeginTimeCommand) {
- result = processCrossTableProcessorsCommand(command,
commandIndex, commandTerm, safeTimestamp);
+ CrossTableCommandResult crossTableResult =
processCrossTableProcessorsCommand(
+ command,
+ commandIndex,
+ commandTerm,
+ safeTimestamp
+ );
+ result = crossTableResult.result;
+
+ if (!crossTableResult.hadAnyTableProcessor) {
+ // We MUST bump information about last updated
index+term at least in one storage.
+ // See a comment in #onWrite() for explanation.
+
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
+ }
} else if (command instanceof SafeTimeSyncCommand) {
result = handleSafeTimeSyncCommand(commandIndex,
commandTerm);
} else if (command instanceof PrimaryReplicaChangeCommand) {
@@ -216,7 +228,13 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
partitionKey.toReplicationGroupId(), commandIndex,
commandTerm,
cmd.leaseStartTime(), cmd.primaryReplicaNodeId(),
cmd.primaryReplicaNodeName());
- result = processCrossTableProcessorsCommand(command,
commandIndex, commandTerm, safeTimestamp);
+ CrossTableCommandResult crossTableResult =
processCrossTableProcessorsCommand(
+ command,
+ commandIndex,
+ commandTerm,
+ safeTimestamp
+ );
+ result = crossTableResult.result;
if (updateLeaseInfoInTxStorage(cmd, commandIndex,
commandTerm)) {
LOG.debug("Updated lease info in tx storage
[groupId={}, commandIndex={}, leaseStartTime={}]",
@@ -231,6 +249,8 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
if (commandHandler == null) {
LOG.info("Message type {} is not supported by the zone
partition RAFT listener yet", command.getClass());
+
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
+
result = EMPTY_APPLIED_RESULT;
} else {
result = commandHandler.handle(command, commandIndex,
commandTerm, safeTimestamp);
@@ -275,14 +295,14 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
* @param safeTimestamp Safe timestamp.
* @return Tuple with the result of the command processing and a flag
indicating whether the command was applied.
*/
- private CommandResult processCrossTableProcessorsCommand(
+ private CrossTableCommandResult processCrossTableProcessorsCommand(
WriteCommand command,
long commandIndex,
long commandTerm,
@Nullable HybridTimestamp safeTimestamp
) {
if (tableProcessors.isEmpty()) {
- return new CommandResult(null, commandIndex > lastAppliedIndex);
+ return new CrossTableCommandResult(false, new CommandResult(null,
commandIndex > lastAppliedIndex));
}
boolean wasApplied = false;
@@ -293,7 +313,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
wasApplied = wasApplied || r.wasApplied();
}
- return new CommandResult(null, wasApplied);
+ return new CrossTableCommandResult(true, new CommandResult(null,
wasApplied));
}
/**
@@ -322,12 +342,22 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
LOG.warn("Table processor for table ID {} not found. Command will
be ignored: {}",
tableId, command.toStringForLightLogging());
+ // We MUST bump information about last updated index+term.
+ // See a comment in #onWrite() for explanation.
+ updateTxStateStorageLastAppliedIfNotStale(commandIndex,
commandTerm);
+
return EMPTY_APPLIED_RESULT;
}
return tableProcessor.processCommand(command, commandIndex,
commandTerm, safeTimestamp);
}
+ private void updateTxStateStorageLastAppliedIfNotStale(long commandIndex,
long commandTerm) {
+ if (commandIndex > txStateStorage.lastAppliedIndex()) {
+ txStateStorage.lastApplied(commandIndex, commandTerm);
+ }
+ }
+
private boolean updateLeaseInfoInTxStorage(PrimaryReplicaChangeCommand
command, long commandIndex, long commandTerm) {
if (commandIndex <= txStateStorage.lastAppliedIndex()) {
return false;
@@ -518,7 +548,6 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
/**
* Handler for the {@link SafeTimeSyncCommand}.
*
- * @param cmd Command.
* @param commandIndex RAFT index of the command.
* @param commandTerm RAFT term of the command.
*/
@@ -530,7 +559,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
- txStateStorage.lastApplied(commandIndex, commandTerm);
+ updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
return EMPTY_APPLIED_RESULT;
}
@@ -539,4 +568,14 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
public HybridTimestamp currentSafeTime() {
return safeTimeTracker.current();
}
+
+ private static class CrossTableCommandResult {
+ private final boolean hadAnyTableProcessor;
+ private final CommandResult result;
+
+ private CrossTableCommandResult(boolean hadAnyTableProcessor,
CommandResult result) {
+ this.hadAnyTableProcessor = hadAnyTableProcessor;
+ this.result = result;
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
index b9407770de0..0d40a9bc560 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/WriteIntentSwitchCommandHandler.java
@@ -27,6 +27,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.CommandResult;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,9 +40,16 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
private final RaftTxFinishMarker txFinishMarker;
+ private final TxStatePartitionStorage txStatePartitionStorage;
+
/** Constructor. */
- public WriteIntentSwitchCommandHandler(IntFunction<RaftTableProcessor>
tableProcessorByTableId, TxManager txManager) {
+ public WriteIntentSwitchCommandHandler(
+ IntFunction<RaftTableProcessor> tableProcessorByTableId,
+ TxManager txManager,
+ TxStatePartitionStorage txStatePartitionStorage
+ ) {
this.tableProcessorByTableId = tableProcessorByTableId;
+ this.txStatePartitionStorage = txStatePartitionStorage;
txFinishMarker = new RaftTxFinishMarker(txManager);
}
@@ -58,6 +66,7 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
txFinishMarker.markFinished(switchCommand.txId(),
switchCommand.commit(), switchCommand.commitTimestamp(), null);
boolean applied = false;
+ boolean handledByAnyTable = false;
for (int tableId : ((WriteIntentSwitchCommandV2)
switchCommand).tableIds()) {
RaftTableProcessor tableProcessor = raftTableProcessor(tableId);
@@ -76,6 +85,13 @@ public class WriteIntentSwitchCommandHandler extends
AbstractCommandHandler<Writ
.processCommand(switchCommand, commandIndex, commandTerm,
safeTimestamp);
applied = applied || singleResult.wasApplied();
+ handledByAnyTable = true;
+ }
+
+ // We MUST bump information about last updated index+term at least in
one storage.
+ // See a comment in ZonePartitionRaftListener#onWrite() for
explanation.
+ if (!handledByAnyTable && commandIndex >
txStatePartitionStorage.lastAppliedIndex()) {
+ txStatePartitionStorage.lastApplied(commandIndex, commandTerm);
}
return new CommandResult(null, applied);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index e6eab46d8b4..650036f82f8 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -31,7 +31,11 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
@@ -51,7 +55,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -63,6 +69,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
@@ -99,6 +106,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.UpdateCommandResult;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -130,11 +138,15 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
private static final int TABLE_ID = 1;
+ private static final int NON_EXISTENT_TABLE_ID = 999;
+
private static final PartitionKey ZONE_PARTITION_KEY = new
PartitionKey(ZONE_ID, PARTITION_ID);
private static final PartitionReplicationMessagesFactory
PARTITION_REPLICATION_MESSAGES_FACTORY =
new PartitionReplicationMessagesFactory();
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
@Captor
@@ -154,8 +166,8 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
@Spy
private TxStatePartitionStorage txStatePartitionStorage = new
TestTxStatePartitionStorage();
- @Mock
- private MvPartitionStorage mvPartitionStorage;
+ @Spy
+ private MvPartitionStorage mvPartitionStorage = new
TestMvPartitionStorage(PARTITION_ID);
@InjectExecutorService
private ExecutorService executor;
@@ -170,6 +182,9 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionSnapshots partitionSnapshots;
+ @Mock
+ private CatalogService catalogService;
+
@BeforeEach
void setUp() {
listener = createListener();
@@ -277,11 +292,11 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
when(command.primaryReplicaNodeId()).thenReturn(leaseInfo.primaryReplicaNodeId());
when(command.primaryReplicaNodeName()).thenReturn(leaseInfo.primaryReplicaNodeName());
- when(mvPartitionStorage.runConsistently(any())).thenAnswer(invocation
-> {
+ doAnswer(invocation -> {
WriteClosure<?> closure = invocation.getArgument(0);
return closure.execute(locker);
- });
+ }).when(mvPartitionStorage).runConsistently(any());
listener.onWrite(List.of(writeCommandClosure).iterator());
@@ -461,22 +476,22 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
.leaseStartTime(HybridTimestamp.MIN_VALUE.addPhysicalTime(1).longValue())
.build();
-
listener.onWrite(List.of(writeCommandClosure(raftIndex.incrementAndGet(), 1,
command, null, null)).iterator());
+
listener.onWrite(List.of(writeCommandClosureWithoutSafeTime(raftIndex.incrementAndGet(),
1, command)).iterator());
mvPartitionStorage.lastApplied(10L, 1L);
UpdateCommandV2 updateCommand = mock(UpdateCommandV2.class);
when(updateCommand.tableId()).thenReturn(TABLE_ID);
- WriteIntentSwitchCommand writeIntentSwitchCommand =
mock(WriteIntentSwitchCommand.class);
+ WriteIntentSwitchCommand writeIntentSwitchCommand =
writeIntentSwitchCommand();
- SafeTimeSyncCommand safeTimeSyncCommand =
mock(SafeTimeSyncCommand.class);
+ SafeTimeSyncCommand safeTimeSyncCommand = safeTimeSyncCommand();
FinishTxCommandV2 finishTxCommand = mock(FinishTxCommandV2.class);
when(finishTxCommand.groupType()).thenReturn(PartitionReplicationMessageGroup.GROUP_TYPE);
when(finishTxCommand.messageType()).thenReturn(Commands.FINISH_TX_V2);
- PrimaryReplicaChangeCommand primaryReplicaChangeCommand =
mock(PrimaryReplicaChangeCommand.class);
+ PrimaryReplicaChangeCommand primaryReplicaChangeCommand =
primaryReplicaChangeCommand();
// Checks for MvPartitionStorage.
listener.onWrite(List.of(
@@ -510,7 +525,7 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
verify(txStatePartitionStorage, never())
.compareAndSet(any(UUID.class), any(TxState.class),
any(TxMeta.class), anyLong(), anyLong());
- // First time for the command, second time for updating safe time.
+ // First time for safe time command above, second time for explicit
call of lastApplied() in this test.
verify(txStatePartitionStorage, times(2)).lastApplied(anyLong(),
anyLong());
assertThat(commandClosureResultCaptor.getAllValues(),
containsInAnyOrder(new Throwable[]{null, null}));
@@ -536,6 +551,175 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
}
+ @Test
+ void
onlyUpdatesMvStorageLastAppliedForWriteIntentSwitchCommandsThatTouchSomeTableStorages()
{
+ mockCatalogForUpdateExecution();
+
+ listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+ WriteIntentSwitchCommand command =
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+ .txId(TestTransactionIds.newTransactionId())
+ .initiatorTime(clock.now())
+ .commit(true)
+ .tableIds(Set.of(TABLE_ID))
+ .build();
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ verify(txStatePartitionStorage, never()).lastApplied(anyLong(),
anyLong());
+ }
+
+ private void mockCatalogForUpdateExecution() {
+ Catalog catalog = mock(Catalog.class);
+ when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
+
when(catalog.indexes(anyInt())).thenReturn(List.of(mock(CatalogIndexDescriptor.class)));
+ }
+
+ @Test
+ void
updatesTxStateStorageLastAppliedForWriteIntentSwitchCommandsThatTouchNoTableStorages()
{
+ WriteIntentSwitchCommand command =
writeIntentSwitchCommandForMissingTable();
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+ }
+
+ private WriteIntentSwitchCommand writeIntentSwitchCommandForMissingTable()
{
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
+ .txId(TestTransactionIds.newTransactionId())
+ .initiatorTime(clock.now())
+ .commit(true)
+ .tableIds(Set.of(NON_EXISTENT_TABLE_ID))
+ .build();
+ }
+
+ @Test
+ void
skipsTxStateStorageLastAppliedUpdateForWriteIntentSwitchCommandsWhenIndexIsAhead()
{
+ txStatePartitionStorage.lastApplied(10L, 10L);
+
+ WriteIntentSwitchCommand command =
writeIntentSwitchCommandForMissingTable();
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+ }
+
+ @Test
+ void
onlyUpdatesMvStorageLastAppliedForTableAwareCommandsThatTouchTableStorages() {
+ mockCatalogForUpdateExecution();
+ when(mvPartitionStorage.leaseInfo()).thenReturn(new LeaseInfo(0,
randomUUID(), "test"));
+
+ listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+ UpdateCommand command = updateCommand(TABLE_ID);
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ verify(txStatePartitionStorage, never()).lastApplied(anyLong(),
anyLong());
+ }
+
+ @Test
+ void
updatesTxStateStorageLastAppliedForTableAwareCommandsThatTouchNoTableStorages()
{
+ UpdateCommand command = updateCommand(NON_EXISTENT_TABLE_ID);
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+ }
+
+ @Test
+ void
skipsUpdateToTxStateStorageLastAppliedForTableAwareCommandsThatTouchNoTableStoragesButIndexIsAlreadyApplied()
{
+ txStatePartitionStorage.lastApplied(10L, 10L);
+
+ UpdateCommand command = updateCommand(NON_EXISTENT_TABLE_ID);
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+ }
+
+ @Test
+ void
updatesTxStateStorageLastAppliedForUpdateMinimumActiveTxBeginTimeCommandsThatTouchNoTableStorages()
{
+ WriteCommand command = updateMinimumActiveTxBeginTimeCommand();
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+ }
+
+ @Test
+ void
skipsUpdateToTxStateStorageLastAppliedForUpdateMinimumActiveTxBeginTimeCommandsThatTouchNoTableStoragesButIndexIsAlreadyApplied()
{
+ txStatePartitionStorage.lastApplied(10L, 10L);
+
+ WriteCommand command = updateMinimumActiveTxBeginTimeCommand();
+
+ listener.onWrite(List.of(
+ writeCommandClosure(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(10L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(10L));
+ }
+
+ @Test
+ void
updatesLeaseInfoAndTxStateStorageLastAppliedForPrimaryReplicaChangeCommandsThatTouchNoTableStorages()
{
+ WriteCommand command = primaryReplicaChangeCommand();
+
+ listener.onWrite(List.of(
+ writeCommandClosureWithoutSafeTime(3, 2, command)
+ ).iterator());
+
+ verify(txStatePartitionStorage).leaseInfo(any(), eq(3L), eq(2L));
+ }
+
+ @Test
+ void
updatesLeaseInfoAndTxStateStorageLastAppliedForPrimaryReplicaChangeCommandsThatTouchSomeTableStorages()
{
+ listener.addTableProcessor(TABLE_ID, partitionListener(TABLE_ID));
+
+ WriteCommand command = primaryReplicaChangeCommand();
+
+ listener.onWrite(List.of(
+ writeCommandClosureWithoutSafeTime(3, 2, command)
+ ).iterator());
+
+ verify(txStatePartitionStorage).leaseInfo(any(), eq(3L), eq(2L));
+ }
+
+ @Test
+ void updatesTxStateStorageLastAppliedForVacuumTxStateCommands() {
+ WriteCommand command = TX_MESSAGES_FACTORY.vacuumTxStatesCommand()
+ .txIds(Set.of(randomUUID()))
+ .build();
+
+ listener.onWrite(List.of(
+ writeCommandClosureWithoutSafeTime(3, 2, command)
+ ).iterator());
+
+ assertThat(txStatePartitionStorage.lastAppliedIndex(), is(3L));
+ assertThat(txStatePartitionStorage.lastAppliedTerm(), is(2L));
+ }
+
@Test
public void testSafeTime() {
HybridTimestamp timestamp = clock.now();
@@ -599,9 +783,13 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
}
private static UpdateCommandV2 updateCommand() {
+ return updateCommand(TABLE_ID);
+ }
+
+ private static UpdateCommandV2 updateCommand(int tableId) {
return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
.rowUuid(randomUUID())
- .tableId(TABLE_ID)
+ .tableId(tableId)
.commitPartitionId(defaultPartitionIdMessage())
.txCoordinatorId(randomUUID())
.txId(TestTransactionIds.newTransactionId())
@@ -748,6 +936,14 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
return commandClosure;
}
+ private static CommandClosure<WriteCommand>
writeCommandClosureWithoutSafeTime(
+ long index,
+ long term,
+ WriteCommand writeCommand
+ ) {
+ return writeCommandClosure(index, term, writeCommand, null, null);
+ }
+
private TablePartitionProcessor partitionListener(int tableId) {
LeasePlacementDriver placementDriver =
mock(LeasePlacementDriver.class);
lenient().when(placementDriver.getCurrentPrimaryReplica(any(),
any())).thenReturn(null);
@@ -755,6 +951,23 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
ClockService clockService = mock(ClockService.class);
lenient().when(clockService.current()).thenReturn(clock.current());
+ StorageUpdateHandler storageUpdateHandler =
mock(StorageUpdateHandler.class);
+
+ lenient().doAnswer(invocation -> {
+ Runnable onApplication = invocation.getArgument(3);
+ if (onApplication != null) {
+ onApplication.run();
+ }
+ return null;
+ }).when(storageUpdateHandler).switchWriteIntents(any(), anyBoolean(),
any(), any(Runnable.class), any());
+ lenient().doAnswer(invocation -> {
+ Runnable onApplication = invocation.getArgument(5);
+ if (onApplication != null) {
+ onApplication.run();
+ }
+ return null;
+ }).when(storageUpdateHandler).handleUpdate(any(), any(), any(), any(),
anyBoolean(), any(Runnable.class), any(), any(), any());
+
return new TablePartitionProcessor(
txManager,
new SnapshotAwarePartitionDataStorage(
@@ -763,8 +976,8 @@ class ZonePartitionRaftListenerTest extends
BaseIgniteAbstractTest {
outgoingSnapshotsManager,
ZONE_PARTITION_KEY
),
- mock(StorageUpdateHandler.class),
- mock(CatalogService.class),
+ storageUpdateHandler,
+ catalogService,
mock(SchemaRegistry.class),
mock(IndexMetaStorage.class),
randomUUID(),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index af51cae7bde..85076be1067 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -67,6 +67,7 @@ import
org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -78,7 +79,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
+import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -102,8 +103,8 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
private SystemDistributedConfiguration systemDistributedConfiguration;
/** Mock partition storage. */
- @Mock
- private MvPartitionStorage mockStorage;
+ @Spy
+ private MvPartitionStorage mockStorage = new TestMvPartitionStorage(0);
/** Internal table to test. */
DummyInternalTableImpl internalTbl;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index e76edffba24..9bde91790ea 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -74,6 +75,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mock;
+import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -105,8 +107,8 @@ public class ItInternalTableReadOnlyOperationsTest extends
IgniteAbstractTest {
private static final ColumnsExtractor KEY_EXTRACTOR =
BinaryRowConverter.keyExtractor(SCHEMA);
/** Mock partition storage. */
- @Mock
- private MvPartitionStorage mockStorage;
+ @Spy
+ private MvPartitionStorage mockStorage = new TestMvPartitionStorage(0);
/** Transaction mock. */
@Mock
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
index 9268c32de85..4f8a0e7985c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java
@@ -200,6 +200,14 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
throw new AssertionError("Unknown command type [command=" +
command.toStringForLightLogging() + ']');
}
+ assert storage.lastAppliedIndex() >= commandIndex : String.format(
+ "Last applied index after command application is less than the
command index "
+ + "[lastAppliedIndex=%d, commandIndex=%d, command=%s]",
+ storage.lastAppliedIndex(),
+ commandIndex,
+ command.toStringForLightLogging()
+ );
+
return result;
}
@@ -271,6 +279,10 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
long leaseStartTime = cmd.leaseStartTime();
if (storageLeaseInfo == null || leaseStartTime !=
storageLeaseInfo.leaseStartTime()) {
+ // We MUST bump information about last updated index+term.
+ // See a comment in TablePartitionProcessor#processCommand()
for explanation.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+
var updateCommandResult = new UpdateCommandResult(
false,
storageLeaseInfo == null ? 0 :
storageLeaseInfo.leaseStartTime(),
@@ -301,7 +313,7 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
);
} else {
// We MUST bump information about last updated index+term.
- // See a comment in #onWrite() for explanation.
+ // See a comment in TablePartitionProcessor#processCommand() for
explanation.
// If we get here, that means that we are collocated with primary
and data was already inserted there, thus it's only required
// to update information about index and term.
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
@@ -343,6 +355,10 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
long leaseStartTime = cmd.leaseStartTime();
if (storageLeaseInfo == null || leaseStartTime !=
storageLeaseInfo.leaseStartTime()) {
+ // We MUST bump information about last updated index+term.
+ // See a comment in TablePartitionProcessor#processCommand()
for explanation.
+ advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
+
var updateCommandResult = new UpdateCommandResult(
false,
storageLeaseInfo == null ? 0 :
storageLeaseInfo.leaseStartTime(),
@@ -368,7 +384,7 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
);
} else {
// We MUST bump information about last updated index+term.
- // See a comment in #onWrite() for explanation.
+ // See a comment in TablePartitionProcessor#processCommand() for
explanation.
// If we get here, that means that we are collocated with primary
and data was already inserted there, thus it's only required
// to update information about index and term.
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
index 0dcb39e8975..27de97d5401 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -77,6 +77,14 @@ public class MinimumActiveTxTimeCommandHandler extends AbstractCommandHandler<Up
long timestamp = command.timestamp();
+ // We MUST bump information about last updated index+term.
+ // See a comment in TablePartitionProcessor#processCommand() for explanation.
+ storage.runConsistently(locker -> {
+ storage.lastApplied(commandIndex, commandTerm);
+
+ return null;
+ });
+
storage.flush(false)
.whenComplete((r, t) -> {
if (t == null) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 11940453982..54eb7e8ee5f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -129,7 +129,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InOrder;
import org.mockito.Mockito;
@@ -383,25 +382,42 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
}
@ParameterizedTest
- @MethodSource("allCommandsUpdatingLastAppliedIndex")
- void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand command) {
- commandListener.processCommand(command, 3, 2, hybridClock.now());
+ @MethodSource("commandsVariationsForIndexAdvanceTesting")
+ void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(LabeledCommand command) {
+ commandListener.processCommand(command.command, 3, 2, hybridClock.now());
verify(mvPartitionStorage).lastApplied(3, 2);
}
- private static UpdateCommandV2 updateCommand() {
+ private static UpdateCommandV2 updateCommandWithLeaseStartTime() {
+ return updateCommand(999L);
+ }
+
+ private static UpdateCommandV2 updateCommandWithoutLeaseStartTime() {
+ return updateCommand(null);
+ }
+
+ private static UpdateCommandV2 updateCommand(@Nullable Long leaseStartTime) {
return PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
.rowUuid(randomUUID())
.tableId(TABLE_ID)
.commitPartitionId(defaultPartitionIdMessage())
.txCoordinatorId(randomUUID())
.txId(TestTransactionIds.newTransactionId())
+ .leaseStartTime(leaseStartTime)
.initiatorTime(anyTime())
.build();
}
- private static UpdateAllCommandV2 updateAllCommand() {
+ private static UpdateAllCommandV2 updateAllCommandWithLeaseStartTime() {
+ return updateAllCommand(999L);
+ }
+
+ private static UpdateAllCommandV2 updateAllCommandWithoutLeaseStartTime() {
+ return updateAllCommand(null);
+ }
+
+ private static UpdateAllCommandV2 updateAllCommand(@Nullable Long leaseStartTime) {
return PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
.messageRowsToUpdate(singletonMap(
randomUUID(),
@@ -411,18 +427,22 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commitPartitionId(defaultPartitionIdMessage())
.txCoordinatorId(randomUUID())
.txId(TestTransactionIds.newTransactionId())
+ .leaseStartTime(leaseStartTime)
.initiatorTime(anyTime())
.build();
}
- private static Stream<Arguments> allCommandsUpdatingLastAppliedIndex() {
- return Stream.of(
- updateCommand(),
- updateAllCommand(),
- writeIntentSwitchCommand(),
- primaryReplicaChangeCommand(),
- buildIndexCommand()
- ).map(Arguments::of);
+ private static List<LabeledCommand> commandsVariationsForIndexAdvanceTesting() {
+ return List.of(
+ new LabeledCommand("UpdateCommand without lease start time", updateCommandWithoutLeaseStartTime()),
+ new LabeledCommand("UpdateCommand with lease start time", updateCommandWithLeaseStartTime()),
+ new LabeledCommand("UpdateAllCommand without lease start time", updateAllCommandWithoutLeaseStartTime()),
+ new LabeledCommand("UpdateAllCommand with lease start time", updateAllCommandWithLeaseStartTime()),
+ new LabeledCommand(writeIntentSwitchCommand()),
+ new LabeledCommand(primaryReplicaChangeCommand()),
+ new LabeledCommand(updateMinimumActiveTxBeginTimeCommand()),
+ new LabeledCommand(buildIndexCommand())
+ );
}
private static PrimaryReplicaChangeCommand primaryReplicaChangeCommand() {
@@ -432,6 +452,16 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.build();
}
+ private static WriteCommand updateMinimumActiveTxBeginTimeCommand() {
+ HybridTimestamp baseTime = HybridTimestamp.MIN_VALUE;
+
+ return PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .initiatorTime(baseTime)
+ .safeTime(baseTime.addPhysicalTime(1))
+ .timestamp(baseTime.addPhysicalTime(1000).longValue())
+ .build();
+ }
+
private static BuildIndexCommandV3 buildIndexCommand() {
return PARTITION_REPLICATION_MESSAGES_FACTORY.buildIndexCommandV3()
.tableId(TABLE_ID)
@@ -955,4 +985,23 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
return indexMeta;
}
+
+ private static class LabeledCommand {
+ private final String label;
+ private final WriteCommand command;
+
+ private LabeledCommand(WriteCommand command) {
+ this(command.getClass().getSimpleName(), command);
+ }
+
+ private LabeledCommand(String label, WriteCommand command) {
+ this.label = label;
+ this.command = command;
+ }
+
+ @Override
+ public String toString() {
+ return label;
+ }
+ }
}