This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bb2e4a8a05 IGNITE-23748 Lock LWM when executing RO operation on data
node (#4974)
bb2e4a8a05 is described below
commit bb2e4a8a0574442dff5baa497fb3e4fd49ff81dc
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Dec 30 11:47:08 2024 +0400
IGNITE-23748 Lock LWM when executing RO operation on data node (#4974)
* When executing an operation in an RO transaction (explicit or implicit),
attempt to lock the low watermark (LWM) on the data node where it's executed
* Direct RO operations (which happen in implicit transactions) only lock
LWM if they concern more than 1 key
* If the lock attempt fails, throw an exception with a specific error code
* When cleaning up after an RO transaction has finished, unlock LWM on
each node where such a cleanup happens
* For direct RO operations, unlock LWM right after the read has been done
on the data node
* When locking LWM for an explicit transaction, also register a hook to
unlock LWM when the transaction coordinator leaves
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 6 +
.../ignite/client/fakes/FakeInternalTable.java | 4 +
.../replication/ReadOnlyReplicaRequest.java | 12 +
.../ReadOnlyScanRetrieveBatchReplicaRequest.java | 9 +-
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../ignite/internal/benchmark/SelectBenchmark.java | 26 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../org/apache/ignite/internal/TestWrappers.java | 10 +
.../internal/sql/sqllogic/ItSqlLogicTest.java | 7 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 3 +-
.../table/ItTransactionPrimaryChangeTest.java | 9 +-
.../internal/table/ItTransactionRecoveryTest.java | 8 +-
.../ignite/internal/table/InternalTable.java | 45 ++-
.../internal/table/distributed/TableManager.java | 3 +-
.../replicator/PartitionReplicaListener.java | 133 ++++++++-
.../distributed/storage/InternalTableImpl.java | 12 +-
.../PartitionReplicaListenerIndexLockingTest.java | 4 +-
...itionReplicaListenerSortedIndexLockingTest.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 241 +++++++++++++---
.../storage/InternalTableEstimatedSizeTest.java | 4 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 9 +-
.../table/impl/DummyInternalTableImpl.java | 3 +-
modules/transactions/build.gradle | 4 +
.../readonly/ItReadOnlyTxAndLowWatermarkTest.java | 307 +++++++++++++++++++++
.../FinishedTransactionBatchRequestHandler.java | 17 +-
.../tx/impl/RemotelyTriggeredResourceRegistry.java | 6 +-
.../internal/tx/impl/ResourceVacuumManager.java | 13 +-
...FinishedTransactionBatchRequestHandlerTest.java | 92 ++++++
31 files changed, 911 insertions(+), 91 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 064e875447..91fa9cf2bb 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -358,6 +358,12 @@ public class ErrorGroups {
/** Failure due to a stale operation of a completed transaction is
detected. */
public static final int TX_STALE_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode((short) 14);
+
+ /**
+ * Error occurred when trying to execute an operation in a read-only
transaction on a node that has already destroyed data for
+ * read timestamp of the transaction.
+ */
+ public static final int TX_STALE_READ_ONLY_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode((short) 15);
}
/** Replicator error group. */
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 5d35fd6178..c0992d2624 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -147,6 +147,8 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
public CompletableFuture<BinaryRow> get(
BinaryRowEx keyRow,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode) {
return null;
}
@@ -179,6 +181,8 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
public CompletableFuture<List<BinaryRow>> getAll(
Collection<BinaryRowEx> keyRows,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode
) {
return null;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
index 1acdbba89e..3e60ac9ac0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java
@@ -17,12 +17,24 @@
package org.apache.ignite.internal.partition.replicator.network.replication;
+import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.jetbrains.annotations.Nullable;
/**
* Read only replica request.
*/
public interface ReadOnlyReplicaRequest extends ReplicaRequest {
HybridTimestamp readTimestamp();
+
+ /** ID of the transaction in which this request is made. */
+ // TODO: remove @Nullable after IGNITE-24120 is sorted out.
+ @Nullable
+ UUID transactionId();
+
+ /** Inconsistent ID of coordinator of transaction to which this operation
belongs. */
+ // TODO: remove @Nullable after IGNITE-24120 is sorted out.
+ @Nullable
+ UUID coordinatorId();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
index 48773454e7..4ccf80b0a3 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyScanRetrieveBatchReplicaRequest.java
@@ -27,12 +27,11 @@ import
org.apache.ignite.internal.replicator.message.TableAware;
*/
@Transferable(PartitionReplicationMessageGroup.RO_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
public interface ReadOnlyScanRetrieveBatchReplicaRequest extends
ScanRetrieveBatchReplicaRequest, ReadOnlyReplicaRequest, TableAware {
+ // TODO: remove override after @Nullable is removed from the
super-interface method, see IGNITE-24120.
+ @Override
UUID transactionId();
- /**
- * Get the transaction coordinator inconsistent ID.
- *
- * @return Transaction coordinator inconsistent ID.
- */
+ // TODO: remove override after @Nullable is removed from the
super-interface method, see IGNITE-24120.
+ @Override
UUID coordinatorId();
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 438074e10c..7a3666080a 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -130,6 +130,7 @@ enum class code : underlying_t {
TX_PRIMARY_REPLICA_EXPIRED = 0x7000c,
TX_ALREADY_FINISHED = 0x7000d,
TX_STALE_OPERATION = 0x7000e,
+ TX_STALE_READ_ONLY_OPERATION = 0x7000f,
// Replicator group. Group code: 8
REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index a57bf7101d..b3c480b373 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -202,6 +202,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::TX_PRIMARY_REPLICA_EXPIRED:
case error::code::TX_ALREADY_FINISHED:
case error::code::TX_STALE_OPERATION:
+ case error::code::TX_STALE_READ_ONLY_OPERATION:
return sql_state::S25000_INVALID_TRANSACTION_STATE;
// Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 7b5c75d863..9e7563c6c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -291,6 +291,9 @@ namespace Apache.Ignite
/// <summary> TxStaleOperation error. </summary>
public const int TxStaleOperation = (GroupCode << 16) | (14 &
0xFFFF);
+
+ /// <summary> TxStaleReadOnlyOperation error. </summary>
+ public const int TxStaleReadOnlyOperation = (GroupCode << 16) |
(15 & 0xFFFF);
}
/// <summary> Replicator errors. </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index 52de6ec493..67b4253d3f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -43,6 +43,10 @@ import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -81,6 +85,10 @@ public class SelectBenchmark extends
AbstractMultiNodeBenchmark {
private KeyValueView<Tuple, Tuple> keyValueView;
+ private IgniteTransactions transactions;
+
+ private final TransactionOptions readOnlyTransactionOptions = new
TransactionOptions().readOnly(true);
+
@Param({"1", "2", "3"})
private int clusterSize;
@@ -101,6 +109,8 @@ public class SelectBenchmark extends
AbstractMultiNodeBenchmark {
keyValueView.put(null, Tuple.create().set("ycsb_key", id++), t);
}
+
+ transactions = publicIgnite.transactions();
}
/**
@@ -172,7 +182,21 @@ public class SelectBenchmark extends
AbstractMultiNodeBenchmark {
*/
@Benchmark
public void kvGet(Blackhole bh) {
- Tuple val = keyValueView.get(null, Tuple.create().set("ycsb_key",
random.nextInt(TABLE_SIZE)));
+ doKvGet(null, bh);
+ }
+
+ /**
+ * Benchmark for KV get in RO transaction via embedded client.
+ */
+ @Benchmark
+ public void kvGetInRoTransaction(Blackhole bh) {
+ transactions.runInTransaction(tx -> {
+ doKvGet(tx, bh);
+ }, readOnlyTransactionOptions);
+ }
+
+ private void doKvGet(@Nullable Transaction tx, Blackhole bh) {
+ Tuple val = keyValueView.get(tx, Tuple.create().set("ycsb_key",
random.nextInt(TABLE_SIZE)));
bh.consume(val);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index da4f3d518a..9a221c5551 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -638,7 +638,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
clusterSvc.topologyService(),
clusterSvc.messagingService(),
transactionInflights,
- txManager
+ txManager,
+ lowWatermark
);
var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 79486d742d..3b560667e3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -994,7 +994,8 @@ public class IgniteImpl implements Ignite {
clusterSvc.topologyService(),
messagingServiceReturningToStorageOperationsPool,
transactionInflights,
- txManager
+ txManager,
+ lowWatermark
);
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
index 744adf056f..c9fb0379b0 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/TestWrappers.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.table.IgniteTables;
@@ -110,4 +111,13 @@ public class TestWrappers {
public static Transaction unwrapIgniteTransaction(Transaction tx) {
return Wrappers.unwrap(tx, Transaction.class);
}
+
+ /**
+ * Unwraps an {@link InternalTransaction} from an {@link Transaction}.
+ *
+ * @param tx Object to unwrap.
+ */
+ public static InternalTransaction unwrapInternalTransaction(Transaction
tx) {
+ return Wrappers.unwrap(tx, InternalTransaction.class);
+ }
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
index 0aa757cb7b..bee186f5e3 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/sqllogic/ItSqlLogicTest.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.IgniteSql;
@@ -142,6 +143,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
@Tag("sqllogic")
@ExtendWith({SystemPropertiesExtension.class, WorkDirectoryExtension.class})
@WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
+// The following is to make sure we unlock LWM on data nodes promptly so that
dropped tables are destroyed fast.
+@WithSystemProperty(key =
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value =
"1000")
@SqlLogicTestEnvironment(scriptsRoot = "src/integrationTest/sql/group1")
public class ItSqlLogicTest extends BaseIgniteAbstractTest {
private static final String SQL_LOGIC_TEST_INCLUDE_SLOW =
"SQL_LOGIC_TEST_INCLUDE_SLOW";
@@ -348,8 +351,8 @@ public class ItSqlLogicTest extends BaseIgniteAbstractTest {
.clusterName("cluster")
.clusterConfiguration("ignite {"
+ "metaStorage.idleSyncTimeInterval: " +
METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS + ",\n"
- + "gc.lowWatermark.dataAvailabilityTime: 1010,\n"
- + "gc.lowWatermark.updateInterval: 3000,\n"
+ + "gc.lowWatermark.dataAvailabilityTime: 5000,\n"
+ + "gc.lowWatermark.updateInterval: 1000,\n"
+ "metrics.exporters.logPush.exporterName: logPush,\n"
+ "metrics.exporters.logPush.period: 5000\n"
+ "}")
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 0266b7fde4..bcce6ff3e1 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -197,7 +197,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
clusterNodeResolver,
resourcesRegistry,
schemaRegistry,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ lowWatermark
) {
@Override
public CompletableFuture<ReplicaResult>
invoke(ReplicaRequest request, UUID senderId) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index fededae649..5951a4a2b1 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.table;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
+import static
org.apache.ignite.internal.TestWrappers.unwrapInternalTransaction;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -125,7 +124,7 @@ public class ItTransactionPrimaryChangeTest extends
ClusterPerTestIntegrationTes
// Put some value into the table.
Transaction txPreload = txCrdNode.transactions().begin();
- log.info("Test: Preloading the data [tx={}].",
((ReadWriteTransactionImpl) unwrapIgniteTransaction(txPreload)).id());
+ log.info("Test: Preloading the data [tx={}].",
unwrapInternalTransaction(txPreload).id());
view.upsert(txPreload, Tuple.create().set("key", 1).set("val", "1"));
txPreload.commit();
@@ -171,7 +170,7 @@ public class ItTransactionPrimaryChangeTest extends
ClusterPerTestIntegrationTes
// Start a regular transaction that increments the value. It
should see the initially inserted value and its commit should
// succeed.
Transaction tx = txCrdNode.transactions().begin();
- log.info("Test: Started the regular transaction [txId={}].",
((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());
+ log.info("Test: Started the regular transaction [txId={}].",
unwrapInternalTransaction(tx).id());
Tuple t = view.get(tx, Tuple.create().set("key", 1));
assertEquals("1", t.value(1));
@@ -179,7 +178,7 @@ public class ItTransactionPrimaryChangeTest extends
ClusterPerTestIntegrationTes
tx.commit();
- log.info("Test: Completed the regular transaction [txId={}].",
((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());
+ log.info("Test: Completed the regular transaction [txId={}].",
unwrapInternalTransaction(tx).id());
} finally {
regularTxComplete.complete(null);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index b25124576f..37e819f23e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -1063,7 +1063,9 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
assertFalse(scanned.isDone());
- assertEquals(initialCursorsCount + 1,
targetNode.resourcesRegistry().resources().size());
+ // One for the cursor; for RO, there is also the transaction LWM lock.
+ int delta = tx.isReadOnly() ? 2 : 1;
+ assertEquals(initialCursorsCount + delta,
targetNode.resourcesRegistry().resources().size());
}
@Test
@@ -1110,8 +1112,8 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
scanSingleEntryAndLeaveCursorOpen(txExecNode,
unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), roTx);
- // After the RO scan there should be one open cursor.
- assertEquals(1, txExecNode.resourcesRegistry().resources().size());
+ // After the RO scan there should be one open cursor plus transaction
LWM lock resource.
+ assertEquals(2, txExecNode.resourcesRegistry().resources().size());
roTx.commit();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 835860e2a5..600b3edcd0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -92,6 +92,7 @@ public interface InternalTable extends ManuallyCloseable {
*/
CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable
InternalTransaction tx);
+ // TODO: remove get() methods which do not accept InternalTransaction, see
IGNITE-24120.
/**
* Asynchronously gets a row with same key columns values as given one
from the table on a specific node for the proposed readTimestamp.
*
@@ -100,9 +101,29 @@ public interface InternalTable extends ManuallyCloseable {
* @param recipientNode Cluster node that will handle given get request.
* @return Future representing pending completion of the operation.
*/
+ default CompletableFuture<BinaryRow> get(
+ BinaryRowEx keyRow,
+ HybridTimestamp readTimestamp,
+ ClusterNode recipientNode
+ ) {
+ return get(keyRow, readTimestamp, null, null, recipientNode);
+ }
+
+ /**
+ * Asynchronously gets a row with same key columns values as given one
from the table on a specific node for the proposed readTimestamp.
+ *
+ * @param keyRow Row with key columns set.
+ * @param readTimestamp Read timestamp.
+ * @param transactionId Transaction ID (might be {@code null}).
+ * @param coordinatorId Ephemeral ID of the transaction coordinator.
+ * @param recipientNode Cluster node that will handle given get request.
+ * @return Future representing pending completion of the operation.
+ */
CompletableFuture<BinaryRow> get(
BinaryRowEx keyRow,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode
);
@@ -117,6 +138,7 @@ public interface InternalTable extends ManuallyCloseable {
*/
CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx);
+ // TODO: remove getAll() methods which do not accept InternalTransaction,
see IGNITE-24120.
/**
* Asynchronously get rows from the table for the proposed read timestamp.
*
@@ -127,13 +149,34 @@ public interface InternalTable extends ManuallyCloseable {
* guaranteed to be the same as the order of {@code keyRows}. If a
record does not exist, the
* element at the corresponding index of the resulting collection is
{@code null}.
*/
+ default CompletableFuture<List<BinaryRow>> getAll(
+ Collection<BinaryRowEx> keyRows,
+ HybridTimestamp readTimestamp,
+ ClusterNode recipientNode
+ ) {
+ return getAll(keyRows, readTimestamp, null, null, recipientNode);
+ }
+
+ /**
+ * Asynchronously get rows from the table for the proposed read timestamp.
+ *
+ * @param keyRows Rows with key columns set.
+ * @param readTimestamp Read timestamp.
+ * @param transactionId Transaction ID (might be {@code null}).
+ * @param coordinatorId Ephemeral ID of the transaction coordinator.
+ * @param recipientNode Cluster node that will handle given get request.
+ * @return Future that will return rows with all columns filled from the
table. The order of collection elements is
+ * guaranteed to be the same as the order of {@code keyRows}. If a
record does not exist, the
+ * element at the corresponding index of the resulting collection is
{@code null}.
+ */
CompletableFuture<List<BinaryRow>> getAll(
Collection<BinaryRowEx> keyRows,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode
);
-
/**
* Asynchronously inserts a row into the table if does not exist or
replaces the existed one.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b5762c6786..2bae27d71c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1364,7 +1364,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
topologyService,
remotelyTriggeredResourceRegistry,
schemaManager.schemaRegistry(tableId),
- indexMetaStorage
+ indexMetaStorage,
+ lowWatermark
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index e63c9f5070..ddd6d9370d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -103,6 +103,7 @@ import
org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
@@ -221,6 +222,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
@@ -344,6 +346,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final IndexMetaStorage indexMetaStorage;
+ private final LowWatermark lowWatermark;
+
private static final boolean SKIP_UPDATES =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
@@ -395,7 +399,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
ClusterNodeResolver clusterNodeResolver,
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
SchemaRegistry schemaRegistry,
- IndexMetaStorage indexMetaStorage
+ IndexMetaStorage indexMetaStorage,
+ LowWatermark lowWatermark
) {
this.mvDataStorage = mvDataStorage;
this.raftCommandRunner = raftCommandRunner;
@@ -418,6 +423,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
this.schemaRegistry = schemaRegistry;
this.indexMetaStorage = indexMetaStorage;
+ this.lowWatermark = lowWatermark;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -534,7 +540,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Don't need to validate schema.
if (opTs == null) {
assert opTsIfDirectRo == null;
- return processOperationRequestWithTxRwCounter(senderId, request,
isPrimary, null, leaseStartTime);
+ return
processOperationRequestWithTxOperationManagementLogic(senderId, request,
isPrimary, null, leaseStartTime);
}
assert txTs != null && opTs.compareTo(txTs) >= 0 : "Invalid request
timestamps";
@@ -555,7 +561,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
};
return
schemaSyncService.waitForMetadataCompleteness(opTs).thenRun(validateClo).thenCompose(ignored
->
- processOperationRequestWithTxRwCounter(senderId, request,
isPrimary, opTsIfDirectRo, leaseStartTime));
+
processOperationRequestWithTxOperationManagementLogic(senderId, request,
isPrimary, opTsIfDirectRo, leaseStartTime));
}
private CompletableFuture<Long> processGetEstimatedSizeRequest() {
@@ -2196,9 +2202,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
}))
);
}
- } catch (Exception e) {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- format("Unable to close cursor [tableId={}]", tableId()),
e);
}
}
@@ -3966,13 +3969,40 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- private CompletableFuture<?> processOperationRequestWithTxRwCounter(
+ private CompletableFuture<?>
processOperationRequestWithTxOperationManagementLogic(
UUID senderId,
ReplicaRequest request,
@Nullable Boolean isPrimary,
@Nullable HybridTimestamp opStartTsIfDirectRo,
@Nullable Long leaseStartTime
) {
+ incrementRwOperationCountIfNeeded(request);
+
+ UUID txIdLockingLwm = tryToLockLwmIfNeeded(request,
opStartTsIfDirectRo);
+
+ try {
+ return processOperationRequest(senderId, request, isPrimary,
opStartTsIfDirectRo, leaseStartTime)
+ .whenComplete((unused, throwable) -> {
+ unlockLwmIfNeeded(txIdLockingLwm, request);
+ decrementRwOperationCountIfNeeded(request);
+ });
+ } catch (Throwable e) {
+ try {
+ unlockLwmIfNeeded(txIdLockingLwm, request);
+ } catch (Throwable unlockProblem) {
+ e.addSuppressed(unlockProblem);
+ }
+
+ try {
+ decrementRwOperationCountIfNeeded(request);
+ } catch (Throwable decrementProblem) {
+ e.addSuppressed(decrementProblem);
+ }
+ throw e;
+ }
+ }
+
+ private void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
if (request instanceof ReadWriteReplicaRequest) {
int rwTxActiveCatalogVersion =
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
@@ -3982,15 +4012,88 @@ public class PartitionReplicaListener implements
ReplicaListener {
throw new
StaleTransactionOperationException(((ReadWriteReplicaRequest)
request).transactionId());
}
}
+ }
- return processOperationRequest(senderId, request, isPrimary,
opStartTsIfDirectRo, leaseStartTime)
- .whenComplete((unused, throwable) -> {
- if (request instanceof ReadWriteReplicaRequest) {
- txRwOperationTracker.decrementOperationCount(
- rwTxActiveCatalogVersion(catalogService,
(ReadWriteReplicaRequest) request)
- );
- }
- });
+ private void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
+ if (request instanceof ReadWriteReplicaRequest) {
+ txRwOperationTracker.decrementOperationCount(
+ rwTxActiveCatalogVersion(catalogService,
(ReadWriteReplicaRequest) request)
+ );
+ }
+ }
+
+ /**
+ * Generates a fake transaction ID that will only be used to identify one
direct RO operation for purposes of locking and unlocking LWM.
+ * It should not be used as a replacement for a real transaction ID in
other contexts.
+ */
+ private static UUID newFakeTxId() {
+ return UUID.randomUUID();
+ }
+
+ /**
+ * For an operation of an RO transaction, attempts to lock LWM on current
node (either if the operation is not direct,
+ * or if it's direct and concerns more than one key), and does nothing for
other types of requests.
+ *
+ * <p>If lock attempt fails, throws an exception with a specific error
code ({@link Transactions#TX_STALE_READ_ONLY_OPERATION_ERR}).
+ *
+ * <p>For explicit RO transactions, the lock will be released later, when
cleaning up after the RO transaction has finished.
+ *
+ * <p>For direct RO operations (which happen in implicit RO transactions),
LWM will be unlocked right after the read has been done
+ * (see {@link #unlockLwmIfNeeded(UUID, ReplicaRequest)}).
+ *
+ * <p>Also, for explicit RO transactions, an automatic unlock is
registered on coordinator leave.
+ *
+ * @param request Request that is being handled.
+ * @param opStartTsIfDirectRo Timestamp of operation start if the
operation is a direct RO operation, {@code null} otherwise.
+ * @return Transaction ID (real for explicit transaction, fake for direct
RO operation) that should be used to lock LWM,
+ * or {@code null} if LWM doesn't need to be locked.
+ */
+ private @Nullable UUID tryToLockLwmIfNeeded(ReplicaRequest request,
@Nullable HybridTimestamp opStartTsIfDirectRo) {
+ UUID txIdToLockLwm;
+ HybridTimestamp tsToLockLwm = null;
+
+ if (request instanceof ReadOnlyDirectMultiRowReplicaRequest
+ && ((ReadOnlyDirectMultiRowReplicaRequest)
request).primaryKeys().size() > 1) {
+ assert opStartTsIfDirectRo != null;
+
+ txIdToLockLwm = newFakeTxId();
+ tsToLockLwm = opStartTsIfDirectRo;
+ } else if (request instanceof ReadOnlyReplicaRequest) {
+ ReadOnlyReplicaRequest readOnlyRequest = (ReadOnlyReplicaRequest)
request;
+ txIdToLockLwm = readOnlyRequest.transactionId();
+ tsToLockLwm = readOnlyRequest.readTimestamp();
+ } else {
+ txIdToLockLwm = null;
+ }
+
+ if (txIdToLockLwm != null) {
+ if (!lowWatermark.tryLock(txIdToLockLwm, tsToLockLwm)) {
+ throw new
TransactionException(Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read
timestamp is not available anymore.");
+ }
+
+ registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(request,
txIdToLockLwm);
+ }
+
+ return txIdToLockLwm;
+ }
+
+ private void
registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest request, UUID
txIdToLockLwm) {
+ if (request instanceof ReadOnlyReplicaRequest) {
+ ReadOnlyReplicaRequest readOnlyReplicaRequest =
(ReadOnlyReplicaRequest) request;
+
+ UUID coordinatorId = readOnlyReplicaRequest.coordinatorId();
+ // TODO: remove null check after IGNITE-24120 is sorted out.
+ if (coordinatorId != null) {
+ FullyQualifiedResourceId resourceId = new
FullyQualifiedResourceId(txIdToLockLwm, txIdToLockLwm);
+ remotelyTriggeredResourceRegistry.register(resourceId,
coordinatorId, () -> () -> lowWatermark.unlock(txIdToLockLwm));
+ }
+ }
+ }
+
+ private void unlockLwmIfNeeded(@Nullable UUID txIdToUnlockLwm,
ReplicaRequest request) {
+ if (txIdToUnlockLwm != null && request instanceof
ReadOnlyDirectReplicaRequest) {
+ lowWatermark.unlock(txIdToUnlockLwm);
+ }
}
private void prepareIndexBuilderTxRwOperationTracker() {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 7f0b985cbc..be91eb7f07 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -888,7 +888,7 @@ public class InternalTableImpl implements InternalTable {
if (tx.isReadOnly()) {
return evaluateReadOnlyRecipientNode(partitionId(keyRow),
tx.readTimestamp())
- .thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), recipientNode));
+ .thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
}
return enlistInTx(
@@ -915,6 +915,8 @@ public class InternalTableImpl implements InternalTable {
public CompletableFuture<BinaryRow> get(
BinaryRowEx keyRow,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode
) {
int partId = partitionId(keyRow);
@@ -927,6 +929,8 @@ public class InternalTableImpl implements InternalTable {
.primaryKey(keyRow.tupleSlice())
.requestType(RO_GET)
.readTimestamp(readTimestamp)
+ .transactionId(transactionId)
+ .coordinatorId(coordinatorId)
.build()
);
}
@@ -981,7 +985,7 @@ public class InternalTableImpl implements InternalTable {
BinaryRowEx firstRow = keyRows.iterator().next();
return evaluateReadOnlyRecipientNode(partitionId(firstRow),
tx.readTimestamp())
- .thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), recipientNode));
+ .thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), tx.id(), tx.coordinatorId(), recipientNode));
}
return enlistInTx(
@@ -999,6 +1003,8 @@ public class InternalTableImpl implements InternalTable {
public CompletableFuture<List<BinaryRow>> getAll(
Collection<BinaryRowEx> keyRows,
HybridTimestamp readTimestamp,
+ @Nullable UUID transactionId,
+ @Nullable UUID coordinatorId,
ClusterNode recipientNode
) {
Int2ObjectMap<RowBatch> rowBatchByPartitionId =
toRowBatchByPartitionId(keyRows);
@@ -1013,6 +1019,8 @@ public class InternalTableImpl implements InternalTable {
.primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows))
.requestType(RO_GET_ALL)
.readTimestamp(readTimestamp)
+ .transactionId(transactionId)
+ .coordinatorId(coordinatorId)
.build();
partitionRowBatch.getValue().resultFuture =
replicaSvc.invoke(recipientNode, request);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 0fa46d6cda..e55485ad01 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
@@ -278,7 +279,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
mock(ClusterNodeResolver.class),
new RemotelyTriggeredResourceRegistry(),
schemaManager,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ new TestLowWatermark()
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
index df27d9ab5e..501f6d23de 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerSortedIndexLockingTest.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
@@ -245,7 +246,8 @@ public class PartitionReplicaListenerSortedIndexLockingTest
extends IgniteAbstra
mock(ClusterNodeResolver.class),
new RemotelyTriggeredResourceRegistry(),
schemaManager,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ new TestLowWatermark()
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 9293aafa69..d5b09dec5a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -78,8 +78,12 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -120,6 +124,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.MessagingService;
@@ -139,6 +144,8 @@ import
org.apache.ignite.internal.partition.replicator.network.replication.Build
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowPkReplicaRequest;
@@ -155,7 +162,9 @@ import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -172,6 +181,7 @@ import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.TestStorageUtils;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -244,6 +254,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junitpioneer.jupiter.cartesian.ArgumentSets;
@@ -251,6 +262,7 @@ import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
@@ -388,6 +400,9 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Mock
private MessagingService messagingService;
+ @Mock
+ private LowWatermark lowWatermark;
+
@InjectConfiguration
private StorageUpdateConfiguration storageUpdateConfiguration;
@@ -425,6 +440,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
/** Partition replication listener to test. */
private PartitionReplicaListener partitionReplicaListener;
+ private HashIndexStorage pkIndexStorage;
+
/** Primary index. */
private Lazy<TableSchemaAwareIndexStorage> pkStorageSupplier;
@@ -513,14 +530,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
ColumnsExtractor row2Tuple =
BinaryRowConverter.keyExtractor(schemaDescriptor);
- pkStorageSupplier = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
- pkIndexId,
- new TestHashIndexStorage(
- PART_ID,
- new StorageHashIndexDescriptor(pkIndexId, List.of(),
false)
- ),
- row2Tuple
+ pkIndexStorage = spy(new TestHashIndexStorage(
+ PART_ID,
+ new StorageHashIndexDescriptor(pkIndexId, List.of(), false)
));
+ pkStorageSupplier = new Lazy<>(() -> new
TableSchemaAwareIndexStorage(pkIndexId, pkIndexStorage, row2Tuple));
SortedIndexStorage indexStorage = new TestSortedIndexStorage(
PART_ID,
@@ -649,12 +663,15 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new SingleClusterNodeResolver(localNode),
new RemotelyTriggeredResourceRegistry(),
new DummySchemaManagerImpl(schemaDescriptor,
schemaDescriptorVersion2),
- indexMetaStorage
+ indexMetaStorage,
+ lowWatermark
);
kvMarshaller = marshallerFor(schemaDescriptor);
kvMarshallerVersion2 = marshallerFor(schemaDescriptorVersion2);
+ when(lowWatermark.tryLock(any(), any())).thenReturn(true);
+
reset();
}
@@ -810,20 +827,46 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk,
HybridTimestamp readTimestamp) {
- ReadOnlySingleRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
+ ReadOnlySingleRowPkReplicaRequest request =
readOnlySingleRowPkReplicaRequest(pk, readTimestamp);
+
+ return invokeListener(request);
+ }
+
+ private CompletableFuture<ReplicaResult> invokeListener(ReplicaRequest
request) {
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
+
+ private ReadOnlySingleRowPkReplicaRequest
readOnlySingleRowPkReplicaRequest(BinaryRow pk, HybridTimestamp readTimestamp) {
+ return readOnlySingleRowPkReplicaRequest(grpId, newTxId(),
localNode.id(), pk, readTimestamp);
+ }
+
+ private static ReadOnlySingleRowPkReplicaRequest
readOnlySingleRowPkReplicaRequest(
+ TablePartitionId grpId,
+ UUID txId,
+ UUID coordinatorId,
+ BinaryRow pk,
+ HybridTimestamp readTimestamp
+ ) {
+ return TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.readTimestamp(readTimestamp)
.schemaVersion(pk.schemaVersion())
.primaryKey(pk.tupleSlice())
+ .transactionId(txId)
+ .coordinatorId(coordinatorId)
.requestType(RO_GET)
.build();
-
- return partitionReplicaListener.invoke(request, localNode.id());
}
private CompletableFuture<ReplicaResult>
doReadOnlyDirectSingleGet(BinaryRow pk) {
- ReadOnlyDirectSingleRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
+ ReadOnlyDirectSingleRowReplicaRequest request =
readOnlyDirectSingleRowReplicaRequest(grpId, pk);
+
+ return invokeListener(request);
+ }
+
+ private static ReadOnlyDirectSingleRowReplicaRequest
readOnlyDirectSingleRowReplicaRequest(TablePartitionId grpId, BinaryRow pk) {
+ return TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.schemaVersion(pk.schemaVersion())
@@ -831,8 +874,6 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.requestType(RO_GET)
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
-
- return partitionReplicaListener.invoke(request, localNode.id());
}
@Test
@@ -2083,19 +2124,28 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId,
HybridTimestamp readTimestamp) {
return partitionReplicaListener.invoke(
-
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
- .groupId(tablePartitionIdMessage(grpId))
- .tableId(TABLE_ID)
- .transactionId(targetTxId)
- .scanId(1)
- .batchSize(100)
- .readTimestamp(readTimestamp)
- .coordinatorId(localNode.id())
- .build(),
+ readOnlyScanRetrieveBatchReplicaRequest(grpId, targetTxId,
readTimestamp, localNode.id()),
localNode.id()
);
}
+ private static ReadOnlyScanRetrieveBatchReplicaRequest
readOnlyScanRetrieveBatchReplicaRequest(
+ TablePartitionId grpId,
+ UUID txId,
+ HybridTimestamp readTimestamp,
+ UUID coordinatorId
+ ) {
+ return TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ .groupId(tablePartitionIdMessage(grpId))
+ .tableId(TABLE_ID)
+ .transactionId(txId)
+ .scanId(1)
+ .batchSize(100)
+ .readTimestamp(readTimestamp)
+ .coordinatorId(coordinatorId)
+ .build();
+ }
+
@ParameterizedTest
@MethodSource("singleRowWriteRequestTypes")
public void
singleRowWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType) {
@@ -2750,14 +2800,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<BinaryRow> roGetAsync(BinaryRow row,
HybridTimestamp readTimestamp) {
- ReadOnlySingleRowPkReplicaRequest message =
TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(tablePartitionIdMessage(grpId))
- .tableId(TABLE_ID)
- .requestType(RO_GET)
- .readTimestamp(readTimestamp)
- .schemaVersion(row.schemaVersion())
- .primaryKey(row.tupleSlice())
- .build();
+ ReadOnlySingleRowPkReplicaRequest message =
readOnlySingleRowPkReplicaRequest(row, readTimestamp);
return partitionReplicaListener.invoke(message,
localNode.id()).thenApply(replicaResult -> (BinaryRow) replicaResult.result());
}
@@ -2771,20 +2814,41 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private CompletableFuture<ReplicaResult>
doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
- ReadOnlyMultiRowPkReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
+ ReadOnlyMultiRowPkReplicaRequest request =
readOnlyMultiRowPkReplicaRequest(grpId, newTxId(), localNode.id(), rows,
readTimestamp);
+
+ return invokeListener(request);
+ }
+
+ private static ReadOnlyMultiRowPkReplicaRequest
readOnlyMultiRowPkReplicaRequest(
+ TablePartitionId grpId,
+ UUID txId,
+ UUID coordinatorId,
+ Collection<BinaryRow> rows,
+ HybridTimestamp readTimestamp
+ ) {
+ return TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.requestType(RO_GET_ALL)
.readTimestamp(readTimestamp)
.schemaVersion(rows.iterator().next().schemaVersion())
.primaryKeys(binaryRowsToBuffers(rows))
+ .transactionId(txId)
+ .coordinatorId(coordinatorId)
.build();
-
- return partitionReplicaListener.invoke(request, localNode.id());
}
private CompletableFuture<ReplicaResult>
doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
- ReadOnlyDirectMultiRowReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
+ ReadOnlyDirectMultiRowReplicaRequest request =
readOnlyDirectMultiRowReplicaRequest(grpId, rows);
+
+ return invokeListener(request);
+ }
+
+ private static ReadOnlyDirectMultiRowReplicaRequest
readOnlyDirectMultiRowReplicaRequest(
+ TablePartitionId grpId,
+ Collection<BinaryRow> rows
+ ) {
+ return TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
.groupId(tablePartitionIdMessage(grpId))
.tableId(TABLE_ID)
.requestType(RO_GET_ALL)
@@ -2792,8 +2856,6 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.primaryKeys(binaryRowsToBuffers(rows))
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
.build();
-
- return partitionReplicaListener.invoke(request, localNode.id());
}
private void cleanup(UUID txId) {
@@ -3113,7 +3175,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.rowIds(List.of())
.build();
- return partitionReplicaListener.invoke(request, localNode.id());
+ return invokeListener(request);
}
private void completeBuiltIndexes(IndexStorage... indexStorages) {
@@ -3161,4 +3223,107 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.txStateMeta(transactionMetaMessage)
.build();
}
+
+ @ParameterizedTest
+ @EnumSource(NonDirectReadOnlyRequestFactory.class)
+ void
nonDirectReadOnlyRequestsLockLwmAndDoNotUnlockIt(NonDirectReadOnlyRequestFactory
requestFactory) {
+ RequestContext context = new RequestContext(grpId, newTxId(), clock,
nextBinaryKey(), localNode.id());
+ ReadOnlyReplicaRequest request = requestFactory.create(context);
+
+ assertThat(invokeListener(request), willCompleteSuccessfully());
+
+ verify(lowWatermark).tryLock(any(), eq(request.readTimestamp()));
+ verify(lowWatermark, never()).unlock(any());
+ }
+
+ @Test
+ void directReadOnlySingleRowRequestDoesNotLockLwm() {
+ ReadOnlyDirectReplicaRequest request =
readOnlyDirectSingleRowReplicaRequest(grpId, nextBinaryKey());
+
+ assertThat(invokeListener(request), willCompleteSuccessfully());
+
+ verify(lowWatermark, never()).tryLock(any(), any());
+ }
+
+ @Test
+ void directReadOnlyMultiRowRequestWithOneKeyDoesNotLockLwm() {
+ ReadOnlyDirectReplicaRequest request =
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey()));
+
+ assertThat(invokeListener(request), willCompleteSuccessfully());
+
+ verify(lowWatermark, never()).tryLock(any(), any());
+ }
+
+ @Test
+ void directReadOnlyMultiRowRequestWithMultipleKeysLockAndUnlockLwm() {
+ ReadOnlyDirectReplicaRequest request =
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey(),
nextBinaryKey()));
+
+ assertThat(invokeListener(request), willCompleteSuccessfully());
+
+ InOrder orderVerifier = inOrder(lowWatermark, pkIndexStorage);
+
+ ArgumentCaptor<UUID> lockTxIdCaptor =
ArgumentCaptor.forClass(UUID.class);
+ ArgumentCaptor<UUID> unlockTxIdCaptor =
ArgumentCaptor.forClass(UUID.class);
+
+ orderVerifier.verify(lowWatermark).tryLock(lockTxIdCaptor.capture(),
any());
+ orderVerifier.verify(pkIndexStorage).get(any());
+ orderVerifier.verify(lowWatermark).unlock(unlockTxIdCaptor.capture());
+
+ assertThat(unlockTxIdCaptor.getValue(), is(lockTxIdCaptor.getValue()));
+ }
+
+ @Test
+ void
directReadOnlyMultiRowRequestWithMultipleKeysUnlockLwmEvenWhenExceptionHappens()
{
+ doThrow(new RuntimeException("Oops")).when(pkIndexStorage).get(any());
+
+ ReadOnlyDirectReplicaRequest request =
readOnlyDirectMultiRowReplicaRequest(grpId, List.of(nextBinaryKey(),
nextBinaryKey()));
+
+ assertThat(invokeListener(request),
willThrowFast(RuntimeException.class, "Oops"));
+
+ verify(lowWatermark).unlock(any());
+ }
+
+ private static class RequestContext {
+ private final TablePartitionId groupId;
+ private final UUID txId;
+ private final HybridClock clock;
+ private final BinaryRow key;
+ private final UUID coordinatorId;
+
+ private RequestContext(TablePartitionId groupId, UUID txId,
HybridClock clock, BinaryRow key, UUID coordinatorId) {
+ this.groupId = groupId;
+ this.txId = txId;
+ this.clock = clock;
+ this.key = key;
+ this.coordinatorId = coordinatorId;
+ }
+ }
+
+ private enum NonDirectReadOnlyRequestFactory {
+ SINGLE_GET(context -> readOnlySingleRowPkReplicaRequest(
+ context.groupId,
+ context.txId,
+ context.coordinatorId,
+ context.key,
+ context.clock.now())
+ ),
+ MULTI_GET(context -> readOnlyMultiRowPkReplicaRequest(
+ context.groupId,
+ context.txId,
+ context.coordinatorId,
+ singletonList(context.key),
+ context.clock.now()
+ )),
+ SCAN(context ->
readOnlyScanRetrieveBatchReplicaRequest(context.groupId, context.txId,
context.clock.now(), context.coordinatorId));
+
+ private final Function<RequestContext, ReadOnlyReplicaRequest>
requestFactory;
+
+ NonDirectReadOnlyRequestFactory(Function<RequestContext,
ReadOnlyReplicaRequest> requestFactory) {
+ this.requestFactory = requestFactory;
+ }
+
+ ReadOnlyReplicaRequest create(RequestContext context) {
+ return requestFactory.apply(context);
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index 617bf30fa2..a893f75151 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -315,7 +316,8 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
clusterNodeResolver,
remotelyTriggeredResourceRegistry,
schemaRegistry,
- indexMetaStorage
+ indexMetaStorage,
+ new TestLowWatermark()
);
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 6a80b56d74..1c052875c7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -518,7 +518,8 @@ public class ItTxTestCluster {
clusterService.topologyService(),
clusterService.messagingService(),
transactionInflights,
- txMgr
+ txMgr,
+ lowWatermark
);
assertThat(txMgr.startAsync(new ComponentContext()),
willCompleteSuccessfully());
@@ -868,7 +869,8 @@ public class ItTxTestCluster {
clusterNodeResolver,
resourcesRegistry,
schemaRegistry,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ lowWatermark
);
}
@@ -1081,7 +1083,8 @@ public class ItTxTestCluster {
client.topologyService(),
client.messagingService(),
clientTransactionInflights,
- clientTxManager
+ clientTxManager,
+ lowWatermark
);
clientTxStateResolver = new TransactionStateResolver(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 403d19dbcf..23f2ea84c9 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -461,7 +461,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(ClusterNodeResolver.class),
resourcesRegistry,
schemaManager,
- mock(IndexMetaStorage.class)
+ mock(IndexMetaStorage.class),
+ new TestLowWatermark()
);
partitionListener = new PartitionListener(
diff --git a/modules/transactions/build.gradle
b/modules/transactions/build.gradle
index a3b38079c4..8798cdeee0 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -56,6 +56,10 @@ dependencies {
testImplementation libs.hamcrest.core
integrationTestImplementation project(':ignite-api')
+ integrationTestImplementation project(':ignite-low-watermark')
+ integrationTestImplementation project(':ignite-schema')
+ integrationTestImplementation project(':ignite-configuration')
+ integrationTestImplementation project(':ignite-configuration-root')
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
integrationTestImplementation(testFixtures(project(':ignite-sql-engine')))
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
new file mode 100644
index 0000000000..df078a35b6
--- /dev/null
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/readonly/ItReadOnlyTxAndLowWatermarkTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.readonly;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.TestWrappers.unwrapInternalTransaction;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
+import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.tx.impl.ResourceVacuumManager;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+
+class ItReadOnlyTxAndLowWatermarkTest extends ClusterPerTestIntegrationTest {
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ // 100 keys to make sure that at least one key ends up on each of the 2 nodes.
+ private static final int KEY_COUNT = 100;
+
+ private static final long SHORT_DATA_AVAILABILITY_TIME_MS = 1000;
+
+ @Override
+ protected int initialNodes() {
+ // 2 nodes to have a non-coordinator node in the cluster.
+ return 2;
+ }
+
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ return new int[] {0};
+ }
+
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ builder.clusterConfiguration("ignite.gc.lowWatermark: {\n"
+ // Update frequently.
+ + " updateInterval: 100\n"
+ + "}");
+ }
+
+ @BeforeEach
+ void createTable() {
+ node(0).sql().executeScript("CREATE TABLE " + TABLE_NAME + " (ID INT
PRIMARY KEY, VAL VARCHAR)");
+ }
+
+ @ParameterizedTest
+ @EnumSource(TransactionalReader.class)
+ void
roTransactionNoticesTupleVersionsMissingDueToGcOnDataNodes(TransactionalReader
reader) throws Exception {
+ // TODO: remove the assumption when IGNITE-24119 is fixed.
+ assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+ updateDataAvailabilityTimeToShortPeriod();
+
+ Ignite coordinator = node(0);
+ KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+ insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+ Transaction roTx = coordinator.transactions().begin(new
TransactionOptions().readOnly(true));
+
+ updateToNewValues(KEY_COUNT, kvView);
+
+ waitTillLwmTriesToRaiseAndEraseOverrittenVersions();
+
+ IgniteException ex = assertThrows(IgniteException.class, () ->
reader.read(coordinator, roTx));
+ assertThat(ex, isA(reader.sql() ? SqlException.class :
TransactionException.class));
+ assertThat(ex, hasToString(containsString("Read timestamp is not
available anymore.")));
+ assertThat("Wrong error code: " + ex.codeAsString(), ex.code(),
is(Transactions.TX_STALE_READ_ONLY_OPERATION_ERR));
+ }
+
+ private void updateDataAvailabilityTimeToShortPeriod() {
+ IgniteImpl igniteImpl = unwrapIgniteImpl(node(0));
+
+ LowWatermarkConfiguration lwmConfig = igniteImpl.clusterConfiguration()
+ .getConfiguration(GcExtensionConfiguration.KEY)
+ .gc()
+ .lowWatermark();
+
+
assertThat(lwmConfig.dataAvailabilityTime().update(SHORT_DATA_AVAILABILITY_TIME_MS),
willCompleteSuccessfully());
+ }
+
+ private static KeyValueView<Integer, String> kvView(Ignite coordinator) {
+ return
coordinator.tables().table(TABLE_NAME).keyValueView(Integer.class,
String.class);
+ }
+
+ private void insertOriginalValuesToBothNodes(int keyCount,
KeyValueView<Integer, String> kvView) throws Exception {
+ PartitionManager partitionManager =
node(0).tables().table(TABLE_NAME).partitionManager();
+ Set<String> primaryNames = new HashSet<>();
+
+ for (int i = 0; i < keyCount; i++) {
+ kvView.put(null, i, "original-" + i);
+
+ if (primaryNames.size() < 2) {
+ ClusterNode primaryReplica = primaryReplicaFor(i,
partitionManager);
+ primaryNames.add(primaryReplica.name());
+ }
+ }
+
+ assertThat("Expecting both nodes to host inserted keys", primaryNames,
hasSize(2));
+ }
+
+ private static ClusterNode primaryReplicaFor(int key, PartitionManager
partitionManager) throws Exception {
+ CompletableFuture<ClusterNode> primaryReplicaFuture =
partitionManager.partitionAsync(key, Mapper.of(Integer.class))
+ .thenCompose(partitionManager::primaryReplicaAsync);
+
+ return primaryReplicaFuture.get(10, SECONDS);
+ }
+
+ private static void updateToNewValues(int keyCount, KeyValueView<Integer,
String> kvView) {
+ for (int i = 0; i < keyCount; i++) {
+ kvView.put(null, i, "updated-" + i);
+ }
+ }
+
+ private static void waitTillLwmTriesToRaiseAndEraseOverrittenVersions()
throws InterruptedException {
+ Thread.sleep(2 * SHORT_DATA_AVAILABILITY_TIME_MS);
+ }
+
+ @CartesianTest
+ @WithSystemProperty(key =
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value =
"100")
+ void lwmIsAllowedToBeRaisedOnDataNodesAfterRoTransactionFinish(
+ @Enum(TransactionalReader.class) TransactionalReader reader,
+ @Values(booleans = {true, false}) boolean commit
+ ) throws Exception {
+ // TODO: remove the assumption when IGNITE-24119 is fixed.
+ assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+ Ignite coordinator = node(0);
+ KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+ insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+ Transaction roTx = coordinator.transactions().begin(new
TransactionOptions().readOnly(true));
+
+ reader.read(coordinator, roTx);
+
+ if (commit) {
+ roTx.commit();
+ } else {
+ roTx.rollback();
+ }
+
+ updateDataAvailabilityTimeToShortPeriod();
+
+ HybridTimestamp readTimestamp =
unwrapInternalTransaction(roTx).readTimestamp();
+
+ assertLwmGrowsAbove(readTimestamp, node(0));
+ assertLwmGrowsAbove(readTimestamp, node(1));
+ }
+
+ private static void assertLwmGrowsAbove(HybridTimestamp ts, Ignite node)
throws InterruptedException {
+ LowWatermarkImpl lowWatermark = unwrapIgniteImpl(node).lowWatermark();
+
+ assertTrue(
+ waitForCondition(
+ () -> {
+ HybridTimestamp lwm =
lowWatermark.getLowWatermark();
+ return lwm != null && lwm.compareTo(ts) > 0;
+ },
+ SECONDS.toMillis(10)
+ ),
+ "Did not see low watermark going up in time"
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TransactionalReader.class)
+ @WithSystemProperty(key =
ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value =
"100")
+ void
nonFinishedRoTransactionsOfCoordinatorsThatLeftDontHoldLwm(TransactionalReader
reader) throws Exception {
+ // TODO: remove the assumption when IGNITE-24119 is fixed.
+ assumeFalse(reader == TransactionalReader.MULTI_GET);
+
+ Ignite coordinator = node(1);
+ KeyValueView<Integer, String> kvView = kvView(coordinator);
+
+ insertOriginalValuesToBothNodes(KEY_COUNT, kvView);
+
+ Transaction roTx = coordinator.transactions().begin(new
TransactionOptions().readOnly(true));
+
+ // Do actual read(s) in the transaction.
+ reader.read(coordinator, roTx);
+
+ // Stop the coordinator.
+ stopNode(1);
+
+ updateDataAvailabilityTimeToShortPeriod();
+
+ HybridTimestamp readTimestamp =
unwrapInternalTransaction(roTx).readTimestamp();
+
+ assertLwmGrowsAbove(readTimestamp, node(0));
+ }
+
+ private enum TransactionalReader {
+ SINGLE_GETS {
+ @Override
+ void read(Ignite ignite, Transaction transaction) {
+ KeyValueView<Integer, String> kvView = kvView(ignite);
+ for (int i = 0; i < KEY_COUNT; i++) {
+ kvView.get(transaction, i);
+ }
+ }
+
+ @Override
+ boolean sql() {
+ return false;
+ }
+ },
+ MULTI_GET {
+ @Override
+ void read(Ignite ignite, Transaction transaction) {
+ List<Integer> keys = IntStream.range(0,
KEY_COUNT).boxed().collect(toList());
+ kvView(ignite).getAll(transaction, keys);
+ }
+
+ @Override
+ boolean sql() {
+ return false;
+ }
+ },
+ SELECT_ALL {
+ @Override
+ void read(Ignite ignite, Transaction transaction) {
+ try (ResultSet<SqlRow> resultSet =
ignite.sql().execute(transaction, "SELECT * FROM " + TABLE_NAME)) {
+ resultSet.forEachRemaining(item -> {});
+ }
+ }
+
+ @Override
+ boolean sql() {
+ return true;
+ }
+ },
+ SELECT_COUNT {
+ @Override
+ void read(Ignite ignite, Transaction transaction) {
+ try (ResultSet<SqlRow> resultSet =
ignite.sql().execute(transaction, "SELECT COUNT(*) FROM " + TABLE_NAME)) {
+ resultSet.forEachRemaining(item -> {});
+ }
+ }
+
+ @Override
+ boolean sql() {
+ return true;
+ }
+ };
+
+ abstract void read(Ignite ignite, Transaction transaction);
+
+ abstract boolean sql();
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index 1676be8d35..6e26562493 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -17,13 +17,15 @@
package org.apache.ignite.internal.tx.impl;
+import java.util.UUID;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
/**
- * Handles Cursor Cleanup request ({@link FinishedTransactionsBatchMessage}).
+ * Handles Transaction cleanup request ({@link
FinishedTransactionsBatchMessage}).
*/
public class FinishedTransactionBatchRequestHandler {
/** Messaging service. */
@@ -32,6 +34,8 @@ public class FinishedTransactionBatchRequestHandler {
/** Resources registry. */
private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+ private final LowWatermark lowWatermark;
+
private final Executor asyncExecutor;
/**
@@ -39,15 +43,18 @@ public class FinishedTransactionBatchRequestHandler {
*
* @param messagingService Messaging service.
* @param resourcesRegistry Resources registry.
+ * @param lowWatermark Low watermark.
* @param asyncExecutor Executor to run cleanup commands.
*/
public FinishedTransactionBatchRequestHandler(
MessagingService messagingService,
RemotelyTriggeredResourceRegistry resourcesRegistry,
+ LowWatermark lowWatermark,
Executor asyncExecutor
) {
this.messagingService = messagingService;
this.resourcesRegistry = resourcesRegistry;
+ this.lowWatermark = lowWatermark;
this.asyncExecutor = asyncExecutor;
}
@@ -63,7 +70,13 @@ public class FinishedTransactionBatchRequestHandler {
}
private void
processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage
closeCursorsMessage) {
- asyncExecutor.execute(() ->
closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
+ asyncExecutor.execute(() ->
closeCursorsMessage.transactions().forEach(this::cleanUpForTransaction));
+ }
+
+ /**
+ * Cleans up what this node holds for a finished transaction: closes the
+ * resources registered under the transaction ID and unlocks the low
+ * watermark for the same ID.
+ *
+ * @param transactionId ID of the finished transaction.
+ */
+ private void cleanUpForTransaction(UUID transactionId) {
+ try {
+ resourcesRegistry.close(transactionId);
+ } finally {
+ // Unlock even when closing the resources fails: without this, an
+ // exception from close() would skip the unlock and leave the lock
+ // registered for a transaction that is already finished.
+ lowWatermark.unlock(transactionId);
+ }
 }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
index d8c5462371..823434b374 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemotelyTriggeredResourceRegistry.java
@@ -82,7 +82,7 @@ public class RemotelyTriggeredResourceRegistry {
resources.remove(resourceId);
-
remoteRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId);
+
removeRemoteHostResource(remotelyTriggeredResource.remoteHostId(), resourceId);
} catch (Exception e) {
throw new ResourceCloseException(resourceId,
remotelyTriggeredResource.remoteHostId(), e);
}
@@ -131,7 +131,7 @@ public class RemotelyTriggeredResourceRegistry {
for (FullyQualifiedResourceId resourceId : closedResources) {
resourcesWithContext.remove(resourceId);
- remoteRemoteHostResource(remoteHostId, resourceId);
+ removeRemoteHostResource(remoteHostId, resourceId);
}
}
@@ -169,7 +169,7 @@ public class RemotelyTriggeredResourceRegistry {
});
}
- private void remoteRemoteHostResource(UUID remoteHostId,
FullyQualifiedResourceId resourceId) {
+ private void removeRemoteHostResource(UUID remoteHostId,
FullyQualifiedResourceId resourceId) {
remoteHostsToResources.computeIfPresent(remoteHostId, (k, v) -> {
v.remove(resourceId);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index 589b73147f..81f33bafef 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterNodeResolver;
@@ -84,6 +85,7 @@ public class ResourceVacuumManager implements IgniteComponent
{
* @param messagingService Messaging service.
* @param transactionInflights Transaction inflights.
* @param txManager Transactional manager.
+ * @param lowWatermark Low watermark.
*/
public ResourceVacuumManager(
String nodeName,
@@ -91,7 +93,8 @@ public class ResourceVacuumManager implements IgniteComponent
{
TopologyService topologyService,
MessagingService messagingService,
TransactionInflights transactionInflights,
- TxManager txManager
+ TxManager txManager,
+ LowWatermark lowWatermark
) {
this.resourceRegistry = resourceRegistry;
this.clusterNodeResolver = topologyService;
@@ -104,8 +107,12 @@ public class ResourceVacuumManager implements
IgniteComponent {
messagingService,
transactionInflights
);
- this.finishedTransactionBatchRequestHandler =
- new FinishedTransactionBatchRequestHandler(messagingService,
resourceRegistry, resourceVacuumExecutor);
+ this.finishedTransactionBatchRequestHandler = new
FinishedTransactionBatchRequestHandler(
+ messagingService,
+ resourceRegistry,
+ lowWatermark,
+ resourceVacuumExecutor
+ );
this.txManager = txManager;
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
new file mode 100644
index 0000000000..38359e4c6c
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FinishedTransactionBatchRequestHandlerTest extends
BaseIgniteAbstractTest {
+ @Mock
+ private MessagingService messagingService;
+
+ @Mock
+ private RemotelyTriggeredResourceRegistry resourceRegistry;
+
+ @Mock
+ private LowWatermark lowWatermark;
+
+ private FinishedTransactionBatchRequestHandler requestHandler;
+
+ private NetworkMessageHandler networkHandler;
+
+ @BeforeEach
+ void createAndStartHandler() {
+ requestHandler = new FinishedTransactionBatchRequestHandler(
+ messagingService,
+ resourceRegistry,
+ lowWatermark,
+ ForkJoinPool.commonPool()
+ );
+ requestHandler.start();
+
+ ArgumentCaptor<NetworkMessageHandler> handlerCaptor =
ArgumentCaptor.forClass(NetworkMessageHandler.class);
+ verify(messagingService).addMessageHandler(eq(TxMessageGroup.class),
handlerCaptor.capture());
+
+ networkHandler = handlerCaptor.getValue();
+ assertThat(networkHandler, is(notNullValue()));
+ }
+
+ @Test
+ void unlocksLwm() {
+ UUID txId1 = new UUID(1, 1);
+ UUID txId2 = new UUID(2, 2);
+
+ FinishedTransactionsBatchMessage message = new
TxMessagesFactory().finishedTransactionsBatchMessage()
+ .transactions(List.of(txId1, txId2))
+ .build();
+
+ networkHandler.onReceived(message, mock(ClusterNode.class), null);
+
+ verify(lowWatermark, timeout(10_000)).unlock(txId1);
+ verify(lowWatermark, timeout(10_000)).unlock(txId2);
+ }
+}