This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f32236ff7c3d0647f3b2ca1839ff6ae073d52532 Author: Lari Hotari <[email protected]> AuthorDate: Fri Dec 12 13:40:10 2025 +0200 [fix][broker] PIP-442: Fix race condition in async semaphore permit updates that causes memory limits to become ineffective (#25066) (cherry picked from commit c8dbc3c9c2276d1324adc53f33a2f409238d121f) --- .../apache/pulsar/broker/service/ServerCnx.java | 4 +- ...nConsumerBackPressureMultipleConsumersTest.java | 136 ++++++++++++--------- .../semaphore/AsyncDualMemoryLimiterImpl.java | 4 +- .../common/semaphore/AsyncSemaphoreImpl.java | 68 +++++------ .../semaphore/AsyncDualMemoryLimiterImplTest.java | 24 ++++ .../pulsar/proxy/server/LookupProxyHandler.java | 7 +- ...nConsumerBackPressureMultipleConsumersTest.java | 20 +++ 7 files changed, 162 insertions(+), 101 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a539f437e7f..098c5a9090d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2593,10 +2593,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { return getBrokerService().pulsar().getNamespaceService() .getListOfUserTopics(namespaceName, mode) - .thenAccept(topics -> { + .thenCompose(topics -> { long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topics); listSizeHolder.updateSize(actualSize); - maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, + return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, isPermitRequestCancelled, permits -> { boolean filterTopics = false; // filter system topic diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java index d91106ab683..15db34823e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; import java.io.Closeable; @@ -41,6 +42,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter; +import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl; import org.apache.pulsar.common.stats.JvmMetrics; import org.apache.pulsar.common.util.DirectMemoryUtils; import org.testng.annotations.AfterMethod; @@ -96,69 +99,86 @@ public class PatternConsumerBackPressureMultipleConsumersTest extends MockedPuls } }; - @Cleanup("shutdownNow") - final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime() - .availableProcessors()); - - @Cleanup - PulsarClientSharedResources sharedResources = - PulsarClientSharedResources.builder().build(); - List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients); - @Cleanup - Closeable closeClients = () -> { - for (PulsarClient client : clients) { - try { - client.close(); - } catch (PulsarClientException e) { - log.error("Failed to close client {}", client, e); + { + @Cleanup("shutdownNow") final ExecutorService executorService = + Executors.newFixedThreadPool(Runtime.getRuntime() + .availableProcessors()); + + @Cleanup + PulsarClientSharedResources sharedResources = + PulsarClientSharedResources.builder().build(); + List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients); + @Cleanup + Closeable closeClients = () -> { + for (PulsarClient client : clients) { + try { + client.close(); + } catch (PulsarClientException e) { + log.error("Failed to close client {}", client, e); + } } + }; + for (int i = 0; i < numberOfClients; i++) { + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(getClientServiceUrl()) + .sharedResources(sharedResources) + .build(); + clients.add(client); } - }; - for (int i = 0; i < numberOfClients; i++) { - PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() - .serviceUrl(getClientServiceUrl()) - .sharedResources(sharedResources) - .build(); - clients.add(client); - } - final AtomicInteger success = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(requests); - final Semaphore semaphore = new Semaphore(maxRequestsInFlight); - for (int i = 0; i < requests; i++) { - PulsarClientImpl pulsarClientImpl = clients.get(i % numberOfClients); - executorService.execute(() -> { - semaphore.acquireUninterruptibly(); - try { - pulsarClientImpl.getLookup() - .getTopicsUnderNamespace(NamespaceName.get("public", "default"), - CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "") - .whenComplete((result, ex) -> { - semaphore.release(); - if (ex == null) { - success.incrementAndGet(); - } else { - log.error("Failed to get topic list.", ex); - } - log.info( - "latch-count: {}, succeed: {}, available direct mem: {} MB, free heap mem: {}" - + " MB", - latch.getCount(), success.get(), - (DirectMemoryUtils.jvmMaxDirectMemory() - JvmMetrics.getJvmDirectMemoryUsed()) - / (1024 * 1024), Runtime.getRuntime().freeMemory() / (1024 * 1024)); - latch.countDown(); - }); - } catch (Exception e) { - semaphore.release(); - latch.countDown(); - log.error("Failed to execute getTopicsUnderNamespace request.", e); - } - }); + final AtomicInteger success = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(requests); + final Semaphore semaphore = new Semaphore(maxRequestsInFlight); + for (int i = 0; i < requests; i++) { + PulsarClientImpl pulsarClientImpl = clients.get(i % numberOfClients); + executorService.execute(() -> { + semaphore.acquireUninterruptibly(); + try { + pulsarClientImpl.getLookup() + .getTopicsUnderNamespace(NamespaceName.get("public", "default"), + CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "") + .whenComplete((result, ex) -> { + semaphore.release(); + if (ex == null) { + success.incrementAndGet(); + } else { + log.error("Failed to get topic list.", ex); + } + log.info("latch-count: {}, succeed: {}, available direct mem: {} MB, " + + "free heap mem: {} MB", + latch.getCount(), success.get(), + (DirectMemoryUtils.jvmMaxDirectMemory() + - JvmMetrics.getJvmDirectMemoryUsed()) + / (1024 * 1024), Runtime.getRuntime().freeMemory() / (1024 * 1024)); + latch.countDown(); + }); + } catch (Exception e) { + semaphore.release(); + latch.countDown(); + log.error("Failed to execute getTopicsUnderNamespace request.", e); + } + }); + } + latch.await(); + assertEquals(success.get(), requests); + + validateTopiclistPrometheusMetrics(); } - latch.await(); - assertEquals(success.get(), requests); - validateTopiclistPrometheusMetrics(); + validateThatTokensHaventLeakedOrIncreased(); + } + + protected void validateThatTokensHaventLeakedOrIncreased() { + AsyncDualMemoryLimiterImpl limiter = + pulsar.getBrokerService().getMaxTopicListInFlightLimiter(); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits()) + .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits()) + .isEqualTo(0); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits()) + .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits()) + .isEqualTo(0); } protected int getNumberOfClients() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java index d6b31aa72ee..1b55d56d201 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.semaphore; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -93,7 +94,8 @@ public class AsyncDualMemoryLimiterImpl implements AsyncDualMemoryLimiter, AutoC new DualMemoryLimiterPermit(limitType, result)); } - protected AsyncSemaphore getLimiter(LimitType limitType) { + @VisibleForTesting + public AsyncSemaphore getLimiter(LimitType limitType) { switch (limitType) { case HEAP_MEMORY: return heapLimiter; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java index 7fc506753a8..bd5f5912921 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java @@ -21,7 +21,6 @@ package org.apache.pulsar.common.semaphore; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -33,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.Runnables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,10 +101,10 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { @Override public CompletableFuture<AsyncSemaphorePermit> acquire(long permits, BooleanSupplier isCancelled) { - return internalAcquire(permits, permits, isCancelled); + return internalAcquire(permits, null, isCancelled); } - private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long permits, long acquirePermits, + private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long permits, SemaphorePermit previousPermit, BooleanSupplier isCancelled) { validatePermits(permits); @@ -122,7 +120,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { return future; } - PendingRequest request = new PendingRequest(permits, acquirePermits, future, isCancelled); + PendingRequest request = new PendingRequest(permits, previousPermit, future, isCancelled); if (!queue.offer(request)) { future.completeExceptionally(new PermitAcquireQueueFullException( "Semaphore queue is full")); @@ -177,34 +175,21 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { long oldPermits = permit.getPermits(); long additionalPermits = newPermits - oldPermits; if (additionalPermits > 0) { - CompletableFuture<AsyncSemaphorePermit> acquireFuture = - internalAcquire(newPermits, additionalPermits, isCancelled); - // return a future that completes after original permits have been released when the acquisition - // has been successfully completed - CompletableFuture<AsyncSemaphorePermit> returnedFuture = - acquireFuture.thenApply(p -> { - // mark the old permits as released without adding the permits to availablePermits - castToImplementation(permit).releasePermits(); - return p; - }); - // add cancellation support for returned future, so that it cancels the acquireFuture if the returnedFuture - // is cancelled - returnedFuture.whenComplete((p, t) -> { - if (t != null && FutureUtil.unwrapCompletionException(t) instanceof CancellationException) { - acquireFuture.cancel(false); - } - }); - return returnedFuture; - } - if (additionalPermits < 0) { - // new permits are less than the old ones, so we return the difference - availablePermits.addAndGet(-additionalPermits); - processQueue(); + return internalAcquire(newPermits, castToImplementation(permit), isCancelled); } // mark the old permits as released without adding the permits to availablePermits - castToImplementation(permit).releasePermits(); - // return the new permits immediately - return CompletableFuture.completedFuture(new SemaphorePermit(newPermits)); + long leftoverPermits = castToImplementation(permit).releasePermits() - newPermits; + if (leftoverPermits >= 0) { + if (leftoverPermits > 0) { + // new permits are less than the old ones, so we return the difference + availablePermits.addAndGet(leftoverPermits); + processQueue(); + } + // return the new permits immediately + return CompletableFuture.completedFuture(new SemaphorePermit(newPermits)); + } else { + return acquire(newPermits, isCancelled); + } } @Override @@ -282,8 +267,9 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { continue; } - if (request.acquirePermits <= current) { - availablePermits.addAndGet(-request.acquirePermits); + if (request.getRequiredPermits() <= current) { + long requiredPermitsReusingPrevious = request.getRequiredPermitsReusingPrevious(); + availablePermits.addAndGet(-requiredPermitsReusingPrevious); request.cancelTimeoutTask(); queue.remove(request); SemaphorePermit permit = new SemaphorePermit(request.permits); @@ -291,7 +277,7 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { boolean futureCompleted = request.future.complete(permit); if (!futureCompleted) { // request was cancelled by user code, return permits - availablePermits.addAndGet(request.acquirePermits); + availablePermits.addAndGet(requiredPermitsReusingPrevious); } } else { break; @@ -318,16 +304,16 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { private static class PendingRequest { final long permits; - private final long acquirePermits; + private final SemaphorePermit previousPermit; final CompletableFuture<AsyncSemaphorePermit> future; private final BooleanSupplier isCancelled; private volatile ScheduledFuture<?> timeoutTask; private final long requestCreatedNanos = System.nanoTime(); - PendingRequest(long permits, long acquirePermits, CompletableFuture<AsyncSemaphorePermit> future, + PendingRequest(long permits, SemaphorePermit previousPermit, CompletableFuture<AsyncSemaphorePermit> future, BooleanSupplier isCancelled) { this.permits = permits; - this.acquirePermits = acquirePermits; + this.previousPermit = previousPermit; this.future = future; this.isCancelled = isCancelled; } @@ -346,6 +332,14 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { long getAgeNanos() { return System.nanoTime() - requestCreatedNanos; } + + long getRequiredPermits() { + return previousPermit == null ? permits : permits - previousPermit.getPermits(); + } + + long getRequiredPermitsReusingPrevious() { + return previousPermit == null ? permits : permits - previousPermit.releasePermits(); + } } private static class SemaphorePermit implements AsyncSemaphorePermit { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java index fd23d95151c..3c972ce1640 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.semaphore; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -32,6 +33,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit; import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter.LimitType; import org.apache.pulsar.common.semaphore.AsyncSemaphore.PermitAcquireAlreadyClosedException; @@ -718,4 +721,25 @@ public class AsyncDualMemoryLimiterImplTest { limiter.release(permit); } } + + @Test(invocationCount = 100) + public void testUpdateHeapPermitsIncreaseWithReleasedPermits() throws Exception { + limiter = new AsyncDualMemoryLimiterImpl(100000, 10, 5000, 1000, 100, 5000); + + BooleanSupplier cancelled = () -> false; + AtomicReference<CompletableFuture<Void>> future2Ref = new AtomicReference<>(); + + CompletableFuture<Void> future = + limiter.withAcquiredPermits(100, LimitType.HEAP_MEMORY, cancelled, permit -> { + future2Ref.set(limiter.withUpdatedPermits(permit, 200, cancelled, updatedPermit -> { + return CompletableFuture.supplyAsync(() -> null); + }, CompletableFuture::failedFuture)); + return CompletableFuture.completedFuture(null); + }, CompletableFuture::failedFuture); + + assertThat(future).succeedsWithin(1, TimeUnit.SECONDS); + assertThat(future2Ref.get()).succeedsWithin(1, TimeUnit.SECONDS); + assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAvailablePermits()).isEqualTo(100000); + assertThat(limiter.getLimiter(LimitType.HEAP_MEMORY).getAcquiredPermits()).isEqualTo(0); + } } \ No newline at end of file diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 49a59444b62..d276c7996a6 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -368,17 +368,18 @@ public class LookupProxyHandler { listSizeHolder.getSizeAsync().thenAccept(initialSize -> { maxTopicListInFlightLimiter.withAcquiredPermits(initialSize, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { - return clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { + return clientCnx.newGetTopicsOfNamespace(command, requestId).handle((r, t) -> { if (t != null) { log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, t.getMessage()); listSizeHolder.resetIfInitializing(); writeAndFlush(Commands.newError(clientRequestId, getServerError(t), t.getMessage())); + return CompletableFuture.completedFuture(null); } else { long actualSize = TopicListMemoryLimiter.estimateTopicListSize( r.getNonPartitionedOrPartitionTopics()); listSizeHolder.updateSize(actualSize); - maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, + return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, isPermitRequestCancelled, permits -> { return handleWritingGetTopicsResponse(clientRequestId, r, isPermitRequestCancelled); @@ -392,7 +393,7 @@ public class LookupProxyHandler { return CompletableFuture.completedFuture(null); }); } - }).thenApply(__ -> null); + }); }, t -> { log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}", clientAddress, t.getMessage()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java index 3263cc2985c..e8550db40b9 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPatternConsumerBackPressureMultipleConsumersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.proxy.server; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doReturn; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -26,6 +27,8 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PatternConsumerBackPressureMultipleConsumersTest; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter; +import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -75,6 +78,23 @@ public class ProxyPatternConsumerBackPressureMultipleConsumersTest extends super.cleanup(); } + @Override + protected void validateThatTokensHaventLeakedOrIncreased() { + // validate broker's limiter + super.validateThatTokensHaventLeakedOrIncreased(); + // validate proxy's limiter + AsyncDualMemoryLimiterImpl limiter = + proxyService.getMaxTopicListInFlightLimiter(); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits()) + .isEqualTo(proxyConfig.getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits()) + .isEqualTo(0); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits()) + .isEqualTo(proxyConfig.getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024); + assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits()) + .isEqualTo(0); + } + @Override protected String getClientServiceUrl() { return proxyService.getServiceUrl();
