This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 87019581cf IGNITE-20379 Concurrency issues with write intent tracking (#2562)
87019581cf is described below
commit 87019581cff56a0c4611a6fc43af1297b863ec5f
Author: Cyrill <[email protected]>
AuthorDate: Mon Sep 11 10:35:36 2023 +0300
IGNITE-20379 Concurrency issues with write intent tracking (#2562)
---
.../rpc/impl/RaftGroupEventsClientListener.java | 20 ++++----------------
.../table/distributed/StorageUpdateHandler.java | 12 +++---------
.../table/distributed/replicator/PendingRows.java | 18 +++++-------------
3 files changed, 12 insertions(+), 38 deletions(-)
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
index e3685b8085..8455f78d71 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupEventsClientListener.java
@@ -17,10 +17,10 @@
package org.apache.ignite.raft.jraft.rpc.impl;
-import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.LeaderElectionListener;
@@ -42,15 +42,7 @@ public class RaftGroupEventsClientListener {
* @param listener Listener.
*/
public void addLeaderElectionListener(ReplicationGroupId groupId,
LeaderElectionListener listener) {
- leaderElectionListeners.compute(groupId, (k, listeners) -> {
- if (listeners == null) {
- listeners = new ArrayList<>();
- }
-
- listeners.add(listener);
-
- return listeners;
- });
+ leaderElectionListeners.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(listener);
}
/**
@@ -60,13 +52,9 @@ public class RaftGroupEventsClientListener {
* @param listener Listener.
*/
public void removeLeaderElectionListener(ReplicationGroupId groupId,
LeaderElectionListener listener) {
- leaderElectionListeners.compute(groupId, (k, listeners) -> {
- if (listeners == null) {
- return null;
- }
-
+ leaderElectionListeners.computeIfPresent(groupId, (k, listeners) -> {
listeners.remove(listener);
-
+
return listeners;
});
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index d53455a4ca..cd6e8518fb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -265,9 +265,7 @@ public class StorageUpdateHandler {
* @param commitTimestamp Commit timestamp. Not {@code null} if {@code
commit} is {@code true}.
*/
public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable
HybridTimestamp commitTimestamp) {
- Set<RowId> pendingRowIds = pendingRows.getPendingRowIds(txId);
-
- handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, () -> pendingRows.removePendingRowIds(txId));
+ handleTransactionCleanup(txId, commit, commitTimestamp, () -> {});
}
/**
@@ -280,13 +278,9 @@ public class StorageUpdateHandler {
*/
public void handleTransactionCleanup(UUID txId, boolean commit,
@Nullable HybridTimestamp commitTimestamp, Runnable onApplication)
{
- Set<RowId> pendingRowIds = pendingRows.getPendingRowIds(txId);
-
- handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, () -> {
- pendingRows.removePendingRowIds(txId);
+ Set<RowId> pendingRowIds = pendingRows.removePendingRowIds(txId);
- onApplication.run();
- });
+ handleTransactionCleanup(pendingRowIds, commit, commitTimestamp, onApplication);
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
index 55411b0b8a..110d45144d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PendingRows.java
@@ -77,23 +77,15 @@ public class PendingRows {
}
/**
- * Removes all row IDs for the given transaction.
+ * Removes all pending row IDs for the given transaction.
*
* @param txId Transaction ID.
+ * @return Pending row IDs mapped to the provided transaction or an empty set if there were none.
*/
- public void removePendingRowIds(UUID txId) {
- txsPendingRowIds.remove(txId);
- }
+ public Set<RowId> removePendingRowIds(UUID txId) {
+ Set<RowId> pendingRows = txsPendingRowIds.remove(txId);
- /**
- * Returns pending row IDs for the given transaction or an empty set if there are no pending rows.
- *
- * @param txId Transaction ID.
- * @return Pending row IDs.
- */
- public Set<RowId> getPendingRowIds(UUID txId) {
- return txsPendingRowIds.getOrDefault(txId, EMPTY_SET);
+ return pendingRows == null ? EMPTY_SET : pendingRows;
}
-
}