This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7ee03fdfe6 IGNITE-20316 Fix possible lost updates in MetastorageInhibitor (#2521)
7ee03fdfe6 is described below
commit 7ee03fdfe6ef6f1b5ffb8fb986511e390259fa1b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Aug 31 12:18:49 2023 +0400
IGNITE-20316 Fix possible lost updates in MetastorageInhibitor (#2521)
---
.../internal/test/WatchListenerInhibitor.java | 36 +++++++++++++++++-----
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index dc3f880ee1..8e5b5792d1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldV
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
@@ -34,7 +35,11 @@ import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
public class WatchListenerInhibitor {
private final WatchProcessor watchProcessor;
- private final Field notificationFutureField;
+ private final RocksDbKeyValueStorage storage;
+
+ private final Field processorNotificationFutureField;
+
+ private final Field storageRwLockField;
/** Future used to block the watch notification thread. */
private final CompletableFuture<Void> inhibitFuture = new
CompletableFuture<>();
@@ -46,19 +51,24 @@ public class WatchListenerInhibitor {
* @return Listener inhibitor.
*/
public static WatchListenerInhibitor metastorageEventsInhibitor(Ignite
ignite) {
- //TODO: IGNITE-15723 After a component factory will be implemented,
need to got rid of reflection here.
- var metaStorageManager = (MetaStorageManagerImpl)
getFieldValue(ignite, IgniteImpl.class, "metaStorageMgr");
+ IgniteImpl igniteImpl = (IgniteImpl) ignite;
+
+ var metaStorageManager = (MetaStorageManagerImpl)
igniteImpl.metaStorageManager();
+ //TODO: IGNITE-15723 After a component factory is implemented, need to
got rid of reflection here.
var storage = (RocksDbKeyValueStorage)
getFieldValue(metaStorageManager, MetaStorageManagerImpl.class, "storage");
var watchProcessor = (WatchProcessor) getFieldValue(storage,
RocksDbKeyValueStorage.class, "watchProcessor");
- return new WatchListenerInhibitor(watchProcessor);
+ return new WatchListenerInhibitor(watchProcessor, storage);
}
- private WatchListenerInhibitor(WatchProcessor watchProcessor) {
+ private WatchListenerInhibitor(WatchProcessor watchProcessor,
RocksDbKeyValueStorage storage) {
this.watchProcessor = watchProcessor;
- this.notificationFutureField = getField(watchProcessor,
WatchProcessor.class, "notificationFuture");
+ this.storage = storage;
+
+ processorNotificationFutureField = getField(watchProcessor,
WatchProcessor.class, "notificationFuture");
+ storageRwLockField = getField(storage, RocksDbKeyValueStorage.class,
"rwLock");
}
/**
@@ -66,9 +76,19 @@ public class WatchListenerInhibitor {
*/
public void startInhibit() {
try {
- CompletableFuture<Void> notificationFuture =
(CompletableFuture<Void>) notificationFutureField.get(watchProcessor);
+ // We take this lock because it's actually used by
RocksDbKeyValueStorage, among other things, to make future chaining
+ // correct wrt concurrency.
+ ReadWriteLock rwLock = (ReadWriteLock)
storageRwLockField.get(storage);
+
+ rwLock.writeLock().lock();
+
+ try {
+ CompletableFuture<Void> notificationFuture =
(CompletableFuture<Void>) processorNotificationFutureField.get(watchProcessor);
- notificationFutureField.set(watchProcessor,
notificationFuture.thenCompose(v -> inhibitFuture));
+ processorNotificationFutureField.set(watchProcessor,
notificationFuture.thenCompose(v -> inhibitFuture));
+ } finally {
+ rwLock.writeLock().unlock();
+ }
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}