This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ee489051fd IGNITE-20768 Test coverage of batch operations (#2893)
ee489051fd is described below
commit ee489051fd1ee13f11c9246eed58d4e818177893
Author: Cyrill <[email protected]>
AuthorDate: Wed Nov 29 17:09:29 2023 +0300
IGNITE-20768 Test coverage of batch operations (#2893)
---
.../table/distributed/StorageCleanupTest.java | 427 +++++++++++++++++++++
.../replication/PartitionReplicaListenerTest.java | 30 ++
2 files changed, 457 insertions(+)
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 92dcd6451e..79ad4870ac 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -42,6 +44,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
@@ -51,6 +54,7 @@ import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.Sto
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.BeforeEach;
@@ -205,6 +209,88 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(storage, never()).commitWrite(any(), any());
}
+ @Test
+ void testSimpleCommitBatch() {
+ UUID txUuid = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7,
"zzu"));
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ TimedBinaryRow tb1 = new TimedBinaryRow(row1, null);
+ TimedBinaryRow tb2 = new TimedBinaryRow(row2, null);
+ TimedBinaryRow tb3 = new TimedBinaryRow(row3, null);
+
+ UUID id1 = UUID.randomUUID();
+ UUID id2 = UUID.randomUUID();
+ UUID id3 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ id1, tb1,
+ id2, tb2,
+ id3, tb3
+ );
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, true, null, null);
+
+ assertEquals(3, storage.rowsCount());
+ // We have three writes to the storage.
+ verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(),
anyInt());
+
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ assertEquals(3, storage.rowsCount());
+ // Those writes resulted in three commits.
+ verify(storage, times(3)).commitWrite(any(), any());
+
+ // Now reset the invocation counter.
+ clearInvocations(storage);
+
+ // And run cleanup again for the same transaction.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ assertEquals(3, storage.rowsCount());
+ // And no invocation after, meaning idempotence of the cleanup.
+ verify(storage, never()).commitWrite(any(), any());
+
+ ReadResult result1 = storage.read(new RowId(partitionId.partitionId(),
id1), HybridTimestamp.MAX_VALUE);
+ assertEquals(row1, result1.binaryRow());
+
+ ReadResult result2 = storage.read(new RowId(partitionId.partitionId(),
id2), HybridTimestamp.MAX_VALUE);
+ assertEquals(row2, result2.binaryRow());
+
+ ReadResult result3 = storage.read(new RowId(partitionId.partitionId(),
id3), HybridTimestamp.MAX_VALUE);
+ assertEquals(row3, result3.binaryRow());
+
+ // Reset the invocation counter.
+ clearInvocations(storage);
+
+ // Now delete rows with a batch
+ Map<UUID, TimedBinaryRow> rowsToDelete = new HashMap<>();
+ rowsToDelete.put(id2, null);
+ rowsToDelete.put(id3, null);
+
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToDelete,
partitionId, true, null, null);
+
+ // We have three writes to the storage.
+ verify(storage, times(2)).addWrite(any(), any(), any(), anyInt(),
anyInt());
+
+ // And run cleanup again for the same transaction.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ ReadResult resultAfterDelete1 = storage.read(new
RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE);
+ assertEquals(row1, resultAfterDelete1.binaryRow());
+
+ ReadResult resultAfterDelete2 = storage.read(new
RowId(partitionId.partitionId(), id2), HybridTimestamp.MAX_VALUE);
+ assertNull(resultAfterDelete2.binaryRow());
+
+ ReadResult resultAfterDelete3 = storage.read(new
RowId(partitionId.partitionId(), id3), HybridTimestamp.MAX_VALUE);
+ assertNull(resultAfterDelete3.binaryRow());
+ }
+
@Test
void testCleanupAndUpdateRow() {
UUID txUuid = UUID.randomUUID();
@@ -266,6 +352,80 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(storage, times(2)).commitWrite(any(), any());
}
+ @Test
+ void testCleanupAndUpdateRowBatch() {
+ UUID txUuid = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7,
"zzu"));
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+
+ UUID row1Id = UUID.randomUUID();
+ UUID row2Id = UUID.randomUUID();
+ UUID row3Id = UUID.randomUUID();
+
+ TimedBinaryRow tb1 = new TimedBinaryRow(row1, null);
+ TimedBinaryRow tb2 = new TimedBinaryRow(row2, null);
+ TimedBinaryRow tb3 = new TimedBinaryRow(row3, null);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ row1Id, tb1,
+ row2Id, tb2,
+ row3Id, tb3
+ );
+ // Do not track write intents to simulate the loss of a volatile state.
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, false, null, null);
+
+ assertEquals(3, storage.rowsCount());
+
+ // Now run cleanup.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // But the loss of the state results in no cleanup, and the entries
are still write intents.
+ verify(storage, never()).commitWrite(any(), any());
+
+ // Now imagine we have another transaction that resolves the row, does
the cleanup and commits its own data.
+
+ // Resolve one of the rows affected by the committed transaction.
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new
RowId(PARTITION_ID, row1Id));
+
+ // Run the cleanup.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // Only the discovered write intent was committed, the other two are
still write intents.
+ verify(storage, times(1)).commitWrite(any(), any());
+
+ BinaryRow row4 = binaryRow(new TestKey(1, "foo1"), new TestValue(20,
"bar20"));
+
+ UUID newTxUuid = UUID.randomUUID();
+
+ // Insert new write intent in the new transaction.
+ TimedBinaryRow tb4 = new TimedBinaryRow(row4, null);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ row1Id, tb4
+ );
+ storageUpdateHandler.handleUpdateAll(newTxUuid, rowsToUpdate2,
partitionId, true, null, null);
+
+ // And concurrently the other two intents were also resolved and
scheduled for cleanup.
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new
RowId(PARTITION_ID, row2Id));
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new
RowId(PARTITION_ID, row3Id));
+
+ // Now reset the invocation counter.
+ clearInvocations(storage);
+
+ // Run cleanup for the original transaction
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // Only those two entries will be affected.
+ verify(storage, times(2)).commitWrite(any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateNoData() {
UUID runningTx = UUID.randomUUID();
@@ -322,6 +482,50 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
}
+ @Test
+ void testCleanupBeforeUpdateNoWriteIntentBatch() {
+ UUID committedTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit a row
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committedTx, rowsToUpdate,
partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committedTx, true,
commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now create a new write intent over the committed data. No cleanup
should be triggered.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ UUID runningTx = UUID.randomUUID();
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate2,
partitionId, true, null, null);
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
+ }
+
@Test
void testCleanupBeforeUpdateSameTxOnlyWriteIntent() {
UUID runningTx = UUID.randomUUID();
@@ -359,6 +563,51 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(),
any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateSameTxOnlyWriteIntentBatch() {
+ UUID runningTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // Create a write intent.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Then create another one for the same row in the same transaction.
The entry will be replaced.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+ // Do not track write intents to simulate the loss of a volatile state.
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate2,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(),
any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateDifferentTxOnlyWriteIntent() {
UUID runningTx = UUID.randomUUID();
@@ -399,6 +648,54 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
}
+ @Test
+ void testCleanupBeforeUpdateDifferentTxOnlyWriteIntentBatch() {
+ UUID runningTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // Create a new write intent.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another one and pass `last commit time`. The previous value
should be committed automatically.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx2 = UUID.randomUUID();
+
+ // Previous value will be committed even though the cleanup was not
called explicitly.
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx2, rowsToUpdate2,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, times(1)).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
+ }
+
@Test
void testCleanupBeforeUpdateAbortWriteIntent() {
UUID committed1 = UUID.randomUUID();
@@ -451,6 +748,70 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(),
any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateAbortWriteIntentBatch() {
+ UUID committed1 = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit an entry.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate,
partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committed1, true,
commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now add a new write intent.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ UUID committed2 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed2, rowsToUpdate2,
partitionId, true, null, null);
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another write intent and provide `last commit time`.
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx = UUID.randomUUID();
+
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7,
"zzu"));
+
+ // Last commit time equal to the time of the previously committed
value => previous write intent will be reverted.
+ Map<UUID, TimedBinaryRow> rowsToUpdate3 = Map.of(
+ rowId, new TimedBinaryRow(row3, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, times(1)).abortWrite(any());
+ verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(),
any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateCommitWriteIntent() {
UUID committed1 = UUID.randomUUID();
@@ -505,6 +866,72 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
}
+ @Test
+ void testCleanupBeforeUpdateCommitWriteIntentBatch() {
+ UUID committed1 = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit an entry.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate,
partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committed1, true,
commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now add a new write intent.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ UUID committed2 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed2, rowsToUpdate2,
partitionId, true, null, null);
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another write intent and provide `last commit time`.
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx = UUID.randomUUID();
+
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7,
"zzu"));
+
+ HybridTimestamp lastCommitTs = commitTs.addPhysicalTime(100);
+
+ // Last commit time is after the time of the previously committed
value => previous write intent will be committed.
+ Map<UUID, TimedBinaryRow> rowsToUpdate3 = Map.of(
+ rowId, new TimedBinaryRow(row3, lastCommitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3,
partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, times(1)).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any());
+ }
+
@Test
void testCleanupBeforeUpdateError() {
UUID committed1 = UUID.randomUUID();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 32ac621d8d..5071d0d5ad 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -128,6 +128,7 @@ import
org.apache.ignite.internal.table.distributed.command.PartitionCommand;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -1324,6 +1325,35 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
assertThat(writeAfterCleanupFuture,
willThrowFast(TransactionException.class));
}
+ @Test
+ void testWriteIntentBearsLastCommitTimestamp() {
+ BinaryRow br1 = binaryRow(1);
+
+ BinaryRow br2 = binaryRow(2);
+
+ // First insert a row
+ UUID tx0 = newTxId();
+ upsert(tx0, br1);
+ upsert(tx0, br2);
+
+ cleanup(tx0);
+
+ raftClientFutureClosure = partitionCommand -> {
+ assertTrue(partitionCommand instanceof UpdateCommandImpl);
+
+ UpdateCommandImpl impl = (UpdateCommandImpl) partitionCommand;
+
+ assertNotNull(impl.messageRowToUpdate());
+ assertNotNull(impl.messageRowToUpdate().binaryRow());
+ assertNotNull(impl.messageRowToUpdate().timestamp());
+
+ return defaultMockRaftFutureClosure.apply(partitionCommand);
+ };
+
+ UUID tx1 = newTxId();
+ upsert(tx1, br1);
+ }
+
/**
* Puts several records into the storage, optionally leaving them as write
intents, alternately deleting and upserting the same row
* within the same RW transaction, then checking read correctness via read
only request.