This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 947103e94e IGNITE-21290 Scan cursors do not close after being fully read in transactions (#3069)
947103e94e is described below

commit 947103e94ed9fca294cf8f088368d63c4a706821
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Feb 5 11:06:34 2024 +0300

    IGNITE-21290 Scan cursors do not close after being fully read in transactions (#3069)
---
 .../ignite/internal/table/ItTableScanTest.java     | 164 +++++++++++++++------
 .../table/distributed/TableMessageGroup.java       |   6 +-
 .../request/ReadWriteScanCloseReplicaRequest.java  |  28 ----
 .../request/ScanCloseReplicaRequest.java           |   8 +-
 .../replicator/PartitionReplicaListener.java       |  49 ++++--
 .../distributed/storage/InternalTableImpl.java     |  80 ++++++++--
 .../replication/PartitionReplicaListenerTest.java  |   3 +-
 7 files changed, 234 insertions(+), 104 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 68a0ba44bf..3ca9b399b8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -59,14 +59,15 @@ import 
org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
@@ -98,14 +99,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
     /** The only partition in the table. */
     private static final int PART_ID = 0;
 
-    private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
-            1,
-            new Column[]{new Column("key", NativeTypes.INT32, false)},
-            new Column[]{
-                    new Column("valInt", NativeTypes.INT32, false),
-                    new Column("valStr", NativeTypes.STRING, false)
-            }
-    );
+    private SchemaDescriptor schema;
 
     private TableViewInternal table;
 
@@ -117,14 +111,68 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         internalTable = table.internalTable();
 
+        schema = table.schemaView().lastKnownSchema();
+
         loadData(table);
     }
 
     @AfterEach
     public void afterTest() {
+        CLUSTER.runningNodes().forEach(this::checkResourcesAreReleased);
+
         clearData(table);
     }
 
