This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 33c16499ae IGNITE-23193 Introduce Low Watermark lock API (#4498)
33c16499ae is described below
commit 33c16499ae657fff9ebf7d9cda2ba41b9653f1bf
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Oct 7 13:19:07 2024 +0300
IGNITE-23193 Introduce Low Watermark lock API (#4498)
* Introduce `LowWatermark.tryLock` and `unlock` methods as a cleaner way to
"hold" the watermark (prevent it from changing above certain value)
* Remove `LOW_WATERMARK_BEFORE_CHANGE` event
* Remove `TxManagerImpl.readOnlyTxFutureById` map and
`lowWatermarkValueReference`
Previously, `LOW_WATERMARK_BEFORE_CHANGE` listener was used to prevent
watermark update while there are active read-only transactions with a lower
timestamp.
---
.../ignite/internal/lowwatermark/LowWatermark.java | 17 ++++
.../internal/lowwatermark/LowWatermarkImpl.java | 99 +++++++++++++++++-----
...owWatermarkEvent.java => LowWatermarkLock.java} | 34 ++++----
.../lowwatermark/event/LowWatermarkEvent.java | 7 --
.../lowwatermark/LowWatermarkImplTest.java | 67 ---------------
.../internal/lowwatermark/TestLowWatermark.java | 69 +++++++++++++--
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 13 ++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 97 +++------------------
.../apache/ignite/internal/tx/TxManagerTest.java | 3 -
.../tx/impl/ReadOnlyTransactionImplTest.java | 4 +-
10 files changed, 201 insertions(+), 209 deletions(-)
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 7d89c41720..5d79d0a7c5 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -49,4 +49,21 @@ public interface LowWatermark extends
EventProducer<LowWatermarkEvent, LowWaterm
* @param newLowWatermark Candidate for update.
*/
void updateLowWatermark(HybridTimestamp newLowWatermark);
+
+ /**
+ * Locks the low watermark at the provided timestamp (prevents it from
being updated to a value higher than the provided one).
+ *
+ * @param lockId Lock id.
+ * @param ts Timestamp to lock.
+ * @return True if the lock was acquired,
+ * false if the lock was not acquired due to the low watermark being
higher than the provided timestamp.
+ */
+ boolean tryLock(Object lockId, HybridTimestamp ts);
+
+ /**
+ * Releases the lock created by {@link #tryLock(Object, HybridTimestamp)}.
+ *
+ * @param lockId Lock id.
+ */
+ void unlock(Object lockId);
}
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index d3d343d6ec..eb37ffe913 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.lowwatermark;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -120,6 +121,8 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
new LowWatermarkCandidate(MIN_VALUE, nullCompletedFuture())
);
+ private final Map<Object, LowWatermarkLock> locks = new
ConcurrentHashMap<>();
+
/**
* Constructor.
*
@@ -233,8 +236,10 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
updateLowWatermarkLock.writeLock().lock();
try {
- assert lowWatermark == null ||
newLowWatermark.compareTo(lowWatermark) > 0 :
- "Low watermark should only grow: [cur=" + lowWatermark +
", new=" + newLowWatermark + "]";
+ HybridTimestamp lwm = lowWatermark;
+
+ assert lwm == null || newLowWatermark.compareTo(lwm) > 0 :
+ "Low watermark should only grow: [cur=" + lwm + ", new=" +
newLowWatermark + "]";
lowWatermark = newLowWatermark;
} finally {
@@ -296,31 +301,83 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
});
}
+ @Override
+ public boolean tryLock(Object lockId, HybridTimestamp ts) {
+ return inBusyLock(busyLock, () -> {
+ updateLowWatermarkLock.readLock().lock();
+
+ try {
+ HybridTimestamp lwm = lowWatermark;
+ if (lwm != null && ts.compareTo(lwm) < 0) {
+ return false;
+ }
+
+ locks.put(lockId, new LowWatermarkLock(ts));
+
+ return true;
+ } finally {
+ updateLowWatermarkLock.readLock().unlock();
+ }
+ });
+ }
+
+ @Override
+ public void unlock(Object lockId) {
+ LowWatermarkLock lock = locks.remove(lockId);
+
+ if (lock == null) {
+ // Already released.
+ return;
+ }
+
+ lock.future().complete(null);
+ }
+
CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
- return inBusyLockAsync(busyLock, () ->
- fireEvent(LOW_WATERMARK_BEFORE_CHANGE, new
ChangeLowWatermarkEventParameters(newLowWatermark))
- .thenComposeAsync(unused -> {
- vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(newLowWatermark));
+ return inBusyLockAsync(busyLock, () -> {
+ vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(newLowWatermark));
- setLowWatermark(newLowWatermark);
+ return waitForLocksAndSetLowWatermark(newLowWatermark)
+ .thenComposeAsync(unused2 -> fireEvent(
+ LOW_WATERMARK_CHANGED,
+ new
ChangeLowWatermarkEventParameters(newLowWatermark)), scheduledThreadPool)
+ .whenCompleteAsync((unused, throwable) -> {
+ if (throwable != null) {
+ if (!(throwable instanceof
NodeStoppingException)) {
+ LOG.error("Failed to update low
watermark, will schedule again: {}", throwable, newLowWatermark);
- return fireEvent(LOW_WATERMARK_CHANGED, new
ChangeLowWatermarkEventParameters(newLowWatermark));
- }, scheduledThreadPool)
- .whenCompleteAsync((unused, throwable) -> {
- if (throwable != null) {
- if (!(throwable instanceof
NodeStoppingException)) {
- LOG.error("Failed to update low watermark,
will schedule again: {}", throwable, newLowWatermark);
+ failureManager.process(new
FailureContext(CRITICAL_ERROR, throwable));
- failureManager.process(new
FailureContext(CRITICAL_ERROR, throwable));
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ }
+ } else {
+ LOG.info("Successful low watermark update:
{}", newLowWatermark);
inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
}
- } else {
- LOG.info("Successful low watermark update:
{}", newLowWatermark);
-
- inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
- }
- }, scheduledThreadPool)
+ }, scheduledThreadPool);
+ }
);
}
+
+ private CompletableFuture<Void>
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
+ return inBusyLockAsync(busyLock, () -> {
+ // Write lock so no new LWM locks can be added.
+ updateLowWatermarkLock.writeLock().lock();
+
+ try {
+ for (LowWatermarkLock lock : locks.values()) {
+ if (lock.timestamp().compareTo(newLowWatermark) < 0) {
+ return lock.future().thenCompose(unused ->
waitForLocksAndSetLowWatermark(newLowWatermark));
+ }
+ }
+
+ setLowWatermark(newLowWatermark);
+
+ return nullCompletedFuture();
+ } finally {
+ updateLowWatermarkLock.writeLock().unlock();
+ }
+ });
+ }
}
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
similarity index 59%
copy from
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
copy to
modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
index 216a5c0d63..d6046e9a8f 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkLock.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.lowwatermark.event;
+package org.apache.ignite.internal.lowwatermark;
-import org.apache.ignite.internal.event.Event;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
-/** Low watermark events. */
-public enum LowWatermarkEvent implements Event {
- /**
- * This event is fired before the low watermark changes.
- *
- * @see ChangeLowWatermarkEventParameters
- */
- LOW_WATERMARK_BEFORE_CHANGE,
+final class LowWatermarkLock {
+ private final HybridTimestamp timestamp;
- /**
- * This event is fired on a low watermark change.
- *
- * @see ChangeLowWatermarkEventParameters
- */
- LOW_WATERMARK_CHANGED
+ private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+ LowWatermarkLock(HybridTimestamp timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
+ CompletableFuture<Void> future() {
+ return future;
+ }
}
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
index 216a5c0d63..9299bee188 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/event/LowWatermarkEvent.java
@@ -21,13 +21,6 @@ import org.apache.ignite.internal.event.Event;
/** Low watermark events. */
public enum LowWatermarkEvent implements Event {
- /**
- * This event is fired before the low watermark changes.
- *
- * @see ChangeLowWatermarkEventParameters
- */
- LOW_WATERMARK_BEFORE_CHANGE,
-
/**
* This event is fired on a low watermark change.
*
diff --git
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index e5e5dd5b1a..fc2083ba68 100644
---
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.lowwatermark;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.lowwatermark.LowWatermarkImpl.LOW_WATERMARK_VAULT_KEY;
-import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -325,72 +324,6 @@ public class LowWatermarkImplTest extends
BaseIgniteAbstractTest {
assertThat(lowWatermark.updateAndNotify(newLwm),
willCompleteSuccessfully());
}
- @Test
- void testSequenceEventsOfLowWatermarkChange() {
- HybridTimestamp newLowWatermarkCandidate =
lowWatermark.createNewLowWatermarkCandidate();
-
- EventListener<ChangeLowWatermarkEventParameters> beforeChangeListener
= spy(new EventListener<ChangeLowWatermarkEventParameters>() {
- @Override
- public CompletableFuture<Boolean>
notify(ChangeLowWatermarkEventParameters parameters) {
- assertEquals(newLowWatermarkCandidate,
parameters.newLowWatermark());
-
- return trueCompletedFuture();
- }
- });
-
- EventListener<ChangeLowWatermarkEventParameters> changedListener =
spy(new EventListener<ChangeLowWatermarkEventParameters>() {
- @Override
- public CompletableFuture<Boolean>
notify(ChangeLowWatermarkEventParameters parameters) {
- assertEquals(newLowWatermarkCandidate,
parameters.newLowWatermark());
-
- return trueCompletedFuture();
- }
- });
-
- lowWatermark.listen(LOW_WATERMARK_BEFORE_CHANGE, beforeChangeListener);
- lowWatermark.listen(LOW_WATERMARK_CHANGED, changedListener);
-
- InOrder inOrder = inOrder(beforeChangeListener, changedListener);
-
- assertThat(lowWatermark.updateAndNotify(newLowWatermarkCandidate),
willCompleteSuccessfully());
-
- inOrder.verify(beforeChangeListener).notify(any());
- inOrder.verify(changedListener).notify(any());
- }
-
- @Test
- void testChangedEvenCalledOnlyAfterCompletionBeforeChange() {
- var startBeforeChangeListenerFuture = new CompletableFuture<Void>();
- var startChangedListenerFuture = new CompletableFuture<Void>();
-
- var finishBeforeChangeListenerFuture = new CompletableFuture<Void>();
-
- EventListener<ChangeLowWatermarkEventParameters> beforeChangeListener
= parameters -> {
- startBeforeChangeListenerFuture.complete(null);
-
- return finishBeforeChangeListenerFuture.thenApply(unused -> true);
- };
-
- EventListener<ChangeLowWatermarkEventParameters> changedListener =
parameters -> {
- startChangedListenerFuture.complete(null);
-
- return trueCompletedFuture();
- };
-
- lowWatermark.listen(LOW_WATERMARK_BEFORE_CHANGE, beforeChangeListener);
- lowWatermark.listen(LOW_WATERMARK_CHANGED, changedListener);
-
- CompletableFuture<Void> updateAndNotifyFuture =
lowWatermark.updateAndNotify(lowWatermark.createNewLowWatermarkCandidate());
-
- assertThat(startBeforeChangeListenerFuture,
willCompleteSuccessfully());
- assertThat(startChangedListenerFuture, willTimeoutFast());
- assertFalse(updateAndNotifyFuture.isDone());
-
- finishBeforeChangeListenerFuture.complete(null);
- assertThat(startChangedListenerFuture, willCompleteSuccessfully());
- assertThat(updateAndNotifyFuture, willCompleteSuccessfully());
- }
-
private CompletableFuture<HybridTimestamp> listenUpdateLowWatermark() {
var future = new CompletableFuture<HybridTimestamp>();
diff --git
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index eae8d89a91..8fbd949823 100644
---
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.lowwatermark;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
-import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@@ -49,6 +51,8 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
private final ReadWriteLock updateLowWatermarkLock = new
ReentrantReadWriteLock();
+ private final Map<Object, LowWatermarkLock> locks = new
ConcurrentHashMap<>();
+
@Override
public @Nullable HybridTimestamp getLowWatermark() {
return ts;
@@ -67,7 +71,9 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
@Override
public void updateLowWatermark(HybridTimestamp newLowWatermark) {
- if (ts == null || newLowWatermark.compareTo(ts) > 0) {
+ var currentTs = ts;
+
+ if (currentTs == null || newLowWatermark.compareTo(currentTs) > 0) {
supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
.thenCompose(Function.identity())
.whenComplete((unused, throwable) -> {
@@ -78,6 +84,36 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
}
}
+ @Override
+ public boolean tryLock(Object lockId, HybridTimestamp lockTs) {
+ updateLowWatermarkLock.readLock().lock();
+
+ try {
+ HybridTimestamp lwm = ts;
+ if (lwm != null && lockTs.compareTo(lwm) < 0) {
+ return false;
+ }
+
+ locks.put(lockId, new LowWatermarkLock(lockTs));
+
+ return true;
+ } finally {
+ updateLowWatermarkLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void unlock(Object lockId) {
+ LowWatermarkLock lock = locks.remove(lockId);
+
+ if (lock == null) {
+ // Already released.
+ return;
+ }
+
+ lock.future().complete(null);
+ }
+
/**
* Update low watermark and notify listeners.
*
@@ -88,7 +124,9 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
try {
assertNotNull(newTs);
- assertTrue(ts == null || ts.longValue() < newTs.longValue(), "ts="
+ ts + ", newTs=" + newTs);
+ HybridTimestamp currentTs = ts;
+
+ assertTrue(currentTs == null || currentTs.longValue() <
newTs.longValue(), "ts=" + currentTs + ", newTs=" + newTs);
return updateAndNotifyInternal(newTs);
} catch (Throwable t) {
@@ -114,11 +152,26 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
private CompletableFuture<Void> updateAndNotifyInternal(HybridTimestamp
newLowWatermark) {
var parameters = new
ChangeLowWatermarkEventParameters(newLowWatermark);
- return fireEvent(LOW_WATERMARK_BEFORE_CHANGE, parameters)
- .thenCompose(unused -> {
- setLowWatermark(newLowWatermark);
+ return waitForLocksAndSetLowWatermark(newLowWatermark)
+ .thenCompose(unused -> fireEvent(LOW_WATERMARK_CHANGED,
parameters));
+ }
+
+ private CompletableFuture<Void>
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
+ // Write lock so no new LWM locks can be added.
+ updateLowWatermarkLock.writeLock().lock();
+
+ try {
+ for (LowWatermarkLock lock : locks.values()) {
+ if (lock.timestamp().compareTo(newLowWatermark) <= 0) {
+ return lock.future().thenCompose(unused ->
waitForLocksAndSetLowWatermark(newLowWatermark));
+ }
+ }
+
+ setLowWatermark(newLowWatermark);
- return fireEvent(LOW_WATERMARK_CHANGED, parameters);
- });
+ return nullCompletedFuture();
+ } finally {
+ updateLowWatermarkLock.writeLock().unlock();
+ }
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 9237802fc7..e3f8d8d61b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -41,6 +41,9 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
/** The tracker is used to track an observable timestamp. */
private final HybridTimestampTracker observableTsTracker;
+ /** Transaction future. */
+ private final CompletableFuture<Void> txFuture;
+
/**
* The constructor.
*
@@ -55,12 +58,14 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
HybridTimestampTracker observableTsTracker,
UUID id,
UUID txCoordinatorId,
- HybridTimestamp readTimestamp
+ HybridTimestamp readTimestamp,
+ CompletableFuture<Void> txFuture
) {
super(txManager, id, txCoordinatorId);
this.readTimestamp = readTimestamp;
this.observableTsTracker = observableTsTracker;
+ this.txFuture = txFuture;
}
@Override
@@ -114,6 +119,10 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
observableTsTracker.update(executionTimestamp);
- return ((TxManagerImpl)
txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()));
+ txFuture.complete(null);
+
+ ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()));
+
+ return txFuture;
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 9243d96d7f..d8bae15056 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -39,15 +39,11 @@ import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_O
import java.io.IOException;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
@@ -56,7 +52,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -70,8 +65,6 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
-import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
@@ -138,17 +131,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** The local state storage. */
private final VolatileTxStateMetaStorage txStateVolatileStorage = new
VolatileTxStateMetaStorage();
- /** Future of a read-only transaction by it {@link TxIdAndTimestamp}. */
- private final ConcurrentNavigableMap<TxIdAndTimestamp,
CompletableFuture<Void>> readOnlyTxFutureById = new ConcurrentSkipListMap<>(
-
Comparator.comparing(TxIdAndTimestamp::getReadTimestamp).thenComparing(TxIdAndTimestamp::getTxId)
- );
-
- /**
- * Low watermark value, does not allow creating read-only transactions
less than or equal to this value, {@code null} means it has never
- * been updated yet.
- */
- private final AtomicReference<HybridTimestamp> lowWatermarkValueReference
= new AtomicReference<>();
-
/** Low watermark. */
private final LowWatermark lowWatermark;
@@ -201,8 +183,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
private final EventListener<PrimaryReplicaEventParameters>
primaryReplicaElectedListener;
- private final EventListener<ChangeLowWatermarkEventParameters>
lowWatermarkChangedListener = this::onLwnChanged;
-
/** Counter of read-write transactions that were created and completed
locally on the node. */
private final LocalRwTxCounter localRwTxCounter;
@@ -412,34 +392,24 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
? HybridTimestamp.max(observableTimestamp,
currentReadTimestamp(beginTimestamp))
: currentReadTimestamp(beginTimestamp);
- TxIdAndTimestamp txIdAndTimestamp = new
TxIdAndTimestamp(readTimestamp, txId);
-
- CompletableFuture<Void> txFuture = new CompletableFuture<>();
-
- CompletableFuture<Void> oldFuture =
readOnlyTxFutureById.put(txIdAndTimestamp, txFuture);
- assert oldFuture == null : "previous transaction has not completed
yet: " + txIdAndTimestamp;
-
- HybridTimestamp lowWatermark = this.lowWatermarkValueReference.get();
-
- if (lowWatermark != null && readTimestamp.compareTo(lowWatermark) <=
0) {
- // "updateLowWatermark" method updates "this.lowWatermark" field,
and only then scans "this.readOnlyTxFutureById" for old
- // transactions to wait. In order for that code to work safely, we
have to make sure that no "too old" transactions will be
- // created here in "begin" method after "this.lowWatermark" is
already updated. The simplest way to achieve that is to check
- // LW after we add transaction to the map (adding transaction to
the map before reading LW value, of course).
- readOnlyTxFutureById.remove(txIdAndTimestamp);
-
- // Completing the future is necessary, because
"updateLowWatermark" method may already wait for it if race condition happened.
- txFuture.complete(null);
-
+ boolean lockAcquired = lowWatermark.tryLock(txId, readTimestamp);
+ if (!lockAcquired) {
throw new IgniteInternalException(
TX_READ_ONLY_TOO_OLD_ERR,
"Timestamp of read-only transaction must be greater than
the low watermark: [txTimestamp={}, lowWatermark={}]",
readTimestamp,
- lowWatermark
- );
+ lowWatermark.getLowWatermark());
}
- return new ReadOnlyTransactionImpl(this, timestampTracker, txId,
localNodeId, readTimestamp);
+ try {
+ CompletableFuture<Void> txFuture = new CompletableFuture<>();
+ txFuture.whenComplete((unused, throwable) ->
lowWatermark.unlock(txId));
+
+ return new ReadOnlyTransactionImpl(this, timestampTracker, txId,
localNodeId, readTimestamp, txFuture);
+ } catch (Throwable t) {
+ lowWatermark.unlock(txId);
+ throw t;
+ }
}
/**
@@ -786,10 +756,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
- lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE,
lowWatermarkChangedListener);
-
- lowWatermarkValueReference.set(lowWatermark.getLowWatermark());
-
return nullCompletedFuture();
});
}
@@ -815,8 +781,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
primaryReplicaElectedListener);
-
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE,
lowWatermarkChangedListener);
-
shutdownAndAwaitTermination(writeIntentSwitchPool, 10,
TimeUnit.SECONDS);
return nullCompletedFuture();
@@ -871,34 +835,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return runAsync(runnable, writeIntentSwitchPool);
}
- CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp
txIdAndTimestamp) {
+ void completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) {
finishedTxs.add(1);
- CompletableFuture<Void> readOnlyTxFuture =
readOnlyTxFutureById.remove(txIdAndTimestamp);
-
- assert readOnlyTxFuture != null : txIdAndTimestamp;
-
- readOnlyTxFuture.complete(null);
-
UUID txId = txIdAndTimestamp.getTxId();
transactionInflights.markReadOnlyTxFinished(txId);
-
- return readOnlyTxFuture;
- }
-
- private CompletableFuture<Boolean>
onLwnChanged(ChangeLowWatermarkEventParameters parameters) {
- return inBusyLockAsync(busyLock, () -> {
- HybridTimestamp newLowWatermark = parameters.newLowWatermark();
-
- increaseLowWatermarkValueReferenceBusy(newLowWatermark);
-
- TxIdAndTimestamp upperBound = new
TxIdAndTimestamp(newLowWatermark, new UUID(Long.MAX_VALUE, Long.MAX_VALUE));
-
- List<CompletableFuture<Void>> readOnlyTxFutures =
List.copyOf(readOnlyTxFutureById.headMap(upperBound, true).values());
-
- return
allOf(readOnlyTxFutures.toArray(CompletableFuture[]::new)).thenApply(unused ->
false);
- });
}
@Override
@@ -1018,17 +960,4 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return null;
});
}
-
- private void increaseLowWatermarkValueReferenceBusy(HybridTimestamp
newLowWatermark) {
- lowWatermarkValueReference.updateAndGet(previousLowWatermark -> {
- if (previousLowWatermark == null) {
- return newLowWatermark;
- }
-
- assert newLowWatermark.compareTo(previousLowWatermark) > 0 :
- "lower watermark should be growing: [previous=" +
previousLowWatermark + ", new=" + newLowWatermark + ']';
-
- return newLowWatermark;
- });
- }
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 8759c6d0ad..3afafbaa0f 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
@@ -244,8 +243,6 @@ public class TxManagerTest extends IgniteAbstractTest {
@Test
void testUpdateLowerWatermark() {
-
verify(lowWatermark).listen(eq(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE),
any());
-
// Let's check the absence of transactions.
assertThat(lowWatermark.updateAndNotify(clockService.now()),
willSucceedFast());
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 741a89a5f8..551f4c2216 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -46,7 +47,8 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
new HybridTimestampTracker(),
txId,
new UUID(1, 2),
- readTimestamp
+ readTimestamp,
+ new CompletableFuture<>()
);
assertThat(tx.startTimestamp(), is(readTimestamp));