This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch ignite-21248 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 411545ec9cd99ff84f217aa0d66620562bd596df Author: Vladislav Pyatkov <[email protected]> AuthorDate: Fri Jan 19 17:28:12 2024 +0300 IGNITE-21248 HeapUnboundedLockManager lacks abandoned locks handling --- .../internal/table/ItTransactionRecoveryTest.java | 171 +++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java index d72df1b78e..82edbf3949 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java @@ -25,6 +25,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -32,9 +33,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.ArrayList; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -49,8 +53,11 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse; import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.testframework.flow.TestFlowUtils; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TransactionAlreadyFinishedException; @@ -58,11 +65,14 @@ import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; +import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.message.TxRecoveryMessage; import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.DefaultMessagingService; import org.apache.ignite.network.NetworkMessage; @@ -110,6 +120,167 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { + "}\n"); } + @Test + public void testTsRecoveryForCursor() throws Exception { + TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME); + + RecordView view1 = node(0).tables().table(TABLE_NAME).recordView(); + + for (int i = 0; i < 10; i++) { + view1.upsert(null, Tuple.create().set("key", i).set("val", "preload")); + } + + var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0); + + CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica( + tblReplicationGrp, + node(0).clock().now(), + 10, + SECONDS + ); + + assertThat(primaryReplicaFut, willCompleteSuccessfully()); + + String leaseholder = primaryReplicaFut.join().getLeaseholder(); + + IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder); + + log.info("Transaction commit partition is determined [node={}].", commitPartNode.name()); + + IgniteImpl txCrdNode = nonPrimaryNode(leaseholder); + + log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name()); + + UUID orphanTxId = startTransactionWithCursorAndStopNode(txCrdNode); + + IgniteImpl newCoordNode = node(0); + + log.info("New transaction coordinator is chosen [node={}].", newCoordNode.name()); + + CompletableFuture<Void> txMsgCaptureFut = new CompletableFuture<>(); + + commitPartNode.dropMessages((nodeName, msg) -> { + if (msg instanceof TxRecoveryMessage) { + txMsgCaptureFut.complete(null); + } + + return false; + }); + + Transaction tx = newCoordNode.transactions().begin(); + + RecordView view = newCoordNode.tables().table(TABLE_NAME).recordView(); + + var opFut = view.upsertAsync(tx, Tuple.create().set("key", 42).set("val", "new")); + + try { + opFut.get(); + } catch (Exception ex) { + log.info("Expected conflict that have to start recovery: " + ex.getMessage()); + } + + assertThat(txMsgCaptureFut, willCompleteSuccessfully()); + } + + private UUID startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode) throws Exception { + RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView(); + + InternalTransaction rwTx = (InternalTransaction) txCrdNode.transactions().begin(); + + scanSingleEntryAndLeaveCursorOpen((TableViewInternal) txCrdNode.tables().table(TABLE_NAME), rwTx, null, null); + + // view.upsert(rwTx, Tuple.create().set("key", 42).set("val", "val1")); + + String txCrdNodeId = txCrdNode.id(); + + txCrdNode.stop(); + + assertTrue(waitForCondition( + () -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0, + 10_000) + ); + + return rwTx.id(); + } + + int PART_ID = 0; + + /** + * Starts a scan procedure for a specific transaction and reads only the first line from the cursor. + * + * @param tbl Scanned table. + * @param tx Transaction. + * @param idxId Index id. + * @throws Exception If failed. + */ + private void scanSingleEntryAndLeaveCursorOpen(TableViewInternal tbl, InternalTransaction tx, Integer idxId, BinaryTuple exactKey) + throws Exception { + Publisher<BinaryRow> publisher; + + if (tx.isReadOnly()) { + CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().getPrimaryReplica( + new TablePartitionId(tbl.tableId(), PART_ID), + node(0).clock().now() + ); + + assertThat(primaryReplicaFut, willCompleteSuccessfully()); + + assertNotNull(primaryReplicaFut.join()); + + String primaryId = primaryReplicaFut.get().getLeaseholderId(); + + ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get(); + + if (idxId == null) { + publisher = tbl.internalTable().scan(PART_ID, tx.readTimestamp(), primaryNode); + } else if (exactKey == null) { + publisher = tbl.internalTable().scan(PART_ID, tx.readTimestamp(), primaryNode, idxId, null, null, 0, null); + } else { + publisher = tbl.internalTable().lookup(PART_ID, tx.readTimestamp(), primaryNode, idxId, exactKey, null); + } + } else if (idxId == null) { + publisher = tbl.internalTable().scan(PART_ID, tx); + } else if (exactKey == null) { + publisher = tbl.internalTable().scan(PART_ID, tx, idxId, null, null, 0, null); + } else { + var rwTx = (ReadWriteTransactionImpl) tx; + + CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().getPrimaryReplica( + new TablePartitionId(tbl.tableId(), PART_ID), + node(0).clock().now() + ); + + assertThat(primaryReplicaFut, willCompleteSuccessfully()); + + assertNotNull(primaryReplicaFut.join()); + + String primaryId = primaryReplicaFut.get().getLeaseholderId(); + + ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get(); + + publisher = tbl.internalTable().lookup( + PART_ID, + rwTx.id(), + rwTx.commitPartition(), + new PrimaryReplica(primaryNode, primaryReplicaFut.get().getStartTime().longValue()), + idxId, + exactKey, + null + ); + } + + ArrayList<BinaryRow> scannedRows = new ArrayList<>(); + CompletableFuture<Void> scanned = new CompletableFuture<>(); + + Subscription subscription = TestFlowUtils.subscribeToPublisher(scannedRows, publisher, scanned); + + subscription.request(1); + + assertTrue(waitForCondition(() -> scannedRows.size() == 1, 10_000)); + + assertFalse(scanned.isDone()); + } + @Test public void testMultipleRecoveryRequestsIssued() throws Exception { TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
