This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-18323 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 05cae9ba8098a7b3a316754fcb603b2c6230d595 Author: amashenkov <[email protected]> AuthorDate: Tue Dec 20 15:30:33 2022 +0300 wip. fix wrong busylock usage in MetaStorageManager --- .../internal/metastorage/MetaStorageManager.java | 46 +++++++++++++++------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index d84c5a82ad..39b57cdfce 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.client.CompactedException; @@ -189,6 +190,7 @@ public class MetaStorageManager implements IgniteComponent { public void start() { this.metaStorageSvcFut = cmgMgr.metaStorageNodes() // use default executor to avoid blocking CMG manager threads + //TODO: avoid using default executor to prevent blocking operations in FJP !!! .thenComposeAsync(metaStorageNodes -> { if (!busyLock.enterBusy()) { return CompletableFuture.failedFuture(new NodeStoppingException()); @@ -385,7 +387,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.get(key)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.get(key))); } finally { busyLock.leaveBusy(); } @@ -402,7 +404,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.get(key, revUpperBound))); } finally { busyLock.leaveBusy(); } @@ -419,7 +421,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAll(keys))); } finally { busyLock.leaveBusy(); } @@ -436,7 +438,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAll(keys, revUpperBound))); } finally { busyLock.leaveBusy(); } @@ -453,7 +455,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.put(key, val))); } finally { busyLock.leaveBusy(); } @@ -470,7 +472,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAndPut(key, val))); } finally { busyLock.leaveBusy(); } @@ -487,7 +489,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.putAll(vals))); } finally { busyLock.leaveBusy(); } @@ -504,7 +506,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAndPutAll(vals))); } finally { busyLock.leaveBusy(); } @@ -521,7 +523,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.remove(key)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.remove(key))); } finally { busyLock.leaveBusy(); } @@ -538,7 +540,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAndRemove(key))); } finally { busyLock.leaveBusy(); } @@ -555,7 +557,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.removeAll(keys))); } finally { busyLock.leaveBusy(); } @@ -572,7 +574,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.getAndRemoveAll(keys))); } finally { busyLock.leaveBusy(); } @@ -593,7 +595,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.invoke(cond, success, failure))); } finally { busyLock.leaveBusy(); } @@ -614,7 +616,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.invoke(cond, success, failure))); } finally { busyLock.leaveBusy(); } @@ -631,7 +633,7 @@ public class MetaStorageManager implements IgniteComponent { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(iif)); + return metaStorageSvcFut.thenCompose(inBusy(svc -> svc.invoke(iif))); } finally { busyLock.leaveBusy(); } @@ -985,4 +987,18 @@ public class MetaStorageManager implements IgniteComponent { throw new UnsupportedOperationException("Unsupported type of criterion"); } } + + private <T> Function<MetaStorageService, ? extends CompletableFuture<T>> inBusy(Function<MetaStorageService, CompletableFuture<T>> f) { + return (svc) -> { + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(new NodeStoppingException()); + } + + try { + return f.apply(svc); + } finally { + busyLock.leaveBusy(); + } + }; + } }
