This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch ignite-21248
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 411545ec9cd99ff84f217aa0d66620562bd596df
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Jan 19 17:28:12 2024 +0300

    IGNITE-21248 HeapUnboundedLockManager lacks abandoned locks handling
---
 .../internal/table/ItTransactionRecoveryTest.java  | 171 +++++++++++++++++++++
 1 file changed, 171 insertions(+)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index d72df1b78e..82edbf3949 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,9 +33,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
@@ -49,8 +53,11 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse;
 import 
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TransactionAlreadyFinishedException;
@@ -58,11 +65,14 @@ import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkMessage;
@@ -110,6 +120,167 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
                 + "}\n");
     }
 
+    @Test
+    public void testTsRecoveryForCursor() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        RecordView view1 = node(0).tables().table(TABLE_NAME).recordView();
+
+        for (int i = 0; i < 10; i++) {
+            view1.upsert(null, Tuple.create().set("key", i).set("val", 
"preload"));
+        }
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node(0).clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        String leaseholder = primaryReplicaFut.join().getLeaseholder();
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator is chosen [node={}].", 
txCrdNode.name());
+
+        UUID orphanTxId = startTransactionWithCursorAndStopNode(txCrdNode);
+
+        IgniteImpl newCoordNode = node(0);
+
+        log.info("New transaction coordinator is chosen [node={}].", 
newCoordNode.name());
+
+        CompletableFuture<Void> txMsgCaptureFut = new CompletableFuture<>();
+
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                txMsgCaptureFut.complete(null);
+            }
+
+            return false;
+        });
+
+        Transaction tx = newCoordNode.transactions().begin();
+
+        RecordView view = newCoordNode.tables().table(TABLE_NAME).recordView();
+
+        var opFut = view.upsertAsync(tx, Tuple.create().set("key", 
42).set("val", "new"));
+
+        try {
+            opFut.get();
+        } catch (Exception ex) {
+            log.info("Expected conflict that have to start recovery: " + 
ex.getMessage());
+        }
+
+        assertThat(txMsgCaptureFut, willCompleteSuccessfully());
+    }
+
+    private UUID startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode) 
throws Exception {
+        RecordView<Tuple> view = 
txCrdNode.tables().table(TABLE_NAME).recordView();
+
+        InternalTransaction rwTx = (InternalTransaction) 
txCrdNode.transactions().begin();
+
+        scanSingleEntryAndLeaveCursorOpen((TableViewInternal) 
txCrdNode.tables().table(TABLE_NAME), rwTx, null, null);
+
+        // view.upsert(rwTx, Tuple.create().set("key", 42).set("val", "val1"));
+
+        String txCrdNodeId = txCrdNode.id();
+
+        txCrdNode.stop();
+
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().filter(n -> 
txCrdNodeId.equals(n.id())).count() == 0,
+                10_000)
+        );
+
+        return rwTx.id();
+    }
+
+    int PART_ID = 0;
+
+    /**
+     * Starts a scan procedure for a specific transaction and reads only the 
first line from the cursor.
+     *
+     * @param tbl Scanned table.
+     * @param tx Transaction.
+     * @param idxId Index id.
+     * @throws Exception If failed.
+     */
+    private void scanSingleEntryAndLeaveCursorOpen(TableViewInternal tbl, 
InternalTransaction tx, Integer idxId, BinaryTuple exactKey)
+            throws Exception {
+        Publisher<BinaryRow> publisher;
+
+        if (tx.isReadOnly()) {
+            CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().getPrimaryReplica(
+                    new TablePartitionId(tbl.tableId(), PART_ID),
+                    node(0).clock().now()
+            );
+
+            assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+            assertNotNull(primaryReplicaFut.join());
+
+            String primaryId = primaryReplicaFut.get().getLeaseholderId();
+
+            ClusterNode primaryNode = 
node(0).clusterNodes().stream().filter(node -> 
node.id().equals(primaryId)).findAny().get();
+
+            if (idxId == null) {
+                publisher = tbl.internalTable().scan(PART_ID, 
tx.readTimestamp(), primaryNode);
+            } else if (exactKey == null) {
+                publisher = tbl.internalTable().scan(PART_ID, 
tx.readTimestamp(), primaryNode, idxId, null, null, 0, null);
+            } else {
+                publisher = tbl.internalTable().lookup(PART_ID, 
tx.readTimestamp(), primaryNode, idxId, exactKey, null);
+            }
+        } else if (idxId == null) {
+            publisher = tbl.internalTable().scan(PART_ID, tx);
+        } else if (exactKey == null) {
+            publisher = tbl.internalTable().scan(PART_ID, tx, idxId, null, 
null, 0, null);
+        } else {
+            var rwTx = (ReadWriteTransactionImpl) tx;
+
+            CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().getPrimaryReplica(
+                    new TablePartitionId(tbl.tableId(), PART_ID),
+                    node(0).clock().now()
+            );
+
+            assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+            assertNotNull(primaryReplicaFut.join());
+
+            String primaryId = primaryReplicaFut.get().getLeaseholderId();
+
+            ClusterNode primaryNode = 
node(0).clusterNodes().stream().filter(node -> 
node.id().equals(primaryId)).findAny().get();
+
+            publisher = tbl.internalTable().lookup(
+                    PART_ID,
+                    rwTx.id(),
+                    rwTx.commitPartition(),
+                    new PrimaryReplica(primaryNode, 
primaryReplicaFut.get().getStartTime().longValue()),
+                    idxId,
+                    exactKey,
+                    null
+            );
+        }
+
+        ArrayList<BinaryRow> scannedRows = new ArrayList<>();
+        CompletableFuture<Void> scanned = new CompletableFuture<>();
+
+        Subscription subscription = 
TestFlowUtils.subscribeToPublisher(scannedRows, publisher, scanned);
+
+        subscription.request(1);
+
+        assertTrue(waitForCondition(() -> scannedRows.size() == 1, 10_000));
+
+        assertFalse(scanned.isDone());
+    }
+
     @Test
     public void testMultipleRecoveryRequestsIssued() throws Exception {
         TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);

Reply via email to