+    /**
+     * Checks all transaction resources are released (cursors and locks).
+     *
+     * @param ignite Ignite instance.
+     */
+    private void checkResourcesAreReleased(IgniteImpl ignite) {
+        checkCursorsAreClosed(ignite);
+
+        assertTrue(ignite.txManager().lockManager().isEmpty());
+    }
+
+    /**
+     * Checks all transaction cursors are closed.
+     *
+     * @param ignite Ignite instance.
+     */
+    private void checkCursorsAreClosed(IgniteImpl ignite) {
+        int sortedIdxId = getIndexId(ignite, SORTED_IDX);
+
+        var partitionStorage = (TestMvPartitionStorage) ((TableViewInternal) 
ignite.tables().table(TABLE_NAME))
+                .internalTable().storage().getMvPartition(PART_ID);
+        var sortedIdxStorage = (TestSortedIndexStorage) ((TableViewInternal) 
ignite.tables().table(TABLE_NAME))
+                .internalTable().storage().getIndex(PART_ID, sortedIdxId);
+
+        assertEquals(0, partitionStorage.pendingCursors());
+        assertEquals(0, sortedIdxStorage.pendingCursors());
+    }
+
+    /**
+     * Gets index id by name.
+     *
+     * @param idxName Index name.
+     * @return Index id.
+     */
+    private int getIndexId(IgniteImpl ignite, String idxName) {
+        CatalogManager catalogManager = ignite.catalogManager();
+
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        return catalogManager.indexes(catalogVersion).stream()
+                .filter(index -> {
+                    log.info("Scanned idx " + index.name());
+
+                    return idxName.equalsIgnoreCase(index.name());
+                })
+                .mapToInt(CatalogObjectDescriptor::id)
+                .findFirst()
+                .getAsInt();
+    }
+
     @Test
     public void testInsertWaitScanComplete() throws Exception {
         IgniteTransactions transactions = igniteTx();
@@ -158,7 +206,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         assertFalse(updateKey2Fut.isDone());
 
-        subscription.request(1_000); // Request so much entries here to close 
the publisher.
+        subscription.request(1_000); // Request so many entries here to close 
the publisher.
 
         assertThat(scanned, willCompleteSuccessfully());
 
@@ -167,7 +215,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         assertFalse(insertKey99Fut.isDone());
 
-        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
+        log.info("Result: " + 
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
 
         assertEquals(ROW_IDS.size(), scannedRows.size());
 
@@ -185,7 +233,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        Publisher<BinaryRow> publisher = internalTable.scan(0, null, 
sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, 
sortedIndexId, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -201,11 +249,11 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         table.keyValueView().put(null, Tuple.create().set("key", 3), 
Tuple.create().set("valInt", 3).set("valStr", "New_3"));
 
-        subscription.request(1_000); // Request so much entries here to close 
the publisher.
+        subscription.request(1_000); // Request so many entries here to close 
the publisher.
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
+        log.info("Result: " + 
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
 
         assertEquals(ROW_IDS.size() + 1, scannedRows.size());
     }
@@ -351,7 +399,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
     /**
      * The method executes an operation, encapsulated in closure, during a 
pure table scan.
      *
-     * @param txOperationAction An closure to apply during the scan operation.
+     * @param txOperationAction A closure to apply during the scan operation.
      * @throws Exception If failed.
      */
     public void pureTableScan(Function<InternalTransaction, 
CompletableFuture<Integer>> txOperationAction) throws Exception {
@@ -361,7 +409,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        Publisher<BinaryRow> publisher = internalTable.scan(0, null, null, 
null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, 
null, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -385,11 +433,11 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
         assertFalse(scanned.isDone());
         assertFalse(txOpFut.isDone());
 
-        subscription.request(1_000); // Request so much entries here to close 
the publisher.
+        subscription.request(1_000); // Request so many entries here to close 
the publisher.
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
+        log.info("Result: " + 
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
 
         assertThat(txOpFut, willCompleteSuccessfully());
 
@@ -397,7 +445,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         assertEquals(ROW_IDS.size(), scannedRows.size());
 
-        var pub = internalTable.scan(0, null, null, null, null, 0, null);
+        var pub = internalTable.scan(PART_ID, null, null, null, null, 0, null);
 
         assertEquals(ROW_IDS.size() + txOpFut.get(), scanAllRows(pub).size());
     }
@@ -434,11 +482,11 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         kvView.put(null, Tuple.create().set("key", 8), 
Tuple.create().set("valInt", 8).set("valStr", "New_8"));
 
-        subscription.request(1_000); // Request so much entries here to close 
the publisher.
+        subscription.request(1_000); // Request so many entries here to close 
the publisher.
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
+        log.info("Result: " + 
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
 
         assertEquals(ROW_IDS.size() + 1, scannedRows.size());
 
@@ -487,7 +535,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         List<BinaryRow> scannedRows = scanAllRows(publisher);
 
-        log.info("Result of scanning in old transaction: " + 
scannedRows.stream().map(ItTableScanTest::rowToString)
+        log.info("Result of scanning in old transaction: " + 
scannedRows.stream().map(this::rowToString)
                 .collect(Collectors.joining(", ")));
 
         assertEquals(3, scannedRows.size());
@@ -536,7 +584,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         assertEquals(5, scannedRows2.size());
 
-        log.info("Result of scanning after insert rows: " + 
scannedRows2.stream().map(ItTableScanTest::rowToString)
+        log.info("Result of scanning after insert rows: " + 
scannedRows2.stream().map(this::rowToString)
                 .collect(Collectors.joining(", ")));
     }
 
@@ -605,16 +653,42 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
      *
      * @param requestAmount1 Number of rows in the first request.
      * @param requestAmount2 Number of rows in the second request.
+     * @param readOnly If true, RO transaction is initiated, otherwise, RW 
transaction is initiated.
+     * @param implicit If false, an explicit transaction is initiated, 
otherwise, an implicit one.
      *
      * @throws Exception If failed.
      */
     @ParameterizedTest
-    @CsvSource({"3, 1", "1, 3"})
-    public void testCompositeScanRequest(int requestAmount1, int 
requestAmount2) throws Exception {
+    @CsvSource({"3, 1, false, false", "1, 3, false, false", "3, 1, true, 
false", "1, 3, true, false", "3, 1, false, true",
+            "1, 3, false, true"})
+    public void testCompositeScanRequest(int requestAmount1, int 
requestAmount2, boolean readOnly, boolean implicit) throws Exception {
         List<BinaryRow> scannedRows = new ArrayList<>();
-        Publisher<BinaryRow> publisher = internalTable.scan(0, null, null, 
null, null, 0, null);
-        CompletableFuture<Void> scanned = new CompletableFuture<>();
 
+        Publisher<BinaryRow> publisher;
+
+        InternalTransaction tx = null;
+
+        if (readOnly) {
+            IgniteImpl ignite = CLUSTER.aliveNode();
+
+            var tablePartId = new TablePartitionId(internalTable.tableId(), 
PART_ID);
+
+            ReplicaMeta primaryReplica = IgniteTestUtils.await(
+                    ignite.placementDriver().awaitPrimaryReplica(tablePartId, 
ignite.clock().now(), 30, TimeUnit.SECONDS));
+
+            ClusterNode recipientNode = 
ignite.clusterNodes().stream().filter(node -> 
node.name().equals(primaryReplica.getLeaseholder()))
+                    .findFirst().get();
+
+            publisher = internalTable.scan(PART_ID, ignite.clock().now(), 
recipientNode);
+        } else {
+            if (!implicit) {
+                tx = (InternalTransaction) 
CLUSTER.aliveNode().transactions().begin();
+            }
+
+            publisher = internalTable.scan(PART_ID, tx, null, null, null, 0, 
null);
+        }
+
+        CompletableFuture<Void> scanned = new CompletableFuture<>();
         Subscription subscription = subscribeToPublisher(scannedRows, 
publisher, scanned);
 
         subscription.request(requestAmount1);
@@ -626,6 +700,12 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         subscription.cancel();
 
+        CLUSTER.runningNodes().forEach(this::checkCursorsAreClosed);
+
+        if (tx != null) {
+            tx.rollback();
+        }
+
         assertThat(scanned, willCompleteSuccessfully());
     }
 
@@ -689,8 +769,8 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
      * @param binaryRow Binary row.
      * @return String representation.
      */
-    private static String rowToString(BinaryRow binaryRow) {
-        Row row = Row.wrapBinaryRow(SCHEMA, binaryRow);
+    private String rowToString(BinaryRow binaryRow) {
+        Row row = Row.wrapBinaryRow(schema, binaryRow);
 
         return IgniteStringFormatter.format("[{}, {}, {}]", row.intValue(0), 
row.intValue(1), row.stringValue(2));
     }
@@ -708,7 +788,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         Subscription subscription = subscribeToPublisher(scannedRows, 
publisher, scanned);
 
-        subscription.request(1_000); // Request so much entries here to close 
the publisher.
+        subscription.request(1_000); // Request so many entries here to close 
the publisher.
 
         assertTrue(waitForCondition(() -> scanned.isDone(), 10_000));
 
@@ -749,7 +829,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
      * Gets an index id.
      */
     private static int getSortedIndexId() {
-        CatalogManager catalogManager = ((IgniteImpl) 
CLUSTER.aliveNode()).catalogManager();
+        CatalogManager catalogManager = (CLUSTER.aliveNode()).catalogManager();
 
         int catalogVersion = catalogManager.latestCatalogVersion();
 
@@ -766,7 +846,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
      * @return Ignite table.
      */
     private static TableViewInternal getOrCreateTable() {
-        sql("CREATE ZONE IF NOT EXISTS ZONE1 WITH REPLICAS=1, PARTITIONS=1;");
+        sql("CREATE ZONE IF NOT EXISTS ZONE1 ENGINE " + 
TestStorageEngine.ENGINE_NAME + " WITH REPLICAS=1, PARTITIONS=1;");
 
         sql("CREATE TABLE IF NOT EXISTS " + TABLE_NAME
                 + " (key INTEGER PRIMARY KEY, valInt INTEGER NOT NULL, valStr 
VARCHAR NOT NULL) WITH PRIMARY_ZONE='ZONE1';");
@@ -792,14 +872,14 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
      * @param id Primary key.
      * @return Entire row.
      */
-    private static Row createKeyValueRow(int id) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA);
+    private Row createKeyValueRow(int id) {
+        RowAssembler rowBuilder = new RowAssembler(schema);
 
         rowBuilder.appendInt(id);
         rowBuilder.appendInt(id);
         rowBuilder.appendString("StrNew_" + id);
 
-        return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
+        return Row.wrapBinaryRow(schema, rowBuilder.build());
     }
 
     /**
@@ -808,14 +888,14 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
      * @param id Primary key.
      * @return Entire row.
      */
-    private static Row createOldKeyValueRow(int id) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA);
+    private Row createOldKeyValueRow(int id) {
+        RowAssembler rowBuilder = new RowAssembler(schema);
 
         rowBuilder.appendInt(id);
         rowBuilder.appendInt(id);
         rowBuilder.appendString("Str_" + id);
 
-        return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
+        return Row.wrapBinaryRow(schema, rowBuilder.build());
     }
 
     /**
@@ -824,12 +904,12 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
      * @param id Primary key.
      * @return Key row.
      */
-    private static Row createKeyRow(int id) {
-        RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA);
+    private Row createKeyRow(int id) {
+        RowAssembler rowBuilder = RowAssembler.keyAssembler(schema);
 
         rowBuilder.appendInt(id);
 
-        return Row.wrapKeyOnlyBinaryRow(SCHEMA, rowBuilder.build());
+        return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build());
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 00e1631768..47c4b630ca 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -46,11 +46,11 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
 
 /**
  * Message group for the table module.
@@ -81,9 +81,9 @@ public interface TableMessageGroup {
     short RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 3;
 
     /**
-     * Message type for {@link ReadWriteScanCloseReplicaRequest}.
+     * Message type for {@link ScanCloseReplicaRequest}.
      */
-    short RW_SCAN_CLOSE_REPLICA_REQUEST = 4;
+    short SCAN_CLOSE_REPLICA_REQUEST = 4;
 
     /**
      * Message type for {@link HasDataRequest}.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java
deleted file mode 100644
index da4b29f688..0000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.replication.request;
-
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-
-/**
- * Scan retrieve batch replica request.
- */
-@Transferable(TableMessageGroup.RW_SCAN_CLOSE_REPLICA_REQUEST)
-public interface ReadWriteScanCloseReplicaRequest extends 
ScanCloseReplicaRequest, ReadWriteReplicaRequest {
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
index 478031503a..13551b2c21 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
@@ -17,12 +17,18 @@
 
 package org.apache.ignite.internal.table.distributed.replication.request;
 
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 
 /**
- * Scan retrieve batch replica request.
+ * Scan cursor close request.
  */
+@Transferable(TableMessageGroup.SCAN_CLOSE_REPLICA_REQUEST)
 public interface ScanCloseReplicaRequest extends ReplicaRequest {
+    UUID transactionId();
+
     /** The id uniquely determines a cursor for the transaction. */
     long scanId();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3a67be0fab..e663896260 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -148,11 +148,11 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
-import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
@@ -562,7 +562,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private CompletableFuture<Void> validateTableExistence(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
         HybridTimestamp opStartTs;
 
-        if (request instanceof ReadWriteScanCloseReplicaRequest) {
+        if (request instanceof ScanCloseReplicaRequest) {
             // We don't need to validate close request for table existence.
             opStartTs = null;
         } else if (request instanceof ReadWriteReplicaRequest) {
@@ -717,8 +717,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         return rows;
                     });
-        } else if (request instanceof ReadWriteScanCloseReplicaRequest) {
-            processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
+        } else if (request instanceof ScanCloseReplicaRequest) {
+            processScanCloseAction((ScanCloseReplicaRequest) request);
 
             return nullCompletedFuture();
         } else if (request instanceof TxFinishReplicaRequest) {
@@ -942,7 +942,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     return rows;
                 });
             } else {
-                return completedFuture(rows);
+                return completedFuture(closeCursorIfBatchNotFull(rows, count, 
cursorId));
             }
         });
     }
@@ -1099,8 +1099,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
         }
 
-        txCursors.clear();
-
         if (ex != null) {
             throw ex;
         }
@@ -1111,11 +1109,35 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      *
      * @param request Scan close request operation.
      */
-    private void processScanCloseAction(ReadWriteScanCloseReplicaRequest 
request) {
+    private void processScanCloseAction(ScanCloseReplicaRequest request) {
         UUID txId = request.transactionId();
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
+        closeCursor(cursorId);
+    }
+
+    /**
+     * Closes a cursor if the batch is not fully retrieved.
+     *
+     * @param batchSize Requested batch size.
+     * @param rows List of retrieved rows.
+     * @param cursorId Cursor id.
+     */
+    private ArrayList<BinaryRow> 
closeCursorIfBatchNotFull(ArrayList<BinaryRow> rows, int batchSize, IgniteUuid 
cursorId) {
+        if (rows.size() < batchSize) {
+            closeCursor(cursorId);
+        }
+
+        return rows;
+    }
+
+    /**
+     * Closes a specific cursor.
+     *
+     * @param cursorId Cursor id.
+     */
+    private void closeCursor(IgniteUuid cursorId) {
         Cursor<?> cursor = cursors.remove(cursorId);
 
         if (cursor != null) {
@@ -1123,8 +1145,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 cursor.close();
             } catch (Exception e) {
                 throw new ReplicationException(Replicator.REPLICA_COMMON_ERR,
-                        format("Close cursor exception [replicaGrpId={}, 
msg={}]", replicationGroupId,
-                                e.getMessage()), e);
+                        format("Close cursor exception [replicaGrpId={}, 
msg={}]", replicationGroupId, e.getMessage()), e);
             }
         }
     }
@@ -1194,7 +1215,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new 
IndexRowImpl(key, rowId));
 
         return continueReadOnlyIndexScan(schemaAwareIndexStorage, 
indexRowCursor, timestamp, batchCount, result)
-                .thenCompose(ignore -> completedFuture(result));
+                .thenApply(ignore -> closeCursorIfBatchNotFull(result, 
batchCount, cursorId));
     }
 
     private CompletableFuture<List<BinaryRow>> lookupIndex(
@@ -1221,7 +1242,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     var result = new 
ArrayList<BinaryRow>(batchCount);
 
                                     return continueIndexLookup(txId, cursor, 
batchCount, result)
-                                            .thenApply(ignore -> result);
+                                            .thenApply(ignore -> 
closeCursorIfBatchNotFull(result, batchCount, cursorId));
                                 });
                     });
         });
