This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 228b5be6e3 IGNITE-20445 Clean up write intents for RW transaction on replication… (#2657) 228b5be6e3 is described below commit 228b5be6e3176ce40982b41f0dd3cf5c96452b09 Author: Cyrill <cyrill.si...@gmail.com> AuthorDate: Wed Oct 11 11:29:02 2023 +0300 IGNITE-20445 Clean up write intents for RW transaction on replication… (#2657) --- ...ava => ItTxDistributedCleanupRecoveryTest.java} | 157 +++------- ...xDistributedTestSingleNodeNoCleanupMessage.java | 15 +- .../table/distributed/StorageUpdateHandler.java | 65 ++++- .../distributed/command/UpdateAllCommand.java | 21 ++ .../table/distributed/command/UpdateCommand.java | 15 + .../table/distributed/raft/PartitionListener.java | 6 +- .../replicator/PartitionReplicaListener.java | 269 ++++++++++------- .../TimedBinaryRow.java} | 37 +-- .../replicator/TransactionStateResolver.java | 4 +- .../internal/table/distributed/IndexBaseTest.java | 13 +- .../table/distributed/StorageCleanupTest.java | 323 ++++++++++++++++++++- .../distributed/StorageUpdateHandlerTest.java | 5 +- .../ignite/internal/table/TxAbstractTest.java | 27 +- 13 files changed, 682 insertions(+), 275 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java similarity index 54% copy from modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java copy to modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java index 08c7bcee7a..b01d5e14da 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java @@ -17,19 +17,14 @@ package org.apache.ignite.distributed; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; +import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.hlc.HybridClock; @@ -37,7 +32,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicaResult; -import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.distributed.IndexLocker; @@ -47,18 +42,12 @@ import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.Schemas; -import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.tx.impl.HeapLockManager; -import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; -import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.ClusterNode; -import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.TransactionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -66,27 +55,31 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; /** - * Test to Simulate missing cleanup action. + * Durable cleanup test with successful recovery after the failures. */ -public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistributedTestSingleNode { - /** A list of background cleanup futures. */ - private final List<CompletableFuture<?>> cleanupFutures = new CopyOnWriteArrayList<>(); +public class ItTxDistributedCleanupRecoveryTest extends ItTxDistributedTestSingleNode { - /** A flag to drop async cleanup actions. */ - private volatile boolean ignoreAsyncCleanup; + private AtomicInteger defaultRetryCount; /** * The constructor. * * @param testInfo Test info. */ - public ItTxDistributedTestSingleNodeNoCleanupMessage(TestInfo testInfo) { + public ItTxDistributedCleanupRecoveryTest(TestInfo testInfo) { super(testInfo); } + private void setDefaultRetryCount(int count) { + defaultRetryCount = new AtomicInteger(count); + } + @BeforeEach @Override public void before() throws Exception { + // The value of 3 is less than the allowed number of cleanup retries. + setDefaultRetryCount(3); + txTestCluster = new ItTxTestCluster( testInfo, raftConfiguration, @@ -97,23 +90,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut startClient(), timestampTracker ) { - @Override - protected TxManagerImpl newTxManager(ReplicaService replicaSvc, HybridClock clock, TransactionIdGenerator generator, - ClusterNode node) { - return new TxManagerImpl(replicaSvc, new HeapLockManager(), clock, generator, node::id) { - @Override - public CompletableFuture<Void> executeCleanupAsync(Runnable runnable) { - if (ignoreAsyncCleanup) { - return completedFuture(null); - } - CompletableFuture<Void> cleanupFuture = super.executeCleanupAsync(runnable); - - cleanupFutures.add(cleanupFuture); - - return cleanupFuture; - } - }; - } @Override protected PartitionReplicaListener newReplicaListener( @@ -161,15 +137,12 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut ) { @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) { - if (request instanceof TxCleanupReplicaRequest) { + if (request instanceof TxCleanupReplicaRequest && defaultRetryCount.getAndDecrement() > 0) { logger().info("Dropping cleanup request: {}", request); - releaseTxLocks( - ((TxCleanupReplicaRequest) request).txId(), - txManager.lockManager() - ); - - return completedFuture(new ReplicaResult(null, null)); + return failedFuture(new ReplicationException( + REPLICA_COMMON_ERR, + "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']')); } return super.invoke(request, senderId); } @@ -187,86 +160,44 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut log.info("Tables have been started"); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") - @Test - @Override - public void testTransactionAlreadyRolledback() { - super.testTransactionAlreadyRolledback(); - } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") @Test @Override - public void testTransactionAlreadyCommitted() { - super.testTransactionAlreadyCommitted(); - } + public void testDeleteUpsertCommit() throws TransactionException { + // The value of 6 is higher than the default retry count. + // So we should give up retrying and crash. + setDefaultRetryCount(6); - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") - @Test - @Override - public void testBalance() throws InterruptedException { - super.testBalance(); + assertThrows(TransactionException.class, () -> deleteUpsert().commit()); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") + @Disabled("IGNITE-20560") @Test @Override - public void testDelete() throws TransactionException { - super.testDelete(); - } - - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") - @Test - public void testTwoReadWriteTransactions() throws TransactionException { - Tuple key = makeKey(1); - - assertFalse(accounts.recordView().delete(null, key)); - assertNull(accounts.recordView().get(null, key)); - - // Disable background cleanup to avoid a race. - ignoreAsyncCleanup = true; - - InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin(); - accounts.recordView().upsert(tx1, makeValue(1, 100.)); - tx1.commit(); + public void testTransactionAlreadyRolledback() { + // The value of 6 is higher than the default retry count. + // So we should give up retrying and crash. + setDefaultRetryCount(6); - InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin(); - accounts.recordView().upsert(tx2, makeValue(1, 200.)); - tx2.commit(); + testTransactionAlreadyFinished(false, (transaction, txId) -> { + assertThrows(TransactionException.class, transaction::rollback); - assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); + log.info("Rolled back transaction {}", txId); + }); } + @Disabled("IGNITE-20560") @Test - public void testTwoReadWriteTransactionsWaitForCleanup() throws TransactionException { - Tuple key = makeKey(1); - - assertFalse(accounts.recordView().delete(null, key)); - assertNull(accounts.recordView().get(null, key)); - - // Start the first transaction. The values it changes will not be cleaned up. - InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin(); - - accounts.recordView().upsert(tx1, makeValue(1, 100.)); - - tx1.commit(); - - //Now start the seconds transaction and make sure write intent resolution is called by adding a `get` operaiton. - InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin(); - - assertEquals(100., accounts.recordView().get(tx2, makeKey(1)).doubleValue("balance")); - - // Now wait for the background task to finish. - cleanupFutures.forEach(completableFuture -> assertThat(completableFuture, willCompleteSuccessfully())); - - accounts.recordView().upsert(tx2, makeValue(1, 200.)); - - tx2.commit(); + @Override + public void testTransactionAlreadyCommitted() { + // The value of 6 is higher than the default retry count. + // So we should give up retrying and crash. + setDefaultRetryCount(6); - assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance")); - } + testTransactionAlreadyFinished(true, (transaction, txId) -> { + assertThrows(TransactionException.class, transaction::commit); - private static void releaseTxLocks(UUID txId, LockManager lockManager) { - lockManager.locks(txId).forEachRemaining(lockManager::release); + log.info("Committed transaction {}", txId); + }); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index 08c7bcee7a..c240591ab9 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -187,35 +187,28 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut log.info("Tables have been started"); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") + @Disabled("IGNITE-20560") @Test @Override public void testTransactionAlreadyRolledback() { super.testTransactionAlreadyRolledback(); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") + @Disabled("IGNITE-20560") @Test @Override public void testTransactionAlreadyCommitted() { super.testTransactionAlreadyCommitted(); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") + @Disabled("IGNITE-20395") @Test @Override public void testBalance() throws InterruptedException { super.testBalance(); } - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") - @Test - @Override - public void testDelete() throws TransactionException { - super.testDelete(); - } - - @Disabled("IGNITE-20395, IGNITE-20445, IGNITE-20446") + @Disabled("IGNITE-20395") @Test public void testTwoReadWriteTransactions() throws TransactionException { Tuple key = makeKey(1); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index 6e91612520..155d8e792d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -107,6 +107,7 @@ public class StorageUpdateHandler { * @param trackWriteIntent If {@code true} then write intent should be tracked. * @param onApplication Callback on application. * @param commitTs Commit timestamp to use on autocommit. + * @param lastCommitTs The timestamp of last known committed entry. */ public void handleUpdate( UUID txId, @@ -115,7 +116,8 @@ public class StorageUpdateHandler { @Nullable BinaryRow row, boolean trackWriteIntent, @Nullable Runnable onApplication, - @Nullable HybridTimestamp commitTs + @Nullable HybridTimestamp commitTs, + @Nullable HybridTimestamp lastCommitTs ) { indexUpdateHandler.waitIndexes(); @@ -126,6 +128,8 @@ public class StorageUpdateHandler { locker.lock(rowId); + performStorageCleanupIfNeeded(txId, rowId, lastCommitTs); + if (commitTs != null) { storage.addWriteCommitted(rowId, row, commitTs); } else { @@ -163,6 +167,7 @@ public class StorageUpdateHandler { * @param trackWriteIntent If {@code true} then write intent should be tracked. * @param onApplication Callback on application. * @param commitTs Commit timestamp to use on autocommit. + * @param lastCommitTsMap A map(Row Id -> timestamp) of timestamps of the most recent commits to the affected rows. */ public void handleUpdateAll( UUID txId, @@ -170,7 +175,8 @@ public class StorageUpdateHandler { TablePartitionId commitPartitionId, boolean trackWriteIntent, @Nullable Runnable onApplication, - @Nullable HybridTimestamp commitTs + @Nullable HybridTimestamp commitTs, + Map<UUID, HybridTimestamp> lastCommitTsMap ) { indexUpdateHandler.waitIndexes(); @@ -190,6 +196,8 @@ public class StorageUpdateHandler { locker.lock(rowId); + performStorageCleanupIfNeeded(txId, rowId, lastCommitTsMap.get(entry.getKey())); + if (commitTs != null) { storage.addWriteCommitted(rowId, row, commitTs); } else { @@ -221,6 +229,59 @@ public class StorageUpdateHandler { executeBatchGc(); } + private void performStorageCleanupIfNeeded(UUID txId, RowId rowId, @Nullable HybridTimestamp lastCommitTs) { + // No previously committed value, this action might be an insert. No need to cleanup. + if (lastCommitTs == null) { + return; + } + + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + // Okay, lastCommitTs is not null. It means that we are changing the previously committed data. + // However, we could have previously called cleanup for the same row. + // If the previous operation was "delete" and it was executed successfully, no data will be present in the storage. + if (!cursor.hasNext()) { + return; + } + + ReadResult item = cursor.next(); + // If there is a write intent in the storage and this intent was created by a different transaction + // then check the previous entry. + // Otherwise exit the check - everything's fine. + if (item.isWriteIntent() && !txId.equals(item.transactionId())) { + if (!cursor.hasNext()) { + // No more data => the write intent we have is actually the first version of this row + // and lastCommitTs is the commit timestamp of it. + // Action: commit this write intent. + performCommitWrite(item.transactionId(), Set.of(rowId), lastCommitTs); + return; + } + // Otherwise there are other versions in the chain. + ReadResult committedItem = cursor.next(); + + // They should be regular entries, not write intents. + assert !committedItem.isWriteIntent() : "Cannot have more than one write intent per row"; + + assert lastCommitTs.compareTo(committedItem.commitTimestamp()) >= 0 : + "Primary commit timestamp " + lastCommitTs + " is earlier than local commit timestamp " + + committedItem.commitTimestamp(); + + if (lastCommitTs.compareTo(committedItem.commitTimestamp()) > 0) { + // We see that lastCommitTs is later than the timestamp of the committed value => we need to commit the write intent. + // Action: commit this write intent. + performCommitWrite(item.transactionId(), Set.of(rowId), lastCommitTs); + } else { + // lastCommitTs == committedItem.commitTimestamp() + // So we see a write intent from a different transaction, which was not committed on primary. + // Because of transaction locks we cannot have two transactions creating write intents for the same row. + // So if we got up to here, it means that the previous transaction was aborted, + // but the storage was not cleaned after it. + // Action: abort this write intent. + performAbortWrite(item.transactionId(), Set.of(rowId)); + } + } + } + } + void executeBatchGc() { HybridTimestamp lwm = lowWatermark.getLowWatermark(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java index 470333b64f..4d75fd22be 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java @@ -17,11 +17,16 @@ package org.apache.ignite.internal.table.distributed.command; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; + +import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.network.annotations.Transferable; +import org.jetbrains.annotations.Nullable; /** * State machine command for updating a batch of entries. @@ -33,4 +38,20 @@ public interface UpdateAllCommand extends PartitionCommand { Map<UUID, BinaryRowMessage> rowsToUpdate(); String txCoordinatorId(); + + // TODO: IGNITE-20609 row id in this map duplicates row id in rowsToUpdate. + @Nullable Map<UUID, Long> lastCommitTimestampsLong(); + + /** + * Returns the timestamps of the last committed entries for each row. + */ + default Map<UUID, HybridTimestamp> lastCommitTimestamps() { + Map<UUID, HybridTimestamp> map = new HashMap<>(); + + Map<UUID, Long> uuidLongMap = lastCommitTimestampsLong(); + if (uuidLongMap != null) { + uuidLongMap.forEach((uuid, ts) -> map.put(uuid, hybridTimestamp(ts))); + } + return map; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java index 821e449311..3e47dfd574 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.table.distributed.command; +import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; + import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; @@ -45,4 +48,16 @@ public interface UpdateCommand extends PartitionCommand { return message == null ? null : message.asBinaryRow(); } + + /** + * Returns the timestamp of the last committed entry. + */ + long lastCommitTimestampLong(); + + /** + * Returns the timestamp of the last committed entry. + */ + default @Nullable HybridTimestamp lastCommitTimestamp() { + return nullableHybridTimestamp(lastCommitTimestampLong()); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 6ed5a8464b..9456db2c27 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -238,7 +238,8 @@ public class PartitionListener implements RaftGroupListener { storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.row(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null + cmd.full() ? cmd.safeTime() : null, + cmd.lastCommitTimestamp() ); } @@ -266,7 +267,8 @@ public class PartitionListener implements RaftGroupListener { storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), !cmd.full(), () -> storage.lastApplied(commandIndex, commandTerm), - cmd.full() ? cmd.safeTime() : null + cmd.full() ? cmd.safeTime() : null, + cmd.lastCommitTimestamps() ); updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index ee6b2183a8..189929cf27 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.table.distributed.replicator; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.tx.TxState.ABANDONED; @@ -48,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -59,11 +62,9 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.lang.IgniteTriFunction; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -514,7 +516,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return Result future. */ private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty( - @Nullable UUID txId, + UUID txId, @Nullable HybridTimestamp readTimestamp, IgniteUuid cursorId, int count @@ -522,14 +524,20 @@ public class PartitionReplicaListener implements ReplicaListener { @SuppressWarnings("resource") PartitionTimestampCursor cursor = (PartitionTimestampCursor) cursors.computeIfAbsent(cursorId, id -> mvDataStorage.scan(readTimestamp == null ? HybridTimestamp.MAX_VALUE : readTimestamp)); - var resolutionFuts = new ArrayList<CompletableFuture<BinaryRow>>(count); + var resolutionFuts = new ArrayList<CompletableFuture<TimedBinaryRow>>(count); while (resolutionFuts.size() < count && cursor.hasNext()) { ReadResult readResult = cursor.next(); HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp(); - BinaryRow candidate = - newestCommitTimestamp == null || !readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp); + TimedBinaryRow candidate; + if (newestCommitTimestamp == null || !readResult.isWriteIntent()) { + candidate = null; + } else { + BinaryRow committedRow = cursor.committed(newestCommitTimestamp); + + candidate = committedRow == null ? null : new TimedBinaryRow(committedRow, newestCommitTimestamp); + } resolutionFuts.add(resolveReadResult(readResult, txId, readTimestamp, () -> candidate)); } @@ -537,11 +545,11 @@ public class PartitionReplicaListener implements ReplicaListener { return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenCompose(unused -> { var rows = new ArrayList<BinaryRow>(count); - for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) { - BinaryRow resolvedReadResult = resolutionFut.join(); + for (CompletableFuture<TimedBinaryRow> resolutionFut : resolutionFuts) { + TimedBinaryRow resolvedReadResult = resolutionFut.join(); - if (resolvedReadResult != null) { - rows.add(resolvedReadResult); + if (resolvedReadResult != null && resolvedReadResult.binaryRow() != null) { + rows.add(resolvedReadResult.binaryRow()); } } @@ -965,8 +973,10 @@ public class PartitionReplicaListener implements ReplicaListener { RowId rowId = indexRow.rowId(); return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> { - if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) { - result.add(resolvedReadResult); + if (resolvedReadResult != null + && resolvedReadResult.binaryRow() != null + && indexRowMatches(indexRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) { + result.add(resolvedReadResult.binaryRow()); } return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result); @@ -1009,10 +1019,10 @@ public class PartitionReplicaListener implements ReplicaListener { return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S) .thenComposeAsync(rowLock -> { // Table row S lock return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> { - if (resolvedReadResult != null) { - if (indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) { - result.add(resolvedReadResult); - } + if (resolvedReadResult != null + && resolvedReadResult.binaryRow() != null + && indexRowMatches(currentRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) { + result.add(resolvedReadResult.binaryRow()); } // Proceed scan. @@ -1059,8 +1069,8 @@ public class PartitionReplicaListener implements ReplicaListener { return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S) .thenComposeAsync(rowLock -> { // Table row S lock return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> { - if (resolvedReadResult != null) { - result.add(resolvedReadResult); + if (resolvedReadResult != null && resolvedReadResult.binaryRow() != null) { + result.add(resolvedReadResult.binaryRow()); } // Proceed lookup. @@ -1077,7 +1087,11 @@ public class PartitionReplicaListener implements ReplicaListener { * @param timestamp Read timestamp. * @return Future finishes with the resolved binary row. */ - private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) { + private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult( + RowId rowId, + @Nullable UUID txId, + @Nullable HybridTimestamp timestamp + ) { ReadResult readResult = mvDataStorage.read(rowId, timestamp == null ? HybridTimestamp.MAX_VALUE : timestamp); return resolveReadResult(readResult, txId, timestamp, () -> { @@ -1091,7 +1105,7 @@ public class PartitionReplicaListener implements ReplicaListener { "The result is not committed [rowId=" + rowId + ", timestamp=" + readResult.newestCommitTimestamp() + ']'; - return committedReadResult.binaryRow(); + return new TimedBinaryRow(committedReadResult.binaryRow(), committedReadResult.commitTimestamp()); }); } @@ -1102,13 +1116,13 @@ public class PartitionReplicaListener implements ReplicaListener { * @param txId Transaction id. * @return Future finishes with the resolved binary row. */ - private CompletableFuture<BinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) { + private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) { return resolvePlainReadResult(rowId, txId, null).thenCompose(row -> { - if (row == null) { + if (row == null || row.binaryRow() == null) { return completedFuture(null); } - return schemaCompatValidator.validateBackwards(row.schemaVersion(), tableId(), txId) + return schemaCompatValidator.validateBackwards(row.binaryRow().schemaVersion(), tableId(), txId) .thenApply(validationResult -> { if (validationResult.isSuccessful()) { return row; @@ -1121,27 +1135,6 @@ public class PartitionReplicaListener implements ReplicaListener { }); } - private CompletableFuture<Void> continueReadOnlyIndexLookup( - Cursor<RowId> indexCursor, - HybridTimestamp timestamp, - int batchSize, - List<BinaryRow> result - ) { - if (result.size() >= batchSize || !indexCursor.hasNext()) { - return completedFuture(null); - } - - RowId rowId = indexCursor.next(); - - return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> { - if (resolvedReadResult != null) { - result.add(resolvedReadResult); - } - - return continueReadOnlyIndexLookup(indexCursor, timestamp, batchSize, result); - }, scanRequestExecutor); - } - /** * Processes transaction finish request. * <ol> @@ -1433,7 +1426,7 @@ public class PartitionReplicaListener implements ReplicaListener { private <T> CompletableFuture<T> resolveRowByPk( BinaryTuple pk, UUID txId, - BiFunction<@Nullable RowId, @Nullable BinaryRow, CompletableFuture<T>> action + IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action ) { IndexLocker pkLocker = indexesLockers.get().get(pkIndexStorage.get().id()); @@ -1466,22 +1459,21 @@ public class PartitionReplicaListener implements ReplicaListener { private <T> CompletableFuture<T> continueResolvingByPk( Cursor<RowId> cursor, UUID txId, - BiFunction<@Nullable RowId, @Nullable BinaryRow, CompletableFuture<T>> action + IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action ) { if (!cursor.hasNext()) { - return action.apply(null, null); + return action.apply(null, null, null); } RowId rowId = cursor.next(); return resolvePlainReadResult(rowId, txId).thenCompose(row -> { - if (row != null) { - return action.apply(rowId, row); + if (row != null && row.binaryRow() != null) { + return action.apply(rowId, row.binaryRow(), row.commitTimestamp()); } else { return continueResolvingByPk(cursor, txId, action); } }); - } /** @@ -1557,7 +1549,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param ts A timestamp regarding which we need to resolve the given row. * @return Result of the given action. */ - private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) { + private CompletableFuture<@Nullable BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) { // Indexes store values associated with different versions of one entry. // It's possible to have multiple entries for a particular search key // only if we insert, delete and again insert an entry with the same indexed fields. @@ -1717,14 +1709,20 @@ public class PartitionReplicaListener implements ReplicaListener { case RW_DELETE_EXACT_ALL: { CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()]; + Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>(); + for (int i = 0; i < searchRows.size(); i++) { BinaryRow searchRow = searchRows.get(i); - deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(null); } + if (lastCommitTime != null) { + lastCommitTimes.put(rowId.uuid(), lastCommitTime); + } + return takeLocksForDeleteExact(searchRow, rowId, row, txId); }); } @@ -1747,7 +1745,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(result, null)); } - return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId) + return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, lastCommitTimes, txCoordinatorId) .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck())) .thenApply(res -> new ReplicaResult(result, res)); }); @@ -1762,7 +1760,7 @@ public class PartitionReplicaListener implements ReplicaListener { pks.add(pk); - pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row) -> completedFuture(rowId)); + pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> completedFuture(rowId)); } return allOf(pkReadLockFuts).thenCompose(ignore -> { @@ -1794,7 +1792,7 @@ public class PartitionReplicaListener implements ReplicaListener { } Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream() - .collect(Collectors.toMap( + .collect(toMap( e -> e.getKey().uuid(), e -> MSG_FACTORY.binaryRowMessage() .binaryTuple(e.getValue().tupleSlice()) @@ -1803,7 +1801,9 @@ public class PartitionReplicaListener implements ReplicaListener { )); return allOf(insertLockFuts) - .thenCompose(ignored -> validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, txCoordinatorId)) + .thenCompose(ignored -> + // We are inserting completely new rows - no need to cleanup anything in this case, hence empty times. + validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, emptyMap(), txCoordinatorId)) .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck())) .thenApply(res -> { // Release short term locks. @@ -1819,14 +1819,20 @@ public class PartitionReplicaListener implements ReplicaListener { case RW_UPSERT_ALL: { CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()]; + Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>(); + for (int i = 0; i < searchRows.size(); i++) { BinaryRow searchRow = searchRows.get(i); - rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + rowIdFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { boolean insert = rowId == null; RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId; + if (lastCommitTime != null) { + lastCommitTimes.put(rowId.uuid(), lastCommitTime); + } + return insert ? takeLocksForInsert(searchRow, rowId0, txId) : takeLocksForUpdate(searchRow, rowId0, txId); @@ -1847,7 +1853,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(null, null)); } - return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, txCoordinatorId) + return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, lastCommitTimes, txCoordinatorId) .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck())) .thenApply(res -> { // Release short term locks. @@ -1867,7 +1873,6 @@ public class PartitionReplicaListener implements ReplicaListener { } } - /** * Precesses multi request. * @@ -1888,7 +1893,7 @@ public class PartitionReplicaListener implements ReplicaListener { CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[primaryKeys.size()]; for (int i = 0; i < primaryKeys.size(); i++) { - rowFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row) -> { + rowFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(null); } @@ -1917,12 +1922,18 @@ public class PartitionReplicaListener implements ReplicaListener { case RW_DELETE_ALL: { CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()]; + Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>(); + for (int i = 0; i < primaryKeys.size(); i++) { - rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row) -> { + rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(null); } + if (lastCommitTime != null) { + lastCommitTimes.put(rowId.uuid(), lastCommitTime); + } + return takeLocksForDelete(row, rowId, txId); }); } @@ -1947,7 +1958,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(result, null)); } - return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId) + return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, lastCommitTimes, txCoordinatorId) .thenCompose(cmd -> applyUpdateAllCommand(cmd, request.skipDelayedAck())) .thenApply(res -> new ReplicaResult(result, res)); }); @@ -2016,7 +2027,8 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.row(), true, null, - null); + null, + cmd.lastCommitTimestamp()); // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp synchronized (safeTime) { @@ -2039,7 +2051,8 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.row(), false, null, - cmd.safeTime()); + cmd.safeTime(), + cmd.lastCommitTimestamp()); updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); } @@ -2065,7 +2078,8 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.tablePartitionId().asTablePartitionId(), true, null, - null); + null, + cmd.lastCommitTimestamps()); // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp synchronized (safeTime) { @@ -2091,7 +2105,8 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.tablePartitionId().asTablePartitionId(), true, null, - null); + null, + cmd.lastCommitTimestamps()); updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); } @@ -2110,7 +2125,8 @@ public class PartitionReplicaListener implements ReplicaListener { cmd.tablePartitionId().asTablePartitionId(), false, null, - cmd.safeTime()); + cmd.safeTime(), + cmd.lastCommitTimestamps()); updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime()); } @@ -2155,7 +2171,7 @@ public class PartitionReplicaListener implements ReplicaListener { switch (request.requestType()) { case RW_DELETE_EXACT: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(new ReplicaResult(false, null)); } @@ -2166,14 +2182,20 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(new ReplicaResult(false, request.full() ? null : completedFuture(null))); } - return validateAtTimestampAndBuildUpdateCommand(request, validatedRowId.uuid(), null, txCoordinatorId) + return validateAtTimestampAndBuildUpdateCommand( + request, + validatedRowId.uuid(), + null, + lastCommitTime, + txCoordinatorId + ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new ReplicaResult(true, res)); }); }); } case RW_INSERT: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId != null) { return completedFuture(new ReplicaResult(false, null)); } @@ -2182,7 +2204,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForInsert(searchRow, rowId0, txId) .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow, - txCoordinatorId + lastCommitTime, txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) @@ -2195,7 +2217,7 @@ public class PartitionReplicaListener implements ReplicaListener { }); } case RW_UPSERT: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { boolean insert = rowId == null; RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId; @@ -2206,7 +2228,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow, - txCoordinatorId + lastCommitTime, txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) @@ -2219,7 +2241,7 @@ public class PartitionReplicaListener implements ReplicaListener { }); } case RW_GET_AND_UPSERT: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { boolean insert = rowId == null; RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId; @@ -2230,7 +2252,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow, - txCoordinatorId + lastCommitTime, txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) @@ -2243,14 +2265,14 @@ public class PartitionReplicaListener implements ReplicaListener { }); } case RW_GET_AND_REPLACE: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(new ReplicaResult(null, null)); } return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow, - txCoordinatorId + lastCommitTime, txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) @@ -2263,14 +2285,14 @@ public class PartitionReplicaListener implements ReplicaListener { }); } case RW_REPLACE_IF_EXIST: { - return resolveRowByPk(extractPk(searchRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(new ReplicaResult(false, null)); } return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow, - txCoordinatorId + lastCommitTime, txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock))) @@ -2306,7 +2328,7 @@ public class PartitionReplicaListener implements ReplicaListener { switch (request.requestType()) { case RW_GET: { - return resolveRowByPk(primaryKey, txId, (rowId, row) -> { + return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(null); } @@ -2317,25 +2339,38 @@ public class PartitionReplicaListener implements ReplicaListener { }); } case RW_DELETE: { - return resolveRowByPk(primaryKey, txId, (rowId, row) -> { + return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(new ReplicaResult(false, null)); } return takeLocksForDelete(row, rowId, txId) - .thenCompose(rowLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId)) + .thenCompose(rowLock -> + validateAtTimestampAndBuildUpdateCommand( + request, + rowId.uuid(), + null, + lastCommitTime, + txCoordinatorId)) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new ReplicaResult(true, res)); }); } case RW_GET_AND_DELETE: { - return resolveRowByPk(primaryKey, txId, (rowId, row) -> { + return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(null); } return takeLocksForDelete(row, rowId, txId) - .thenCompose(ignored -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId)) + .thenCompose(ignored -> + validateAtTimestampAndBuildUpdateCommand( + request, + rowId.uuid(), + null, + lastCommitTime, + txCoordinatorId) + ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new ReplicaResult(row, res)); }); @@ -2506,7 +2541,7 @@ public class PartitionReplicaListener implements ReplicaListener { UUID txId = request.transactionId(); if (request.requestType() == RequestType.RW_REPLACE) { - return resolveRowByPk(extractPk(newRow), txId, (rowId, row) -> { + return resolveRowByPk(extractPk(newRow), txId, (rowId, row, lastCommitTime) -> { if (rowId == null) { return completedFuture(new ReplicaResult(false, null)); } @@ -2518,7 +2553,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return validateAtTimestampAndBuildUpdateCommand(commitPartitionId.asTablePartitionId(), - rowIdLock.get1().uuid(), newRow, txId, request.full(), txCoordinatorId + rowIdLock.get1().uuid(), newRow, lastCommitTime, txId, request.full(), txCoordinatorId ) .thenCompose(this::applyUpdateCommand) .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)) @@ -2543,7 +2578,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @param txId Transaction id. * @return Future completes with tuple {@link RowId} and collection of {@link Lock} or {@code null} if there is no suitable row. */ - private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow, + private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow, BinaryRow newRow, RowId rowId, UUID txId) { return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S)) @@ -2636,24 +2671,24 @@ public class PartitionReplicaListener implements ReplicaListener { * @param lastCommitted Action to get the latest committed row. * @return Future to resolved binary row. */ - private CompletableFuture<BinaryRow> resolveReadResult( + private CompletableFuture<@Nullable TimedBinaryRow> resolveReadResult( ReadResult readResult, @Nullable UUID txId, @Nullable HybridTimestamp timestamp, - Supplier<BinaryRow> lastCommitted + Supplier<@Nullable TimedBinaryRow> lastCommitted ) { if (readResult == null) { return completedFuture(null); } else if (!readResult.isWriteIntent()) { - return completedFuture(readResult.binaryRow()); + return completedFuture(new TimedBinaryRow(readResult.binaryRow(), readResult.commitTimestamp())); } else { - // RW resolution. + // RW write intent resolution. if (timestamp == null) { UUID retrievedResultTxId = readResult.transactionId(); if (txId.equals(retrievedResultTxId)) { - // Same transaction - return retrieved value. It may be both writeIntent or regular value. - return completedFuture(readResult.binaryRow()); + // Same transaction - return the retrieved value. It may be either a writeIntent or a regular value. + return completedFuture(new TimedBinaryRow(readResult.binaryRow())); } } @@ -2669,16 +2704,21 @@ public class PartitionReplicaListener implements ReplicaListener { * @param lastCommitted Action to get a last committed row. * @return Result future. */ - private CompletableFuture<BinaryRow> resolveWriteIntentAsync( + private CompletableFuture<@Nullable TimedBinaryRow> resolveWriteIntentAsync( ReadResult readResult, - HybridTimestamp timestamp, - Supplier<BinaryRow> lastCommitted + @Nullable HybridTimestamp timestamp, + Supplier<@Nullable TimedBinaryRow> lastCommitted ) { return inBusyLockAsync(busyLock, () -> resolveWriteIntentReadability(readResult, timestamp) .thenApply(writeIntentReadable -> inBusyLock(busyLock, () -> - writeIntentReadable ? readResult.binaryRow() : lastCommitted.get() + // TODO: If the write intent is readable - it's either committed or aborted. + // Local (primary) cleanup should be done first. + // https://issues.apache.org/jira/browse/IGNITE-20395 + // Once the cleanup is performed, the timestamp should be added to + // `new TimedBinaryRow(readResult.binaryRow())`. + writeIntentReadable ? new TimedBinaryRow(readResult.binaryRow()) : lastCommitted.get() ) ) ); @@ -2726,7 +2766,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return The future completes with {@code true} when the transaction is committed and commit time <= read time, {@code false} * otherwise (whe the transaction is either in progress, or aborted, or committed and commit time > read time). */ - private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, HybridTimestamp timestamp) { + private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, @Nullable HybridTimestamp timestamp) { UUID txId = writeIntent.transactionId(); return transactionStateResolver.resolveTxState( @@ -2789,12 +2829,14 @@ public class PartitionReplicaListener implements ReplicaListener { ReadWriteSingleRowReplicaRequest request, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lastCommitTimestamp, String txCoordinatorId ) { return validateAtTimestampAndBuildUpdateCommand( request.commitPartitionId().asTablePartitionId(), rowUuid, row, + lastCommitTimestamp, request.transactionId(), request.full(), txCoordinatorId @@ -2814,12 +2856,14 @@ public class PartitionReplicaListener implements ReplicaListener { ReadWriteSingleRowPkReplicaRequest request, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lastCommitTimestamp, String txCoordinatorId ) { return validateAtTimestampAndBuildUpdateCommand( request.commitPartitionId().asTablePartitionId(), rowUuid, row, + lastCommitTimestamp, request.transactionId(), request.full(), txCoordinatorId @@ -2841,6 +2885,7 @@ public class PartitionReplicaListener implements ReplicaListener { TablePartitionId tablePartId, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lastCommitTimestamp, UUID txId, boolean full, String txCoordinatorId @@ -2851,7 +2896,17 @@ public class PartitionReplicaListener implements ReplicaListener { .thenApply(catalogVersion -> { failIfSchemaChangedSinceTxStart(txId, operationTimestamp); - return updateCommand(tablePartId, rowUuid, row, txId, full, txCoordinatorId, operationTimestamp, catalogVersion); + return updateCommand( + tablePartId, + rowUuid, + row, + lastCommitTimestamp, + txId, + full, + txCoordinatorId, + operationTimestamp, + catalogVersion + ); }); } @@ -2859,6 +2914,7 @@ public class PartitionReplicaListener implements ReplicaListener { TablePartitionId tablePartId, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lastCommitTimestamp, UUID txId, boolean full, String txCoordinatorId, @@ -2874,6 +2930,10 @@ public class PartitionReplicaListener implements ReplicaListener { .txCoordinatorId(txCoordinatorId) .requiredCatalogVersion(catalogVersion); + if (lastCommitTimestamp != null) { + bldr.lastCommitTimestampLong(lastCommitTimestamp.longValue()); + } + if (row != null) { BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage() .binaryTuple(row.tupleSlice()) @@ -2906,10 +2966,12 @@ public class PartitionReplicaListener implements ReplicaListener { private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand( ReadWriteMultiRowReplicaRequest request, Map<UUID, BinaryRowMessage> rowsToUpdate, + Map<UUID, HybridTimestamp> lastCommitTimes, String txCoordinatorId ) { return validateAtTimestampAndBuildUpdateAllCommand( rowsToUpdate, + lastCommitTimes, request.commitPartitionId(), request.transactionId(), request.full(), @@ -2928,10 +2990,12 @@ public class PartitionReplicaListener implements ReplicaListener { private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand( ReadWriteMultiRowPkReplicaRequest request, Map<UUID, BinaryRowMessage> rowsToUpdate, + Map<UUID, HybridTimestamp> lastCommitTimes, String txCoordinatorId ) { return validateAtTimestampAndBuildUpdateAllCommand( rowsToUpdate, + lastCommitTimes, request.commitPartitionId(), request.transactionId(), request.full(), @@ -2951,6 +3015,7 @@ public class PartitionReplicaListener implements ReplicaListener { */ private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand( Map<UUID, BinaryRowMessage> rowsToUpdate, + Map<UUID, HybridTimestamp> lastCommitTimes, TablePartitionIdMessage commitPartitionId, UUID transactionId, boolean full, @@ -2970,6 +3035,12 @@ public class PartitionReplicaListener implements ReplicaListener { .full(full) .txCoordinatorId(txCoordinatorId) .requiredCatalogVersion(catalogVersion) + .lastCommitTimestampsLong( + // Also make sure lastCommitTimes contains only those entries that match rowsToUpdate. + lastCommitTimes.entrySet().stream() + .filter(entry -> rowsToUpdate.containsKey(entry.getKey())) + .collect(toMap(Entry::getKey, entry -> entry.getValue().longValue())) + ) .build(); }); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java similarity index 51% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java index 821e449311..2dee94ab59 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TimedBinaryRow.java @@ -15,34 +15,35 @@ * limitations under the License. */ -package org.apache.ignite.internal.table.distributed.command; +package org.apache.ignite.internal.table.distributed.replicator; -import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.table.distributed.TableMessageGroup; -import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; -import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; /** - * State machine command to update a row specified by a row id. + * Row with time. */ -@Transferable(TableMessageGroup.Commands.UPDATE) -public interface UpdateCommand extends PartitionCommand { - TablePartitionIdMessage tablePartitionId(); +class TimedBinaryRow { - UUID rowUuid(); + private final @Nullable BinaryRow binaryRow; - @Nullable - BinaryRowMessage rowMessage(); + private final @Nullable HybridTimestamp commitTimestamp; - String txCoordinatorId(); + TimedBinaryRow(@Nullable BinaryRow binaryRow, @Nullable HybridTimestamp commitTimestamp) { + this.binaryRow = binaryRow; + this.commitTimestamp = commitTimestamp; + } + + TimedBinaryRow(@Nullable BinaryRow binaryRow) { + this(binaryRow, null); + } - /** Returns the row to update or {@code null} if the row should be removed. */ - @Nullable - default BinaryRow row() { - BinaryRowMessage message = rowMessage(); + public @Nullable BinaryRow binaryRow() { + return binaryRow; + } - return message == null ? null : message.asBinaryRow(); + public @Nullable HybridTimestamp commitTimestamp() { + return commitTimestamp; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java index fdad65a016..956bc69fd6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java @@ -140,7 +140,7 @@ public class TransactionStateResolver { public CompletableFuture<TransactionMeta> resolveTxState( UUID txId, ReplicationGroupId commitGrpId, - HybridTimestamp timestamp + @Nullable HybridTimestamp timestamp ) { TxStateMeta localMeta = txManager.stateMeta(txId); @@ -176,7 +176,7 @@ public class TransactionStateResolver { UUID txId, @Nullable TxStateMeta localMeta, ReplicationGroupId commitGrpId, - HybridTimestamp timestamp, + @Nullable HybridTimestamp timestamp, CompletableFuture<TransactionMeta> txMetaFuture ) { assert localMeta == null || !isFinalState(localMeta.txState()) : "Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']'; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java index b604afee6b..db2084f269 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; import static org.mockito.Mockito.mock; @@ -170,9 +171,13 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { } static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { + addWrite(handler, rowUuid, row, null); + } + + static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lastCommitTime) { TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); - handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, null); + handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, null, lastCommitTime); } static BinaryRow defaultRow() { @@ -232,7 +237,8 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { USE_UPDATE { @Override void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) { - handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, null, null); + // TODO: perhaps need to pass last commit time as a param + handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, null, null, null); } }, /** Uses updateAll api. */ @@ -252,7 +258,8 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest { partitionId, true, null, - null + null, + emptyMap() ); } }; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java index 75cf522ac1..de61b6b648 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java @@ -18,6 +18,9 @@ package org.apache.ignite.internal.table.distributed; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.clearInvocations; @@ -92,6 +95,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { private StorageUpdateHandler storageUpdateHandler; private GcUpdateHandler gcUpdateHandler; + private IndexUpdateHandler indexUpdateHandler; @BeforeEach void setUp(@InjectConfiguration GcConfiguration gcConfig) { @@ -143,7 +147,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(tableId, PARTITION_ID, storage); - IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes)); + indexUpdateHandler = spy(new IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes))); gcUpdateHandler = new GcUpdateHandler( partitionDataStorage, @@ -171,9 +175,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest { TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row1, true, null, null); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row2, true, null, null); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row3, true, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row1, true, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row2, true, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row3, true, null, null, null); assertEquals(3, storage.rowsCount()); @@ -194,9 +198,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest { TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row1, true, null, null); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row2, true, null, null); - storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row3, true, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row1, true, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row2, true, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(), partitionId, row3, true, null, null, null); assertEquals(3, storage.rowsCount()); // We have three writes to the storage. @@ -236,9 +240,9 @@ public class StorageCleanupTest extends BaseMvStoragesTest { UUID row2Id = UUID.randomUUID(); UUID row3Id = UUID.randomUUID(); - storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, false, null, null); - storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, false, null, null); - storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, false, null, null); + storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, false, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, false, null, null, null); + storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, false, null, null, null); assertEquals(3, storage.rowsCount()); @@ -264,7 +268,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest { UUID newTxUuid = UUID.randomUUID(); // Insert new write intent in the new transaction. - storageUpdateHandler.handleUpdate(newTxUuid, row1Id, partitionId, row4, true, null, null); + storageUpdateHandler.handleUpdate(newTxUuid, row1Id, partitionId, row4, true, null, null, null); // And concurrently the other two intents were also resolved and scheduled for cleanup. storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row2Id)); @@ -279,4 +283,301 @@ public class StorageCleanupTest extends BaseMvStoragesTest { // Only those two entries will be affected. verify(storage, times(2)).commitWrite(any(), any()); } + + @Test + void testCleanupBeforeUpdateNoData() { + UUID runningTx = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, false, null, null, commitTs); + + verify(storage, never()).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateNoWriteIntent() { + UUID committedTx = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // First commit a row + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(committedTx, rowId, partitionId, row1, true, null, null, null); + + storageUpdateHandler.handleTransactionCleanup(committedTx, true, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Now create a new write intent over the committed data. No cleanup should be triggered. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + UUID runningTx = UUID.randomUUID(); + + clearInvocations(storage, indexUpdateHandler); + + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row2, true, null, null, commitTs); + + verify(storage, never()).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateSameTxOnlyWriteIntent() { + UUID runningTx = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // Create a write intent. + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, true, null, null, null); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Then create another one for the same row in the same transaction. The entry will be replaced. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + clearInvocations(storage, indexUpdateHandler); + + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row2, true, null, null, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + verify(storage, never()).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateDifferentTxOnlyWriteIntent() { + UUID runningTx = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // Create a new write intent. + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, true, null, null, null); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Create another one and pass `last commit time`. The previous value should be committed automatically. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + clearInvocations(storage, indexUpdateHandler); + + UUID runningTx2 = UUID.randomUUID(); + + // Previous value will be committed even though the cleanup was not called explicitly. + storageUpdateHandler.handleUpdate(runningTx2, rowId, partitionId, row2, true, null, null, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + verify(storage, times(1)).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateAbortWriteIntent() { + UUID committed1 = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // First commit an entry. + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); + + storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Now add a new write intent. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + UUID committed2 = UUID.randomUUID(); + + storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, row2, true, null, null, null); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Create another write intent and provide `last commit time`. + + clearInvocations(storage, indexUpdateHandler); + + UUID runningTx = UUID.randomUUID(); + + BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu")); + + // Last commit time equal to the time of the previously committed value => previous write intent will be reverted. + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3, true, null, null, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + verify(storage, never()).commitWrite(any(), any()); + verify(storage, times(1)).abortWrite(any()); + verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateCommitWriteIntent() { + UUID committed1 = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // First commit an entry. + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); + + storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Now add a new write intent. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + UUID committed2 = UUID.randomUUID(); + + storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, row2, true, null, null, null); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Create another write intent and provide `last commit time`. + + clearInvocations(storage, indexUpdateHandler); + + UUID runningTx = UUID.randomUUID(); + + BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu")); + + HybridTimestamp lastCommitTs = commitTs.addPhysicalTime(100); + + // Last commit time is after the time of the previously committed value => previous write intent will be committed. + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3, true, null, null, lastCommitTs); + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + verify(storage, times(1)).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any()); + } + + @Test + void testCleanupBeforeUpdateError() { + UUID committed1 = UUID.randomUUID(); + + TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID); + + // First commit an entry. + + BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar")); + + UUID rowId = UUID.randomUUID(); + + HybridTimestamp commitTs = CLOCK.now(); + + storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, row1, true, null, null, null); + + storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs); + + assertEquals(1, storage.rowsCount()); + + assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Now add a new write intent. + + BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz")); + + UUID committed2 = UUID.randomUUID(); + + storageUpdateHandler.handleUpdate(committed2, rowId, partitionId, row2, true, null, null, null); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + // Create another write intent and provide `last commit time`. + + clearInvocations(storage, indexUpdateHandler); + + UUID runningTx = UUID.randomUUID(); + + BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu")); + + // This should lead to an exception + HybridTimestamp lastCommitTs = commitTs.subtractPhysicalTime(100); + + // Last commit time is before the time of the previously committed value => this should not happen. + assertThrows(AssertionError.class, () -> + storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3, true, null, null, lastCommitTs)); + + + assertEquals(1, storage.rowsCount()); + + assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent()); + + verify(storage, never()).commitWrite(any(), any()); + verify(storage, never()).abortWrite(any()); + verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any()); + } + } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java index 22d981e0f4..1934ed5477 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed; +import static java.util.Collections.emptyMap; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -116,6 +117,7 @@ public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { null, false, null, + null, null ); @@ -138,7 +140,8 @@ public class StorageUpdateHandlerTest extends BaseIgniteAbstractTest { new TablePartitionId(TABLE_ID, PARTITION_ID), false, null, - null + null, + emptyMap() ); verify(partitionStorage).peek(lwm); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 5ccba2992c..028d4a47b9 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiConsumer; import java.util.function.Consumer; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; @@ -1769,7 +1770,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { * @param rows Rows. * @param expected Expected values. */ - private static void validateBalance(Collection<Tuple> rows, @Nullable Double... expected) { + protected static void validateBalance(Collection<Tuple> rows, @Nullable Double... expected) { assertThat( rows.stream().map(tuple -> tuple == null ? null : tuple.doubleValue("balance")).collect(toList()), contains(expected) @@ -1970,12 +1971,20 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { @Test public void testTransactionAlreadyCommitted() { - testTransactionAlreadyFinished(true); + testTransactionAlreadyFinished(true, (transaction, uuid) -> { + transaction.commit(); + + log.info("Committed transaction {}", uuid); + }); } @Test public void testTransactionAlreadyRolledback() { - testTransactionAlreadyFinished(false); + testTransactionAlreadyFinished(false, (transaction, uuid) -> { + transaction.rollback(); + + log.info("Rolled back transaction {}", uuid); + }); } @Test @@ -2075,7 +2084,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { * * @param commit True when transaction is committed, false the transaction is rolled back. */ - private void testTransactionAlreadyFinished(boolean commit) { + protected void testTransactionAlreadyFinished(boolean commit, BiConsumer<Transaction, UUID> finisher) { Transaction tx = igniteTransactions.begin(); var txId = ((ReadWriteTransactionImpl) tx).id(); @@ -2091,15 +2100,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { validateBalance(res, 100., 200.); - if (commit) { - tx.commit(); - - log.info("Committed transaction {}", txId); - } else { - tx.rollback(); - - log.info("Rolled back transaction {}", txId); - } + finisher.accept(tx, txId); TransactionException ex = assertThrows(TransactionException.class, () -> accountsRv.get(tx, makeKey(1))); assertTrue(ex.getMessage().contains("Transaction is already finished."));