This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c25f9fdac1 IGNITE-22245 Force switch write intents (#3765)
c25f9fdac1 is described below
commit c25f9fdac1668c46e27a6ff2190f58948d0f75ef
Author: Cyrill <[email protected]>
AuthorDate: Thu May 16 10:50:27 2024 +0300
IGNITE-22245 Force switch write intents (#3765)
---
.../ignite/raft/jraft/core/FSMCallerImpl.java | 2 +-
.../internal/table/ItTxResourcesVacuumTest.java | 61 ++++++++++------------
.../internal/table/distributed/TableManager.java | 9 +---
.../replicator/PartitionReplicaListener.java | 23 ++++++--
.../internal/tx/impl/TxCleanupRequestSender.java | 4 +-
5 files changed, 52 insertions(+), 47 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index c8846ca134..a157eee51c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -188,7 +188,7 @@ public class FSMCallerImpl implements FSMCaller {
}
this.error = new RaftException(ErrorType.ERROR_TYPE_NONE);
this.msgFactory = opts.getRaftMessagesFactory();
- LOG.info("Starts FSMCaller successfully.");
+ LOG.info("Starts FSMCaller successfully [nodeId={}].", nodeId);
return true;
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index 2b320a5740..cd40ce6168 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -218,7 +218,6 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
view.upsert(parallelTx2, tupleForParallelTx);
CompletableFuture<Void> finishStartedFuture = new
CompletableFuture<>();
- CompletableFuture<Void> finishAllowedFuture = new
CompletableFuture<>();
node.dropMessages((n, msg) -> {
if (msg instanceof TxFinishReplicaRequest) {
@@ -227,7 +226,9 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
if (finishRequest.txId().equals(txId)) {
finishStartedFuture.complete(null);
- joinWithTimeout(finishAllowedFuture);
+ log.info("Test: dropping finish on [node= {}].", n);
+
+ return true;
}
}
@@ -248,7 +249,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
// Check that the volatile state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
- assertTrue(finishAllowedFuture.complete(null));
+ node.stopDroppingMessages();
assertThat(commitFut, willCompleteSuccessfully());
@@ -395,16 +396,17 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
view.upsert(tx, tuple1);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
- CompletableFuture<Void> cleanupAllowed = new CompletableFuture<>();
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage) {
- cleanupStarted.complete(null);
-
- log.info("Test: cleanup started.");
+ log.info("Test: cleanup started on [node= {}].", n);
if (commitPartNodes.contains(n)) {
- joinWithTimeout(cleanupAllowed);
+ cleanupStarted.complete(null);
+
+ log.info("Test: dropping cleanup on [node= {}].", n);
+
+ return true;
}
}
@@ -424,7 +426,7 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()),
txId, commitPartId, false);
// Unblocking cleanup.
- assertTrue(cleanupAllowed.complete(null));
+ commitPartitionLeaseholder.stopDroppingMessages();
assertThat(commitFut, willCompleteSuccessfully());
@@ -490,14 +492,19 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
view.upsert(tx, tuple);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
- CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
commitPartitionLeaseholder.dropMessages((n, msg) -> {
- if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+ if (msg instanceof TxCleanupMessage) {
+ log.info("Test: perform cleanup on [node= {}, msg={}].", n,
msg);
+
cleanupStarted.complete(null);
- joinWithTimeout(cleanupAllowedFut);
+ if (!cleanupAllowed[0]) {
+ log.info("Test: dropping cleanup on [node= {}].", n);
+
+ return true;
+ }
}
return false;
@@ -511,8 +518,6 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
transferPrimary(cluster.runningNodes().collect(toSet()),
commitPartGrpId, commitPartNodes::contains);
- assertTrue(cleanupAllowedFut.complete(null));
-
cleanupAllowed[0] = true;
assertThat(commitFut, willCompleteSuccessfully());
@@ -521,10 +526,10 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
waitForTxStateVacuum(txId, commitPartId, true, 10_000);
- Transaction roTxAfter = beginReadOnlyTx(anyNode());
-
log.info("Test: checking values.");
+ Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
// Trying to read the value.
Tuple key = Tuple.create().set("key", tuple.longValue("key"));
checkValueReadOnly(view, roTxBefore, key, null);
@@ -575,19 +580,20 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
view.upsert(tx, tuple);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
- CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
// Cleanup may be triggered by the primary replica reelection as well.
runningNodes().filter(n ->
commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
- if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
- cleanupStarted.complete(null);
+ if (msg instanceof TxCleanupMessage) {
+ log.info("Test: perform cleanup on [node= {}, msg={}].", n,
msg);
- log.warn("Test: cleanup started.");
+ cleanupStarted.complete(null);
- joinWithTimeout(cleanupAllowedFut);
+ if (!cleanupAllowed[0]) {
+ log.info("Test: dropping cleanup on [node= {}].", n);
- log.info("Test: cleanup resumed.");
+ return true;
+ }
}
return false;
@@ -609,8 +615,6 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
log.info("Test: volatile state vacuumized");
- assertTrue(cleanupAllowedFut.complete(null));
-
cleanupAllowed[0] = true;
assertThat(commitFut, willCompleteSuccessfully());
@@ -1066,13 +1070,4 @@ public class ItTxResourcesVacuumTest extends
ClusterPerTestIntegrationTest {
.get();
}
- private void joinWithTimeout(CompletableFuture<?> future) {
- future.orTimeout(60, TimeUnit.SECONDS)
- .exceptionally(e -> {
- log.error("Could not wait for the future.", e);
-
- return null;
- })
- .join();
- }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 5dfb1ad1e4..dec713da93 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1610,14 +1610,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @return Future.
*/
public CompletableFuture<TableViewInternal> tableAsync(long
causalityToken, int id) {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
- }
- try {
- return tablesById(causalityToken).thenApply(tablesById ->
tablesById.get(id));
- } finally {
- busyLock.leaveBusy();
- }
+ return inBusyLockAsync(busyLock, () ->
tablesById(causalityToken).thenApply(tablesById -> tablesById.get(id)));
}
@Override
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 00ca2e230a..7113ee5b68 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1802,7 +1802,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return awaitCleanupReadyFutures(request.txId(), request.commit())
.thenCompose(res -> {
- if (res.hadUpdateFutures()) {
+ if (res.hadUpdateFutures() || res.forceCleanup()) {
HybridTimestamp commandTimestamp = clockService.now();
return reliableCatalogVersionFor(commandTimestamp)
@@ -1830,11 +1830,22 @@ public class PartitionReplicaListener implements
ReplicaListener {
List<CompletableFuture<?>> txUpdateFutures = new ArrayList<>();
List<CompletableFuture<?>> txReadFutures = new ArrayList<>();
+ AtomicBoolean forceCleanup = new AtomicBoolean(true);
+
txCleanupReadyFutures.compute(txId, (id, txOps) -> {
if (txOps == null) {
return null;
}
+ // Cleanup futures (both read and update) are empty in two cases:
+ // - there were no actions in the transaction
+ // - write intent switch is being executed on the new primary (the
primary has changed after write intent appeared)
+ // Both cases are expected to happen extremely rarely so we are
fine to force the write intent switch.
+
+ // The reason for the forced switch is that otherwise write
intents would not be switched (if there is no volatile state and
+ // FuturesCleanupResult.hadUpdateFutures() returns false).
+ forceCleanup.set(txOps.futures.isEmpty());
+
txOps.futures.forEach((opType, futures) -> {
if (opType.isRwRead()) {
txReadFutures.addAll(futures.values());
@@ -1850,7 +1861,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return allOfFuturesExceptionIgnored(txUpdateFutures, commit, txId)
.thenCompose(v -> allOfFuturesExceptionIgnored(txReadFutures,
commit, txId))
- .thenApply(v -> new
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
+ .thenApply(v -> new
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty(),
forceCleanup.get()));
}
private CompletableFuture<WriteIntentSwitchReplicatedInfo>
applyWriteIntentSwitchCommand(
@@ -3951,10 +3962,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
private static class FuturesCleanupResult {
private final boolean hadReadFutures;
private final boolean hadUpdateFutures;
+ private final boolean forceCleanup;
- public FuturesCleanupResult(boolean hadReadFutures, boolean
hadUpdateFutures) {
+ public FuturesCleanupResult(boolean hadReadFutures, boolean
hadUpdateFutures, boolean forceCleanup) {
this.hadReadFutures = hadReadFutures;
this.hadUpdateFutures = hadUpdateFutures;
+ this.forceCleanup = forceCleanup;
}
public boolean hadReadFutures() {
@@ -3964,6 +3977,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
public boolean hadUpdateFutures() {
return hadUpdateFutures;
}
+
+ public boolean forceCleanup() {
+ return forceCleanup;
+ }
}
private CompletableFuture<?> processOperationRequestWithTxRwCounter(
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index dcf97ff5da..82cfee4fab 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -51,7 +51,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class TxCleanupRequestSender {
/** Logger. */
- private final IgniteLogger log =
Loggers.forClass(TxCleanupRequestSender.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(TxCleanupRequestSender.class);
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
@@ -96,7 +96,7 @@ public class TxCleanupRequestSender {
if (msg instanceof TxCleanupMessageErrorResponse) {
TxCleanupMessageErrorResponse response =
(TxCleanupMessageErrorResponse) msg;
- log.warn("Exception happened during transaction cleanup
[txId={}].", response.throwable(), response.txId());
+ LOG.warn("Exception happened during transaction cleanup
[txId={}].", response.throwable(), response.txId());
}
}
});