@@ -1295,7 +1316,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                         return continueIndexScan(txId, 
schemaAwareIndexStorage, indexLocker, cursor, batchCount, result,
                                 isUpperBoundAchieved)
-                                .thenApply(ignore -> result);
+                                .thenApply(ignore -> 
closeCursorIfBatchNotFull(result, batchCount, cursorId));
                     });
         });
     }
@@ -1337,7 +1358,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         var result = new ArrayList<BinaryRow>(batchCount);
 
         return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, 
timestamp, batchCount, result)
-                .thenApply(ignore -> result);
+                .thenApply(ignore -> closeCursorIfBatchNotFull(result, 
batchCount, cursorId));
     }
 
     private CompletableFuture<Void> continueReadOnlyIndexScan(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9a0a6f0c35..f34f3dfed5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -97,6 +97,7 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.SwapRowReplicaRequest;
@@ -1354,10 +1355,10 @@ public class InternalTableImpl implements InternalTable 
{
 
         UUID txId = UUID.randomUUID();
 
+        ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
+
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> {
-                    ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
-
                     ReadOnlyScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
                             .readTimestampLong(readTimestamp.longValue())
@@ -1376,7 +1377,8 @@ public class InternalTableImpl implements InternalTable {
                     return replicaSvc.invoke(recipientNode, request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                (unused, fut) -> fut);
+                (intentionallyClose, fut) -> completeScan(txId, partGroupId, 
fut, recipientNode, intentionallyClose)
+        );
     }
 
     @Override
@@ -1431,7 +1433,25 @@ public class InternalTableImpl implements InternalTable {
                         columnsToInclude,
                         implicit
                 ),
-                (commit, fut) -> postEnlist(fut, commit, actualTx, implicit && 
!commit)
+                (intentionallyClose, fut) -> {
+                    CompletableFuture<Void> opFut;
+
+                    if (implicit) {
+                        opFut = fut.thenApply(cursorId -> null);
+                    } else {
+                        var replicationGrpId = new TablePartitionId(tableId, 
partId);
+
+                        opFut = 
tx.enlistedNodeAndConsistencyToken(replicationGrpId) != null ? completeScan(
+                                tx.id(),
+                                replicationGrpId,
+                                fut,
+                                
tx.enlistedNodeAndConsistencyToken(replicationGrpId).get1(),
+                                intentionallyClose
+                        ) : fut.thenApply(cursorId -> null);
+                    }
+
+                    return postEnlist(opFut, intentionallyClose, actualTx, 
implicit && !intentionallyClose);
+                }
         );
     }
 
