This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7ee03fdfe6 IGNITE-20316 Fix possible lost updates in MetastorageInhibitor (#2521) 7ee03fdfe6 is described below commit 7ee03fdfe6ef6f1b5ffb8fb986511e390259fa1b Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Aug 31 12:18:49 2023 +0400 IGNITE-20316 Fix possible lost updates in MetastorageInhibitor (#2521) --- .../internal/test/WatchListenerInhibitor.java | 36 +++++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java index dc3f880ee1..8e5b5792d1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldV import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.Ignite; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; @@ -34,7 +35,11 @@ import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue public class WatchListenerInhibitor { private final WatchProcessor watchProcessor; - private final Field notificationFutureField; + private final RocksDbKeyValueStorage storage; + + private final Field processorNotificationFutureField; + + private final Field storageRwLockField; /** Future used to block the watch notification thread. */ private final CompletableFuture<Void> inhibitFuture = new CompletableFuture<>(); @@ -46,19 +51,24 @@ public class WatchListenerInhibitor { * @return Listener inhibitor. */ public static WatchListenerInhibitor metastorageEventsInhibitor(Ignite ignite) { - //TODO: IGNITE-15723 After a component factory will be implemented, need to got rid of reflection here. - var metaStorageManager = (MetaStorageManagerImpl) getFieldValue(ignite, IgniteImpl.class, "metaStorageMgr"); + IgniteImpl igniteImpl = (IgniteImpl) ignite; + + var metaStorageManager = (MetaStorageManagerImpl) igniteImpl.metaStorageManager(); + //TODO: IGNITE-15723 After a component factory is implemented, need to got rid of reflection here. var storage = (RocksDbKeyValueStorage) getFieldValue(metaStorageManager, MetaStorageManagerImpl.class, "storage"); var watchProcessor = (WatchProcessor) getFieldValue(storage, RocksDbKeyValueStorage.class, "watchProcessor"); - return new WatchListenerInhibitor(watchProcessor); + return new WatchListenerInhibitor(watchProcessor, storage); } - private WatchListenerInhibitor(WatchProcessor watchProcessor) { + private WatchListenerInhibitor(WatchProcessor watchProcessor, RocksDbKeyValueStorage storage) { this.watchProcessor = watchProcessor; - this.notificationFutureField = getField(watchProcessor, WatchProcessor.class, "notificationFuture"); + this.storage = storage; + + processorNotificationFutureField = getField(watchProcessor, WatchProcessor.class, "notificationFuture"); + storageRwLockField = getField(storage, RocksDbKeyValueStorage.class, "rwLock"); } /** @@ -66,9 +76,19 @@ public class WatchListenerInhibitor { */ public void startInhibit() { try { - CompletableFuture<Void> notificationFuture = (CompletableFuture<Void>) notificationFutureField.get(watchProcessor); + // We take this lock because it's actually used by RocksDbKeyValueStorage, among other things, to make future chaining + // correct wrt concurrency. + ReadWriteLock rwLock = (ReadWriteLock) storageRwLockField.get(storage); + + rwLock.writeLock().lock(); + + try { + CompletableFuture<Void> notificationFuture = (CompletableFuture<Void>) processorNotificationFutureField.get(watchProcessor); - notificationFutureField.set(watchProcessor, notificationFuture.thenCompose(v -> inhibitFuture)); + processorNotificationFutureField.set(watchProcessor, notificationFuture.thenCompose(v -> inhibitFuture)); + } finally { + rwLock.writeLock().unlock(); + } } catch (IllegalAccessException e) { throw new RuntimeException(e); }