This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 33c16499ae IGNITE-23193 Introduce Low Watermark lock API (#4498)
33c16499ae is described below

commit 33c16499ae657fff9ebf7d9cda2ba41b9653f1bf
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Oct 7 13:19:07 2024 +0300

    IGNITE-23193 Introduce Low Watermark lock API (#4498)
    
    * Introduce `LowWatermark.tryLock` and `unlock` methods as a cleaner way to 
"hold" the watermark (prevent it from rising above a certain value)
    * Remove `LOW_WATERMARK_BEFORE_CHANGE` event
    * Remove `TxManagerImpl.readOnlyTxFutureById` map and 
`lowWatermarkValueReference`
    
    Previously, a `LOW_WATERMARK_BEFORE_CHANGE` listener was used to prevent the 
watermark from being updated while there were active read-only transactions 
with a lower timestamp.
---
 .../ignite/internal/lowwatermark/LowWatermark.java | 17 ++++
 .../internal/lowwatermark/LowWatermarkImpl.java    | 99 +++++++++++++++++-----
 ...owWatermarkEvent.java => LowWatermarkLock.java} | 34 ++++----
 .../lowwatermark/event/LowWatermarkEvent.java      |  7 --
 .../lowwatermark/LowWatermarkImplTest.java         | 67 ---------------
 .../internal/lowwatermark/TestLowWatermark.java    | 69 +++++++++++++--
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  | 13 ++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 97 +++------------------
 .../apache/ignite/internal/tx/TxManagerTest.java   |  3 -
 .../tx/impl/ReadOnlyTransactionImplTest.java       |  4 +-
 10 files changed, 201 insertions(+), 209 deletions(-)

diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 7d89c41720..5d79d0a7c5 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -49,4 +49,21 @@ public interface LowWatermark extends 
EventProducer<LowWatermarkEvent, LowWaterm
      * @param newLowWatermark Candidate for update.
      */
     void updateLowWatermark(HybridTimestamp newLowWatermark);
+
+    /**
+     * Locks the low watermark at the provided timestamp (prevents it from 
being updated to a value higher than the provided one).
+     *
+     * @param lockId Lock id.
+     * @param ts Timestamp to lock.
+     * @return True if the lock was acquired,
+     *     false if the lock was not acquired due to the low watermark being 
higher than the provided timestamp.
+     */
+    boolean tryLock(Object lockId, HybridTimestamp ts);
+
+    /**
+     * Releases the lock created by {@link #tryLock(Object, HybridTimestamp)}.
+     *
+     * @param lockId Lock id.
+     */
+    void unlock(Object lockId);
 }
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index d3d343d6ec..eb37ffe913 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.lowwatermark;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
 import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -120,6 +121,8 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
             new LowWatermarkCandidate(MIN_VALUE, nullCompletedFuture())
     );
 
