This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 947103e94e IGNITE-21290 Scan cursors do not close after being fully read in transactions (#3069)
947103e94e is described below
commit 947103e94ed9fca294cf8f088368d63c4a706821
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Feb 5 11:06:34 2024 +0300
IGNITE-21290 Scan cursors do not close after being fully read in transactions (#3069)
---
.../ignite/internal/table/ItTableScanTest.java | 164 +++++++++++++++------
.../table/distributed/TableMessageGroup.java | 6 +-
.../request/ReadWriteScanCloseReplicaRequest.java | 28 ----
.../request/ScanCloseReplicaRequest.java | 8 +-
.../replicator/PartitionReplicaListener.java | 49 ++++--
.../distributed/storage/InternalTableImpl.java | 80 ++++++++--
.../replication/PartitionReplicaListenerTest.java | 3 +-
7 files changed, 234 insertions(+), 104 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 68a0ba44bf..3ca9b399b8 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -59,14 +59,15 @@ import
org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
@@ -98,14 +99,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
/** The only partition in the table. */
private static final int PART_ID = 0;
- private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
- 1,
- new Column[]{new Column("key", NativeTypes.INT32, false)},
- new Column[]{
- new Column("valInt", NativeTypes.INT32, false),
- new Column("valStr", NativeTypes.STRING, false)
- }
- );
+ private SchemaDescriptor schema;
private TableViewInternal table;
@@ -117,14 +111,68 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
internalTable = table.internalTable();
+ schema = table.schemaView().lastKnownSchema();
+
loadData(table);
}
@AfterEach
public void afterTest() {
+ CLUSTER.runningNodes().forEach(this::checkResourcesAreReleased);
+
clearData(table);
}
+ /**
+ * Checks all transaction resources are released (cursors and locks).
+ *
+ * @param ignite Ignite instance.
+ */
+ private void checkResourcesAreReleased(IgniteImpl ignite) {
+ checkCursorsAreClosed(ignite);
+
+ assertTrue(ignite.txManager().lockManager().isEmpty());
+ }
+
+ /**
+ * Checks all transaction cursors are closed.
+ *
+ * @param ignite Ignite instance.
+ */
+ private void checkCursorsAreClosed(IgniteImpl ignite) {
+ int sortedIdxId = getIndexId(ignite, SORTED_IDX);
+
+ var partitionStorage = (TestMvPartitionStorage) ((TableViewInternal)
ignite.tables().table(TABLE_NAME))
+ .internalTable().storage().getMvPartition(PART_ID);
+ var sortedIdxStorage = (TestSortedIndexStorage) ((TableViewInternal)
ignite.tables().table(TABLE_NAME))
+ .internalTable().storage().getIndex(PART_ID, sortedIdxId);
+
+ assertEquals(0, partitionStorage.pendingCursors());
+ assertEquals(0, sortedIdxStorage.pendingCursors());
+ }
+
+ /**
+ * Gets index id by name.
+ *
+ * @param idxName Index name.
+ * @return Index id.
+ */
+ private int getIndexId(IgniteImpl ignite, String idxName) {
+ CatalogManager catalogManager = ignite.catalogManager();
+
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ return catalogManager.indexes(catalogVersion).stream()
+ .filter(index -> {
+ log.info("Scanned idx " + index.name());
+
+ return idxName.equalsIgnoreCase(index.name());
+ })
+ .mapToInt(CatalogObjectDescriptor::id)
+ .findFirst()
+ .getAsInt();
+ }
+
@Test
public void testInsertWaitScanComplete() throws Exception {
IgniteTransactions transactions = igniteTx();
@@ -158,7 +206,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
assertFalse(updateKey2Fut.isDone());
- subscription.request(1_000); // Request so much entries here to close
the publisher.
+ subscription.request(1_000); // Request so many entries here to close
the publisher.
assertThat(scanned, willCompleteSuccessfully());
@@ -167,7 +215,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
assertFalse(insertKey99Fut.isDone());
- log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
+ log.info("Result: " +
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
assertEquals(ROW_IDS.size(), scannedRows.size());
@@ -185,7 +233,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
List<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(0, null,
sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortedIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -201,11 +249,11 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
table.keyValueView().put(null, Tuple.create().set("key", 3),
Tuple.create().set("valInt", 3).set("valStr", "New_3"));
- subscription.request(1_000); // Request so much entries here to close
the publisher.
+ subscription.request(1_000); // Request so many entries here to close
the publisher.
IgniteTestUtils.await(scanned);
- log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
+ log.info("Result: " +
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
assertEquals(ROW_IDS.size() + 1, scannedRows.size());
}
@@ -351,7 +399,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
/**
* The method executes an operation, encapsulated in closure, during a
pure table scan.
*
- * @param txOperationAction An closure to apply during the scan operation.
+ * @param txOperationAction A closure to apply during the scan operation.
* @throws Exception If failed.
*/
public void pureTableScan(Function<InternalTransaction,
CompletableFuture<Integer>> txOperationAction) throws Exception {
@@ -361,7 +409,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
List<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(0, null, null,
null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
null, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -385,11 +433,11 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
assertFalse(scanned.isDone());
assertFalse(txOpFut.isDone());
- subscription.request(1_000); // Request so much entries here to close
the publisher.
+ subscription.request(1_000); // Request so many entries here to close
the publisher.
IgniteTestUtils.await(scanned);
- log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
+ log.info("Result: " +
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
assertThat(txOpFut, willCompleteSuccessfully());
@@ -397,7 +445,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
assertEquals(ROW_IDS.size(), scannedRows.size());
- var pub = internalTable.scan(0, null, null, null, null, 0, null);
+ var pub = internalTable.scan(PART_ID, null, null, null, null, 0, null);
assertEquals(ROW_IDS.size() + txOpFut.get(), scanAllRows(pub).size());
}
@@ -434,11 +482,11 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
kvView.put(null, Tuple.create().set("key", 8),
Tuple.create().set("valInt", 8).set("valStr", "New_8"));
- subscription.request(1_000); // Request so much entries here to close
the publisher.
+ subscription.request(1_000); // Request so many entries here to close
the publisher.
IgniteTestUtils.await(scanned);
- log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
+ log.info("Result: " +
scannedRows.stream().map(this::rowToString).collect(Collectors.joining(", ")));
assertEquals(ROW_IDS.size() + 1, scannedRows.size());
@@ -487,7 +535,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
List<BinaryRow> scannedRows = scanAllRows(publisher);
- log.info("Result of scanning in old transaction: " +
scannedRows.stream().map(ItTableScanTest::rowToString)
+ log.info("Result of scanning in old transaction: " +
scannedRows.stream().map(this::rowToString)
.collect(Collectors.joining(", ")));
assertEquals(3, scannedRows.size());
@@ -536,7 +584,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
assertEquals(5, scannedRows2.size());
- log.info("Result of scanning after insert rows: " +
scannedRows2.stream().map(ItTableScanTest::rowToString)
+ log.info("Result of scanning after insert rows: " +
scannedRows2.stream().map(this::rowToString)
.collect(Collectors.joining(", ")));
}
@@ -605,16 +653,42 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
*
* @param requestAmount1 Number of rows in the first request.
* @param requestAmount2 Number of rows in the second request.
+ * @param readOnly If true, RO transaction is initiated, otherwise, RW
transaction is initiated.
+ * @param implicit If false, an explicit transaction is initiated,
otherwise, an implicit one.
*
* @throws Exception If failed.
*/
@ParameterizedTest
- @CsvSource({"3, 1", "1, 3"})
- public void testCompositeScanRequest(int requestAmount1, int
requestAmount2) throws Exception {
+ @CsvSource({"3, 1, false, false", "1, 3, false, false", "3, 1, true,
false", "1, 3, true, false", "3, 1, false, true",
+ "1, 3, false, true"})
+ public void testCompositeScanRequest(int requestAmount1, int
requestAmount2, boolean readOnly, boolean implicit) throws Exception {
List<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(0, null, null,
null, null, 0, null);
- CompletableFuture<Void> scanned = new CompletableFuture<>();
+ Publisher<BinaryRow> publisher;
+
+ InternalTransaction tx = null;
+
+ if (readOnly) {
+ IgniteImpl ignite = CLUSTER.aliveNode();
+
+ var tablePartId = new TablePartitionId(internalTable.tableId(),
PART_ID);
+
+ ReplicaMeta primaryReplica = IgniteTestUtils.await(
+ ignite.placementDriver().awaitPrimaryReplica(tablePartId,
ignite.clock().now(), 30, TimeUnit.SECONDS));
+
+ ClusterNode recipientNode =
ignite.clusterNodes().stream().filter(node ->
node.name().equals(primaryReplica.getLeaseholder()))
+ .findFirst().get();
+
+ publisher = internalTable.scan(PART_ID, ignite.clock().now(),
recipientNode);
+ } else {
+ if (!implicit) {
+ tx = (InternalTransaction)
CLUSTER.aliveNode().transactions().begin();
+ }
+
+ publisher = internalTable.scan(PART_ID, tx, null, null, null, 0,
null);
+ }
+
+ CompletableFuture<Void> scanned = new CompletableFuture<>();
Subscription subscription = subscribeToPublisher(scannedRows,
publisher, scanned);
subscription.request(requestAmount1);
@@ -626,6 +700,12 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
subscription.cancel();
+ CLUSTER.runningNodes().forEach(this::checkCursorsAreClosed);
+
+ if (tx != null) {
+ tx.rollback();
+ }
+
assertThat(scanned, willCompleteSuccessfully());
}
@@ -689,8 +769,8 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
* @param binaryRow Binary row.
* @return String representation.
*/
- private static String rowToString(BinaryRow binaryRow) {
- Row row = Row.wrapBinaryRow(SCHEMA, binaryRow);
+ private String rowToString(BinaryRow binaryRow) {
+ Row row = Row.wrapBinaryRow(schema, binaryRow);
return IgniteStringFormatter.format("[{}, {}, {}]", row.intValue(0),
row.intValue(1), row.stringValue(2));
}
@@ -708,7 +788,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
Subscription subscription = subscribeToPublisher(scannedRows,
publisher, scanned);
- subscription.request(1_000); // Request so much entries here to close
the publisher.
+ subscription.request(1_000); // Request so many entries here to close
the publisher.
assertTrue(waitForCondition(() -> scanned.isDone(), 10_000));
@@ -749,7 +829,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
* Gets an index id.
*/
private static int getSortedIndexId() {
- CatalogManager catalogManager = ((IgniteImpl)
CLUSTER.aliveNode()).catalogManager();
+ CatalogManager catalogManager = (CLUSTER.aliveNode()).catalogManager();
int catalogVersion = catalogManager.latestCatalogVersion();
@@ -766,7 +846,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
* @return Ignite table.
*/
private static TableViewInternal getOrCreateTable() {
- sql("CREATE ZONE IF NOT EXISTS ZONE1 WITH REPLICAS=1, PARTITIONS=1;");
+ sql("CREATE ZONE IF NOT EXISTS ZONE1 ENGINE " +
TestStorageEngine.ENGINE_NAME + " WITH REPLICAS=1, PARTITIONS=1;");
sql("CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ " (key INTEGER PRIMARY KEY, valInt INTEGER NOT NULL, valStr
VARCHAR NOT NULL) WITH PRIMARY_ZONE='ZONE1';");
@@ -792,14 +872,14 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
* @param id Primary key.
* @return Entire row.
*/
- private static Row createKeyValueRow(int id) {
- RowAssembler rowBuilder = new RowAssembler(SCHEMA);
+ private Row createKeyValueRow(int id) {
+ RowAssembler rowBuilder = new RowAssembler(schema);
rowBuilder.appendInt(id);
rowBuilder.appendInt(id);
rowBuilder.appendString("StrNew_" + id);
- return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
+ return Row.wrapBinaryRow(schema, rowBuilder.build());
}
/**
@@ -808,14 +888,14 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
* @param id Primary key.
* @return Entire row.
*/
- private static Row createOldKeyValueRow(int id) {
- RowAssembler rowBuilder = new RowAssembler(SCHEMA);
+ private Row createOldKeyValueRow(int id) {
+ RowAssembler rowBuilder = new RowAssembler(schema);
rowBuilder.appendInt(id);
rowBuilder.appendInt(id);
rowBuilder.appendString("Str_" + id);
- return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
+ return Row.wrapBinaryRow(schema, rowBuilder.build());
}
/**
@@ -824,12 +904,12 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
* @param id Primary key.
* @return Key row.
*/
- private static Row createKeyRow(int id) {
- RowAssembler rowBuilder = RowAssembler.keyAssembler(SCHEMA);
+ private Row createKeyRow(int id) {
+ RowAssembler rowBuilder = RowAssembler.keyAssembler(schema);
rowBuilder.appendInt(id);
- return Row.wrapKeyOnlyBinaryRow(SCHEMA, rowBuilder.build());
+ return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build());
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 00e1631768..47c4b630ca 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -46,11 +46,11 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
-import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
/**
* Message group for the table module.
@@ -81,9 +81,9 @@ public interface TableMessageGroup {
short RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST = 3;
/**
- * Message type for {@link ReadWriteScanCloseReplicaRequest}.
+ * Message type for {@link ScanCloseReplicaRequest}.
*/
- short RW_SCAN_CLOSE_REPLICA_REQUEST = 4;
+ short SCAN_CLOSE_REPLICA_REQUEST = 4;
/**
* Message type for {@link HasDataRequest}.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java
deleted file mode 100644
index da4b29f688..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanCloseReplicaRequest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.replication.request;
-
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-
-/**
- * Scan retrieve batch replica request.
- */
-@Transferable(TableMessageGroup.RW_SCAN_CLOSE_REPLICA_REQUEST)
-public interface ReadWriteScanCloseReplicaRequest extends
ScanCloseReplicaRequest, ReadWriteReplicaRequest {
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
index 478031503a..13551b2c21 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanCloseReplicaRequest.java
@@ -17,12 +17,18 @@
package org.apache.ignite.internal.table.distributed.replication.request;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
/**
- * Scan retrieve batch replica request.
+ * Scan cursor close request.
*/
+@Transferable(TableMessageGroup.SCAN_CLOSE_REPLICA_REQUEST)
public interface ScanCloseReplicaRequest extends ReplicaRequest {
+ UUID transactionId();
+
/** The id uniquely determines a cursor for the transaction. */
long scanId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3a67be0fab..e663896260 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -148,11 +148,11 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
-import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanCloseReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
@@ -562,7 +562,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
private CompletableFuture<Void> validateTableExistence(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
HybridTimestamp opStartTs;
- if (request instanceof ReadWriteScanCloseReplicaRequest) {
+ if (request instanceof ScanCloseReplicaRequest) {
// We don't need to validate close request for table existence.
opStartTs = null;
} else if (request instanceof ReadWriteReplicaRequest) {
@@ -717,8 +717,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
return rows;
});
- } else if (request instanceof ReadWriteScanCloseReplicaRequest) {
- processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
+ } else if (request instanceof ScanCloseReplicaRequest) {
+ processScanCloseAction((ScanCloseReplicaRequest) request);
return nullCompletedFuture();
} else if (request instanceof TxFinishReplicaRequest) {
@@ -942,7 +942,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return rows;
});
} else {
- return completedFuture(rows);
+ return completedFuture(closeCursorIfBatchNotFull(rows, count,
cursorId));
}
});
}
@@ -1099,8 +1099,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- txCursors.clear();
-
if (ex != null) {
throw ex;
}
@@ -1111,11 +1109,35 @@ public class PartitionReplicaListener implements
ReplicaListener {
*
* @param request Scan close request operation.
*/
- private void processScanCloseAction(ReadWriteScanCloseReplicaRequest
request) {
+ private void processScanCloseAction(ScanCloseReplicaRequest request) {
UUID txId = request.transactionId();
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+ closeCursor(cursorId);
+ }
+
+ /**
+ * Closes a cursor if the batch is not fully retrieved.
+ *
+ * @param batchSize Requested batch size.
+ * @param rows List of retrieved rows.
+ * @param cursorId Cursor id.
+ */
+ private ArrayList<BinaryRow>
closeCursorIfBatchNotFull(ArrayList<BinaryRow> rows, int batchSize, IgniteUuid
cursorId) {
+ if (rows.size() < batchSize) {
+ closeCursor(cursorId);
+ }
+
+ return rows;
+ }
+
+ /**
+ * Closes a specific cursor.
+ *
+ * @param cursorId Cursor id.
+ */
+ private void closeCursor(IgniteUuid cursorId) {
Cursor<?> cursor = cursors.remove(cursorId);
if (cursor != null) {
@@ -1123,8 +1145,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
cursor.close();
} catch (Exception e) {
throw new ReplicationException(Replicator.REPLICA_COMMON_ERR,
- format("Close cursor exception [replicaGrpId={},
msg={}]", replicationGroupId,
- e.getMessage()), e);
+ format("Close cursor exception [replicaGrpId={},
msg={}]", replicationGroupId, e.getMessage()), e);
}
}
}
@@ -1194,7 +1215,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new
IndexRowImpl(key, rowId));
return continueReadOnlyIndexScan(schemaAwareIndexStorage,
indexRowCursor, timestamp, batchCount, result)
- .thenCompose(ignore -> completedFuture(result));
+ .thenApply(ignore -> closeCursorIfBatchNotFull(result,
batchCount, cursorId));
}
private CompletableFuture<List<BinaryRow>> lookupIndex(
@@ -1221,7 +1242,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
var result = new
ArrayList<BinaryRow>(batchCount);
return continueIndexLookup(txId, cursor,
batchCount, result)
- .thenApply(ignore -> result);
+ .thenApply(ignore ->
closeCursorIfBatchNotFull(result, batchCount, cursorId));
});
});
});
@@ -1295,7 +1316,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return continueIndexScan(txId,
schemaAwareIndexStorage, indexLocker, cursor, batchCount, result,
isUpperBoundAchieved)
- .thenApply(ignore -> result);
+ .thenApply(ignore ->
closeCursorIfBatchNotFull(result, batchCount, cursorId));
});
});
}
@@ -1337,7 +1358,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
var result = new ArrayList<BinaryRow>(batchCount);
return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor,
timestamp, batchCount, result)
- .thenApply(ignore -> result);
+ .thenApply(ignore -> closeCursorIfBatchNotFull(result,
batchCount, cursorId));
}
private CompletableFuture<Void> continueReadOnlyIndexScan(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 9a0a6f0c35..f34f3dfed5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -97,6 +97,7 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ScanCloseReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.SwapRowReplicaRequest;
@@ -1354,10 +1355,10 @@ public class InternalTableImpl implements InternalTable
{
UUID txId = UUID.randomUUID();
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
+
return new PartitionScanPublisher(
(scanId, batchSize) -> {
- ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
-
ReadOnlyScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
.readTimestampLong(readTimestamp.longValue())
@@ -1376,7 +1377,8 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode, request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- (unused, fut) -> fut);
+ (intentionallyClose, fut) -> completeScan(txId, partGroupId,
fut, recipientNode, intentionallyClose)
+ );
}
@Override
@@ -1431,7 +1433,25 @@ public class InternalTableImpl implements InternalTable {
columnsToInclude,
implicit
),
- (commit, fut) -> postEnlist(fut, commit, actualTx, implicit &&
!commit)
+ (intentionallyClose, fut) -> {
+ CompletableFuture<Void> opFut;
+
+ if (implicit) {
+ opFut = fut.thenApply(cursorId -> null);
+ } else {
+ var replicationGrpId = new TablePartitionId(tableId,
partId);
+
+ opFut =
tx.enlistedNodeAndConsistencyToken(replicationGrpId) != null ? completeScan(
+ tx.id(),
+ replicationGrpId,
+ fut,
+
tx.enlistedNodeAndConsistencyToken(replicationGrpId).get1(),
+ intentionallyClose
+ ) : fut.thenApply(cursorId -> null);
+ }
+
+ return postEnlist(opFut, intentionallyClose, actualTx,
implicit && !intentionallyClose);
+ }
);
}
@@ -1463,10 +1483,10 @@ public class InternalTableImpl implements InternalTable
{
int flags,
@Nullable BitSet columnsToInclude
) {
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
+
return new PartitionScanPublisher(
(scanId, batchSize) -> {
- ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
-
ReadWriteScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
.timestampLong(clock.nowLong())
@@ -1487,7 +1507,40 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipient.node(), request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- (unused, fut) -> fut);
+ (intentionallyClose, fut) -> completeScan(txId, partGroupId,
fut, recipient.node(), intentionallyClose));
+ }
+
+ /**
+ * Closes the cursor on server side.
+ *
+ * @param txId Transaction id.
+ * @param replicaGrpId Replication group id.
+ * @param scanIdFut Future to scan id.
+ * @param recipientNode Server node where the scan was started.
+ * @param intentionallyClose The flag is true when the scan was
intentionally closed on the initiator side and false when the
+ * scan cursor has no more entries to read.
+ * @return The future.
+ */
+ private CompletableFuture<Void> completeScan(
+ UUID txId,
+ ReplicationGroupId replicaGrpId,
+ CompletableFuture<Long> scanIdFut,
+ ClusterNode recipientNode,
+ boolean intentionallyClose
+ ) {
+ return scanIdFut.thenCompose(scanId -> {
+ if (intentionallyClose) {
+ ScanCloseReplicaRequest scanCloseReplicaRequest =
tableMessagesFactory.scanCloseReplicaRequest()
+ .groupId(replicaGrpId)
+ .transactionId(txId)
+ .scanId(scanId)
+ .build();
+
+ return replicaSvc.invoke(recipientNode,
scanCloseReplicaRequest);
+ }
+
+ return nullCompletedFuture();
+ });
}
/**
@@ -1759,7 +1812,7 @@ public class InternalTableImpl implements InternalTable {
private final BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
/** The closure will be invoked before the cursor closed. */
- BiFunction<Boolean, CompletableFuture<Void>, CompletableFuture<Void>>
onClose;
+ BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>>
onClose;
/** True when the publisher has a subscriber, false otherwise. */
private final AtomicBoolean subscribed;
@@ -1773,7 +1826,7 @@ public class InternalTableImpl implements InternalTable {
*/
PartitionScanPublisher(
BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
- BiFunction<Boolean, CompletableFuture<Void>,
CompletableFuture<Void>> onClose
+ BiFunction<Boolean, CompletableFuture<Long>,
CompletableFuture<Void>> onClose
) {
this.retrieveBatch = retrieveBatch;
this.onClose = onClose;
@@ -1864,21 +1917,20 @@ public class InternalTableImpl implements InternalTable
{
* After the method is called, a subscriber won't be received
updates from the publisher.
*
* @param t An exception which was thrown when entries were
retrieving from the cursor.
- * @param commit {@code True} to commit.
+ * @param intentionallyClose True if the subscription is closed
for the client side.
+ * @return Future to complete.
*/
- private void cancel(Throwable t, boolean commit) {
+ private void cancel(Throwable t, boolean intentionallyClose) {
if (!canceled.compareAndSet(false, true)) {
return;
}
- onClose.apply(commit, t == null ? nullCompletedFuture() :
failedFuture(t)).handle((ignore, th) -> {
+ onClose.apply(intentionallyClose, t == null ?
completedFuture(scanId) : failedFuture(t)).whenComplete((ignore, th) -> {
if (th != null) {
subscriber.onError(th);
} else {
subscriber.onComplete();
}
-
- return null;
});
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e8a7c6b9d9..a94b718741 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1871,10 +1871,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
return partitionReplicaListener.invoke(
- TABLE_MESSAGES_FACTORY.readWriteScanCloseReplicaRequest()
+ TABLE_MESSAGES_FACTORY.scanCloseReplicaRequest()
.groupId(grpId)
.transactionId(targetTxId)
- .enlistmentConsistencyToken(1L)
.scanId(1)
.build(),
localNode.id()