This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ee03fdfe6 IGNITE-20316 Fix possible lost updates in 
MetastorageInhibitor (#2521)
7ee03fdfe6 is described below

commit 7ee03fdfe6ef6f1b5ffb8fb986511e390259fa1b
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Aug 31 12:18:49 2023 +0400

    IGNITE-20316 Fix possible lost updates in MetastorageInhibitor (#2521)
---
 .../internal/test/WatchListenerInhibitor.java      | 36 +++++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index dc3f880ee1..8e5b5792d1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldV
 
 import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
@@ -34,7 +35,11 @@ import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
 public class WatchListenerInhibitor {
     private final WatchProcessor watchProcessor;
 
-    private final Field notificationFutureField;
+    private final RocksDbKeyValueStorage storage;
+
+    private final Field processorNotificationFutureField;
+
+    private final Field storageRwLockField;
 
     /** Future used to block the watch notification thread. */
     private final CompletableFuture<Void> inhibitFuture = new 
CompletableFuture<>();
@@ -46,19 +51,24 @@ public class WatchListenerInhibitor {
      * @return Listener inhibitor.
      */
     public static WatchListenerInhibitor metastorageEventsInhibitor(Ignite 
ignite) {
-        //TODO: IGNITE-15723 After a component factory will be implemented, 
need to got rid of reflection here.
-        var metaStorageManager = (MetaStorageManagerImpl) 
getFieldValue(ignite, IgniteImpl.class, "metaStorageMgr");
+        IgniteImpl igniteImpl = (IgniteImpl) ignite;
+
+        var metaStorageManager = (MetaStorageManagerImpl) 
igniteImpl.metaStorageManager();
 
+        //TODO: IGNITE-15723 After a component factory is implemented, need to 
get rid of reflection here.
         var storage = (RocksDbKeyValueStorage) 
getFieldValue(metaStorageManager, MetaStorageManagerImpl.class, "storage");
 
         var watchProcessor = (WatchProcessor) getFieldValue(storage, 
RocksDbKeyValueStorage.class, "watchProcessor");
 
-        return new WatchListenerInhibitor(watchProcessor);
+        return new WatchListenerInhibitor(watchProcessor, storage);
     }
 
-    private WatchListenerInhibitor(WatchProcessor watchProcessor) {
+    private WatchListenerInhibitor(WatchProcessor watchProcessor, 
RocksDbKeyValueStorage storage) {
         this.watchProcessor = watchProcessor;
-        this.notificationFutureField = getField(watchProcessor, 
WatchProcessor.class, "notificationFuture");
+        this.storage = storage;
+
+        processorNotificationFutureField = getField(watchProcessor, 
WatchProcessor.class, "notificationFuture");
+        storageRwLockField = getField(storage, RocksDbKeyValueStorage.class, 
"rwLock");
     }
 
     /**
@@ -66,9 +76,19 @@ public class WatchListenerInhibitor {
      */
     public void startInhibit() {
         try {
-            CompletableFuture<Void> notificationFuture = 
(CompletableFuture<Void>) notificationFutureField.get(watchProcessor);
+            // We take this lock because it's actually used by 
RocksDbKeyValueStorage, among other things, to make future chaining
+            // correct with respect to concurrency.
+            ReadWriteLock rwLock = (ReadWriteLock) 
storageRwLockField.get(storage);
+
+            rwLock.writeLock().lock();
+
+            try {
+                CompletableFuture<Void> notificationFuture = 
(CompletableFuture<Void>) processorNotificationFutureField.get(watchProcessor);
 
-            notificationFutureField.set(watchProcessor, 
notificationFuture.thenCompose(v -> inhibitFuture));
+                processorNotificationFutureField.set(watchProcessor, 
notificationFuture.thenCompose(v -> inhibitFuture));
+            } finally {
+                rwLock.writeLock().unlock();
+            }
         } catch (IllegalAccessException e) {
             throw new RuntimeException(e);
         }

Reply via email to