+    private final Map<Object, LowWatermarkLock> locks = new 
ConcurrentHashMap<>();
+
     /**
      * Constructor.
      *
@@ -233,8 +236,10 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
         updateLowWatermarkLock.writeLock().lock();
 
         try {
-            assert lowWatermark == null || 
newLowWatermark.compareTo(lowWatermark) > 0 :
-                    "Low watermark should only grow: [cur=" + lowWatermark + 
", new=" + newLowWatermark + "]";
+            HybridTimestamp lwm = lowWatermark;
+
+            assert lwm == null || newLowWatermark.compareTo(lwm) > 0 :
+                    "Low watermark should only grow: [cur=" + lwm + ", new=" + 
newLowWatermark + "]";
 
             lowWatermark = newLowWatermark;
         } finally {
@@ -296,31 +301,83 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
         });
     }
 
+    @Override
+    public boolean tryLock(Object lockId, HybridTimestamp ts) {
+        return inBusyLock(busyLock, () -> {
+            updateLowWatermarkLock.readLock().lock();
+
+            try {
+                HybridTimestamp lwm = lowWatermark;
+                if (lwm != null && ts.compareTo(lwm) < 0) {
+                    return false;
+                }
+
+                locks.put(lockId, new LowWatermarkLock(ts));
+
+                return true;
+            } finally {
+                updateLowWatermarkLock.readLock().unlock();
+            }
+        });
+    }
+
+    @Override
+    public void unlock(Object lockId) {
+        LowWatermarkLock lock = locks.remove(lockId);
+
+        if (lock == null) {
+            // Already released.
+            return;
+        }
+
+        lock.future().complete(null);
+    }
+
     CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
-        return inBusyLockAsync(busyLock, () ->
-                fireEvent(LOW_WATERMARK_BEFORE_CHANGE, new 
ChangeLowWatermarkEventParameters(newLowWatermark))
-                        .thenComposeAsync(unused -> {
-                            vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(newLowWatermark));
+        return inBusyLockAsync(busyLock, () -> {
+                    vaultManager.put(LOW_WATERMARK_VAULT_KEY, 
ByteUtils.toBytes(newLowWatermark));
 
-                            setLowWatermark(newLowWatermark);
+                    return waitForLocksAndSetLowWatermark(newLowWatermark)
+                            .thenComposeAsync(unused2 -> fireEvent(
+                                    LOW_WATERMARK_CHANGED,
+                                    new 
ChangeLowWatermarkEventParameters(newLowWatermark)), scheduledThreadPool)
+                            .whenCompleteAsync((unused, throwable) -> {
+                                if (throwable != null) {
+                                    if (!(throwable instanceof 
NodeStoppingException)) {
+                                        LOG.error("Failed to update low 
watermark, will schedule again: {}", throwable, newLowWatermark);
 
-                            return fireEvent(LOW_WATERMARK_CHANGED, new 
ChangeLowWatermarkEventParameters(newLowWatermark));
-                        }, scheduledThreadPool)
-                        .whenCompleteAsync((unused, throwable) -> {
-                            if (throwable != null) {
-                                if (!(throwable instanceof 
NodeStoppingException)) {
-                                    LOG.error("Failed to update low watermark, 
will schedule again: {}", throwable, newLowWatermark);
+                                        failureManager.process(new 
FailureContext(CRITICAL_ERROR, throwable));
 
-                                    failureManager.process(new 
FailureContext(CRITICAL_ERROR, throwable));
+                                        inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
+                                    }
+                                } else {
+                                    LOG.info("Successful low watermark update: 
{}", newLowWatermark);
 
                                     inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
                                 }
-                            } else {
-                                LOG.info("Successful low watermark update: 
{}", newLowWatermark);
-
-                                inBusyLock(busyLock, 
this::scheduleUpdateLowWatermarkBusy);
-                            }
-                        }, scheduledThreadPool)
+                            }, scheduledThreadPool);
+                }
         );
     }
+
+    private CompletableFuture<Void> 
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
+        return inBusyLockAsync(busyLock, () -> {
+            // Write lock so no new LWM locks can be added.
+            updateLowWatermarkLock.writeLock().lock();
+
+            try {
+                for (LowWatermarkLock lock : locks.values()) {
+                    if (lock.timestamp().compareTo(newLowWatermark) < 0) {
+                        return lock.future().thenCompose(unused -> 
waitForLocksAndSetLowWatermark(newLowWatermark));
+                    }
+                }
+
+                setLowWatermark(newLowWatermark);
+
+                return nullCompletedFuture();
+            } finally {
+                updateLowWatermarkLock.writeLock().unlock();
+            }
+        });
+    }
 }
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
similarity index 59%
copy from 
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
copy to 
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
index 216a5c0d63..d6046e9a8f 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.lowwatermark.event;
+package org.apache.ignite.internal.lowwatermark;
 
-import org.apache.ignite.internal.event.Event;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
-/** Low watermark events. */
-public enum LowWatermarkEvent implements Event {
-    /**
-     * This event is fired before the low watermark changes.
-     *
-     * @see ChangeLowWatermarkEventParameters
-     */
-    LOW_WATERMARK_BEFORE_CHANGE,
+final class LowWatermarkLock {
+    private final HybridTimestamp timestamp;
 
-    /**
-     * This event is fired on a low watermark change.
-     *
-     * @see ChangeLowWatermarkEventParameters
-     */
-    LOW_WATERMARK_CHANGED
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    LowWatermarkLock(HybridTimestamp timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public HybridTimestamp timestamp() {
+        return timestamp;
+    }
+
+    CompletableFuture<Void> future() {
+        return future;
+    }
 }
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
index 216a5c0d63..9299bee188 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
@@ -21,13 +21,6 @@ import org.apache.ignite.internal.event.Event;
 
 /** Low watermark events. */
 public enum LowWatermarkEvent implements Event {
-    /**
-     * This event is fired before the low watermark changes.
-     *
-     * @see ChangeLowWatermarkEventParameters
-     */
-    LOW_WATERMARK_BEFORE_CHANGE,
-
     /**
      * This event is fired on a low watermark change.
      *
diff --git 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index e5e5dd5b1a..fc2083ba68 100644
--- 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++ 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.lowwatermark;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.lowwatermark.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY;
-import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
 import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -325,72 +324,6 @@ public class LowWatermarkImplTest extends 
BaseIgniteAbstractTest {
         assertThat(lowWatermark.updateAndNotify(newLwm), 
willCompleteSuccessfully());
     }
 
-    @Test
-    void testSequenceEventsOfLowWatermarkChange() {
-        HybridTimestamp newLowWatermarkCandidate = 
lowWatermark.createNewLowWatermarkCandidate();
-
-        EventListener<ChangeLowWatermarkEventParameters> beforeChangeListener 
= spy(new EventListener<ChangeLowWatermarkEventParameters>() {
-            @Override
-            public CompletableFuture<Boolean> 
notify(ChangeLowWatermarkEventParameters parameters) {
-                assertEquals(newLowWatermarkCandidate, 
parameters.newLowWatermark());
-
-                return trueCompletedFuture();
-            }
-        });
-
-        EventListener<ChangeLowWatermarkEventParameters> changedListener = 
spy(new EventListener<ChangeLowWatermarkEventParameters>() {
-            @Override
-            public CompletableFuture<Boolean> 
notify(ChangeLowWatermarkEventParameters parameters) {
-                assertEquals(newLowWatermarkCandidate, 
parameters.newLowWatermark());
-
-                return trueCompletedFuture();
-            }
-        });
-
-        lowWatermark.listen(LOW_WATERMARK_BEFORE_CHANGE, beforeChangeListener);
-        lowWatermark.listen(LOW_WATERMARK_CHANGED, changedListener);
-
-        InOrder inOrder = inOrder(beforeChangeListener, changedListener);
-
-        assertThat(lowWatermark.updateAndNotify(newLowWatermarkCandidate), 
willCompleteSuccessfully());
-
-        inOrder.verify(beforeChangeListener).notify(any());
-        inOrder.verify(changedListener).notify(any());
-    }
-
-    @Test
-    void testChangedEvenCalledOnlyAfterCompletionBeforeChange() {
-        var startBeforeChangeListenerFuture = new CompletableFuture<Void>();
-        var startChangedListenerFuture = new CompletableFuture<Void>();
-
-        var finishBeforeChangeListenerFuture = new CompletableFuture<Void>();
-
-        EventListener<ChangeLowWatermarkEventParameters> beforeChangeListener 
= parameters -> {
-            startBeforeChangeListenerFuture.complete(null);
-
-            return finishBeforeChangeListenerFuture.thenApply(unused -> true);
-        };
-
-        EventListener<ChangeLowWatermarkEventParameters> changedListener = 
parameters -> {
-            startChangedListenerFuture.complete(null);
-
-            return trueCompletedFuture();
-        };
-
-        lowWatermark.listen(LOW_WATERMARK_BEFORE_CHANGE, beforeChangeListener);
-        lowWatermark.listen(LOW_WATERMARK_CHANGED, changedListener);
-
-        CompletableFuture<Void> updateAndNotifyFuture = 
lowWatermark.updateAndNotify(lowWatermark.createNewLowWatermarkCandidate());
-
-        assertThat(startBeforeChangeListenerFuture, 
willCompleteSuccessfully());
-        assertThat(startChangedListenerFuture, willTimeoutFast());
-        assertFalse(updateAndNotifyFuture.isDone());
-
-        finishBeforeChangeListenerFuture.complete(null);
-        assertThat(startChangedListenerFuture, willCompleteSuccessfully());
-        assertThat(updateAndNotifyFuture, willCompleteSuccessfully());
-    }
-
     private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() {
         var future = new CompletableFuture<HybridTimestamp>();
 
diff --git 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index eae8d89a91..8fbd949823 100644
--- 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++ 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.lowwatermark;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
-import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
 import static 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
@@ -49,6 +51,8 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     private final ReadWriteLock updateLowWatermarkLock = new 
ReentrantReadWriteLock();
 
+    private final Map<Object, LowWatermarkLock> locks = new 
ConcurrentHashMap<>();
+
     @Override
     public @Nullable HybridTimestamp getLowWatermark() {
         return ts;
@@ -67,7 +71,9 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
 
     @Override
     public void updateLowWatermark(HybridTimestamp newLowWatermark) {
-        if (ts == null || newLowWatermark.compareTo(ts) > 0) {
+        var currentTs = ts;
+
+        if (currentTs == null || newLowWatermark.compareTo(currentTs) > 0) {
             supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
                     .thenCompose(Function.identity())
                     .whenComplete((unused, throwable) -> {
@@ -78,6 +84,36 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
         }
     }
 
+    @Override
+    public boolean tryLock(Object lockId, HybridTimestamp lockTs) {
+        updateLowWatermarkLock.readLock().lock();
+
+        try {
+            HybridTimestamp lwm = ts;
+            if (lwm != null && lockTs.compareTo(lwm) < 0) {
+                return false;
+            }
+
+            locks.put(lockId, new LowWatermarkLock(lockTs));
+
+            return true;
+        } finally {
+            updateLowWatermarkLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void unlock(Object lockId) {
+        LowWatermarkLock lock = locks.remove(lockId);
+
+        if (lock == null) {
+            // Already released.
+            return;
+        }
+
+        lock.future().complete(null);
+    }
+
     /**
      * Update low watermark and notify listeners.
      *
@@ -88,7 +124,9 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
         try {
             assertNotNull(newTs);
 
-            assertTrue(ts == null || ts.longValue() < newTs.longValue(), "ts=" 
+ ts + ", newTs=" + newTs);
+            HybridTimestamp currentTs = ts;
+
+            assertTrue(currentTs == null || currentTs.longValue() < 
newTs.longValue(), "ts=" + currentTs + ", newTs=" + newTs);
 
             return updateAndNotifyInternal(newTs);
         } catch (Throwable t) {
@@ -114,11 +152,26 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
     private CompletableFuture<Void> updateAndNotifyInternal(HybridTimestamp 
newLowWatermark) {
         var parameters = new 
ChangeLowWatermarkEventParameters(newLowWatermark);
 
-        return fireEvent(LOW_WATERMARK_BEFORE_CHANGE, parameters)
-                .thenCompose(unused -> {
-                    setLowWatermark(newLowWatermark);
+        return waitForLocksAndSetLowWatermark(newLowWatermark)
+                .thenCompose(unused -> fireEvent(LOW_WATERMARK_CHANGED, 
parameters));
+    }
+
+    private CompletableFuture<Void> 
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
+        // Write lock so no new LWM locks can be added.
+        updateLowWatermarkLock.writeLock().lock();
+
+        try {
+            for (LowWatermarkLock lock : locks.values()) {
+                if (lock.timestamp().compareTo(newLowWatermark) <= 0) {
+                    return lock.future().thenCompose(unused -> 
waitForLocksAndSetLowWatermark(newLowWatermark));
+                }
+            }
+
+            setLowWatermark(newLowWatermark);
 
-                    return fireEvent(LOW_WATERMARK_CHANGED, parameters);
-                });
+            return nullCompletedFuture();
+        } finally {
+            updateLowWatermarkLock.writeLock().unlock();
+        }
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 9237802fc7..e3f8d8d61b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -41,6 +41,9 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** The tracker is used to track an observable timestamp. */
     private final HybridTimestampTracker observableTsTracker;
 
+    /** Transaction future. */
+    private final CompletableFuture<Void> txFuture;
+
     /**
      * The constructor.
      *
@@ -55,12 +58,14 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
             HybridTimestampTracker observableTsTracker,
             UUID id,
             UUID txCoordinatorId,
-            HybridTimestamp readTimestamp
+            HybridTimestamp readTimestamp,
+            CompletableFuture<Void> txFuture
     ) {
         super(txManager, id, txCoordinatorId);
 
         this.readTimestamp = readTimestamp;
         this.observableTsTracker = observableTsTracker;
+        this.txFuture = txFuture;
     }
 
     @Override
@@ -114,6 +119,10 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
         observableTsTracker.update(executionTimestamp);
 
-        return ((TxManagerImpl) 
txManager).completeReadOnlyTransactionFuture(new 
TxIdAndTimestamp(readTimestamp, id()));
+        txFuture.complete(null);
+
+        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new 
TxIdAndTimestamp(readTimestamp, id()));
+
+        return txFuture;
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 9243d96d7f..d8bae15056 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -39,15 +39,11 @@ import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_O
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
@@ -56,7 +52,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -70,8 +65,6 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
-import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
@@ -138,17 +131,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** The local state storage. */
     private final VolatileTxStateMetaStorage txStateVolatileStorage = new 
VolatileTxStateMetaStorage();
 
-    /** Future of a read-only transaction by it {@link TxIdAndTimestamp}. */
-    private final ConcurrentNavigableMap<TxIdAndTimestamp, 
CompletableFuture<Void>> readOnlyTxFutureById = new ConcurrentSkipListMap<>(
-            
Comparator.comparing(TxIdAndTimestamp::getReadTimestamp).thenComparing(TxIdAndTimestamp::getTxId)
-    );
-
-    /**
-     * Low watermark value, does not allow creating read-only transactions 
less than or equal to this value, {@code null} means it has never
-     * been updated yet.
-     */
-    private final AtomicReference<HybridTimestamp> lowWatermarkValueReference 
= new AtomicReference<>();
-
     /** Low watermark. */
     private final LowWatermark lowWatermark;
 
@@ -201,8 +183,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     private final EventListener<PrimaryReplicaEventParameters> 
primaryReplicaElectedListener;
 
-    private final EventListener<ChangeLowWatermarkEventParameters> 
lowWatermarkChangedListener = this::onLwnChanged;
-
     /** Counter of read-write transactions that were created and completed 
locally on the node. */
     private final LocalRwTxCounter localRwTxCounter;
 
@@ -412,34 +392,24 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 ? HybridTimestamp.max(observableTimestamp, 
currentReadTimestamp(beginTimestamp))
                 : currentReadTimestamp(beginTimestamp);
 
-        TxIdAndTimestamp txIdAndTimestamp = new 
TxIdAndTimestamp(readTimestamp, txId);
-
-        CompletableFuture<Void> txFuture = new CompletableFuture<>();
-
-        CompletableFuture<Void> oldFuture = 
readOnlyTxFutureById.put(txIdAndTimestamp, txFuture);
-        assert oldFuture == null : "previous transaction has not completed 
yet: " + txIdAndTimestamp;
-
-        HybridTimestamp lowWatermark = this.lowWatermarkValueReference.get();
-
-        if (lowWatermark != null && readTimestamp.compareTo(lowWatermark) <= 
0) {
-            // "updateLowWatermark" method updates "this.lowWatermark" field, 
and only then scans "this.readOnlyTxFutureById" for old
-            // transactions to wait. In order for that code to work safely, we 
have to make sure that no "too old" transactions will be
-            // created here in "begin" method after "this.lowWatermark" is 
already updated. The simplest way to achieve that is to check
-            // LW after we add transaction to the map (adding transaction to 
the map before reading LW value, of course).
-            readOnlyTxFutureById.remove(txIdAndTimestamp);
-
-            // Completing the future is necessary, because 
"updateLowWatermark" method may already wait for it if race condition happened.
-            txFuture.complete(null);
-
+        boolean lockAcquired = lowWatermark.tryLock(txId, readTimestamp);
+        if (!lockAcquired) {
             throw new IgniteInternalException(
                     TX_READ_ONLY_TOO_OLD_ERR,
                     "Timestamp of read-only transaction must be greater than 
the low watermark: [txTimestamp={}, lowWatermark={}]",
                     readTimestamp,
-                    lowWatermark
-            );
+                    lowWatermark.getLowWatermark());
         }
 
-        return new ReadOnlyTransactionImpl(this, timestampTracker, txId, 
localNodeId, readTimestamp);
+        try {
+            CompletableFuture<Void> txFuture = new CompletableFuture<>();
+            txFuture.whenComplete((unused, throwable) -> 
lowWatermark.unlock(txId));
+
+            return new ReadOnlyTransactionImpl(this, timestampTracker, txId, 
localNodeId, readTimestamp, txFuture);
+        } catch (Throwable t) {
+            lowWatermark.unlock(txId);
+            throw t;
+        }
     }
 
     /**
@@ -786,10 +756,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
             
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
 
-            lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE, 
lowWatermarkChangedListener);
-
-            lowWatermarkValueReference.set(lowWatermark.getLowWatermark());
-
             return nullCompletedFuture();
         });
     }
@@ -815,8 +781,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
 
-        
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE, 
lowWatermarkChangedListener);
-
         shutdownAndAwaitTermination(writeIntentSwitchPool, 10, 
TimeUnit.SECONDS);
 
         return nullCompletedFuture();
@@ -871,34 +835,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         return runAsync(runnable, writeIntentSwitchPool);
     }
 
-    CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) {
+    void completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) {
         finishedTxs.add(1);
 
-        CompletableFuture<Void> readOnlyTxFuture = readOnlyTxFutureById.remove(txIdAndTimestamp);
-
-        assert readOnlyTxFuture != null : txIdAndTimestamp;
-
-        readOnlyTxFuture.complete(null);
-
         UUID txId = txIdAndTimestamp.getTxId();
 
         transactionInflights.markReadOnlyTxFinished(txId);
-
-        return readOnlyTxFuture;
-    }
-
-    private CompletableFuture<Boolean> onLwnChanged(ChangeLowWatermarkEventParameters parameters) {
-        return inBusyLockAsync(busyLock, () -> {
-            HybridTimestamp newLowWatermark = parameters.newLowWatermark();
-
-            increaseLowWatermarkValueReferenceBusy(newLowWatermark);
-
-            TxIdAndTimestamp upperBound = new TxIdAndTimestamp(newLowWatermark, new UUID(Long.MAX_VALUE, Long.MAX_VALUE));
-
-            List<CompletableFuture<Void>> readOnlyTxFutures = List.copyOf(readOnlyTxFutureById.headMap(upperBound, true).values());
-
-            return allOf(readOnlyTxFutures.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
-        });
     }
 
     @Override
@@ -1018,17 +960,4 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
             return null;
         });
     }
-
-    private void increaseLowWatermarkValueReferenceBusy(HybridTimestamp newLowWatermark) {
-        lowWatermarkValueReference.updateAndGet(previousLowWatermark -> {
-            if (previousLowWatermark == null) {
-                return newLowWatermark;
-            }
-
-            assert newLowWatermark.compareTo(previousLowWatermark) > 0 :
-                    "lower watermark should be growing: [previous=" + previousLowWatermark + ", new=" + newLowWatermark + ']';
-
-            return newLowWatermark;
-        });
-    }
 }
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 8759c6d0ad..3afafbaa0f 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
@@ -244,8 +243,6 @@ public class TxManagerTest extends IgniteAbstractTest {
 
     @Test
     void testUpdateLowerWatermark() {
-        verify(lowWatermark).listen(eq(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE), any());
-
         // Let's check the absence of transactions.
         assertThat(lowWatermark.updateAndNotify(clockService.now()), willSucceedFast());
 
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 741a89a5f8..551f4c2216 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -46,7 +47,8 @@ class ReadOnlyTransactionImplTest extends BaseIgniteAbstractTest {
                 new HybridTimestampTracker(),
                 txId,
                 new UUID(1, 2),
-                readTimestamp
+                readTimestamp,
+                new CompletableFuture<>()
         );
 
         assertThat(tx.startTimestamp(), is(readTimestamp));


Reply via email to