@@ -1463,10 +1483,10 @@ public class InternalTableImpl implements InternalTable 
{
             int flags,
             @Nullable BitSet columnsToInclude
     ) {
+        ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
+
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> {
-                    ReplicationGroupId partGroupId = 
raftGroupServiceByPartitionId.get(partId).groupId();
-
                     ReadWriteScanRetrieveBatchReplicaRequest request = 
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
                             .groupId(partGroupId)
                             .timestampLong(clock.nowLong())
@@ -1487,7 +1507,40 @@ public class InternalTableImpl implements InternalTable {
                     return replicaSvc.invoke(recipient.node(), request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                (unused, fut) -> fut);
+                (intentionallyClose, fut) -> completeScan(txId, partGroupId, 
fut, recipient.node(), intentionallyClose));
+    }
+
+    /**
+     * Closes the cursor on the server side if the scan was closed intentionally.
+     *
+     * @param txId Transaction id.
+     * @param replicaGrpId Replication group id.
+     * @param scanIdFut Future to scan id.
+     * @param recipientNode Server node where the scan was started.
+     * @param intentionallyClose The flag is true when the scan was 
intentionally closed on the initiator side and false when the
+     *         scan cursor has no more entries to read.
+     * @return Future that completes when the close request has been processed or when no request is required.
+     */
+    private CompletableFuture<Void> completeScan(
+            UUID txId,
+            ReplicationGroupId replicaGrpId,
+            CompletableFuture<Long> scanIdFut,
+            ClusterNode recipientNode,
+            boolean intentionallyClose
+    ) {
+        return scanIdFut.thenCompose(scanId -> {
+            if (intentionallyClose) {
+                ScanCloseReplicaRequest scanCloseReplicaRequest = 
tableMessagesFactory.scanCloseReplicaRequest()
+                        .groupId(replicaGrpId)
+                        .transactionId(txId)
+                        .scanId(scanId)
+                        .build();
+
+                return replicaSvc.invoke(recipientNode, 
scanCloseReplicaRequest);
+            }
+
+            return nullCompletedFuture();
+        });
     }
 
     /**
@@ -1759,7 +1812,7 @@ public class InternalTableImpl implements InternalTable {
         private final BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
 
         /** The closure will be invoked before the cursor closed. */
-        BiFunction<Boolean, CompletableFuture<Void>, CompletableFuture<Void>> 
onClose;
+        BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>> 
onClose;
 
         /** True when the publisher has a subscriber, false otherwise. */
         private final AtomicBoolean subscribed;
@@ -1773,7 +1826,7 @@ public class InternalTableImpl implements InternalTable {
          */
         PartitionScanPublisher(
                 BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
-                BiFunction<Boolean, CompletableFuture<Void>, 
CompletableFuture<Void>> onClose
+                BiFunction<Boolean, CompletableFuture<Long>, 
CompletableFuture<Void>> onClose
         ) {
             this.retrieveBatch = retrieveBatch;
             this.onClose = onClose;
@@ -1864,21 +1917,20 @@ public class InternalTableImpl implements InternalTable 
{
              * After the method is called, a subscriber won't be received 
updates from the publisher.
              *
              * @param t An exception which was thrown when entries were 
retrieving from the cursor.
-             * @param commit {@code True} to commit.
+             * @param intentionallyClose True if the subscription is closed 
from the client side.
+             *         The subscriber is notified through onComplete or onError once closing finishes.
              */
-            private void cancel(Throwable t, boolean commit) {
+            private void cancel(Throwable t, boolean intentionallyClose) {
                 if (!canceled.compareAndSet(false, true)) {
                     return;
                 }
 
-                onClose.apply(commit, t == null ? nullCompletedFuture() : 
failedFuture(t)).handle((ignore, th) -> {
+                onClose.apply(intentionallyClose, t == null ? 
completedFuture(scanId) : failedFuture(t)).whenComplete((ignore, th) -> {
                     if (th != null) {
                         subscriber.onError(th);
                     } else {
                         subscriber.onComplete();
                     }
-
-                    return null;
                 });
             }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e8a7c6b9d9..a94b718741 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1871,10 +1871,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
         return partitionReplicaListener.invoke(
-                TABLE_MESSAGES_FACTORY.readWriteScanCloseReplicaRequest()
+                TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
                         .groupId(grpId)
                         .transactionId(targetTxId)
-                        .enlistmentConsistencyToken(1L)
                         .scanId(1)
                         .build(),
                 localNode.id()


Reply via email to