Denovo1998 commented on code in PR #24833: URL: https://github.com/apache/pulsar/pull/24833#discussion_r2432187651
########## pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.semaphore; + +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BooleanSupplier; +import java.util.function.LongConsumer; +import org.apache.pulsar.common.util.Runnables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of AsyncSemaphore with timeout and queue size limits. 
+ */ +public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(AsyncSemaphoreImpl.class); + + private final AtomicLong availablePermits; + private final Queue<PendingRequest> queue; + private final long maxPermits; + private final long timeoutMillis; + private final ScheduledExecutorService executor; + private final boolean shutdownExecutor; + private final LongConsumer queueLatencyRecorder; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Runnable processQueueRunnable = Runnables.catchingAndLoggingThrowables(this::internalProcessQueue); + + public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis) { + this(maxPermits, maxQueueSize, timeoutMillis, createExecutor(), true, null); + } + + public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis, + ScheduledExecutorService executor, LongConsumer queueLatencyRecorder) { + this(maxPermits, maxQueueSize, timeoutMillis, executor, false, queueLatencyRecorder); + } + + AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis, ScheduledExecutorService executor, + boolean shutdownExecutor, LongConsumer queueLatencyRecorder) { + this.availablePermits = new AtomicLong(maxPermits); + this.maxPermits = maxPermits; + this.queue = new ArrayBlockingQueue<>(maxQueueSize); + this.timeoutMillis = timeoutMillis; + this.executor = executor; + this.shutdownExecutor = shutdownExecutor; + this.queueLatencyRecorder = queueLatencyRecorder; + } + + private static ScheduledExecutorService createExecutor() { + return Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("async-semaphore-executor")); + } + + @Override + public CompletableFuture<AsyncSemaphorePermit> acquire(long permits, BooleanSupplier isCancelled) { + return internalAcquire(permits, permits, isCancelled); + } + + private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long permits, 
long acquirePermits, + BooleanSupplier isCancelled) { + if (permits < 0) { + throw new IllegalArgumentException("Invalid permits value: " + permits); + } + + CompletableFuture<AsyncSemaphorePermit> future = new CompletableFuture<>(); + + if (closed.get()) { + future.completeExceptionally(new PermitAcquireAlreadyClosedException("Semaphore is closed")); + return future; + } + + PendingRequest request = new PendingRequest(permits, acquirePermits, future, isCancelled); + if (!queue.offer(request)) { + future.completeExceptionally(new PermitAcquireQueueFullException( + "Semaphore queue is full")); + return future; + } + // Schedule timeout + ScheduledFuture<?> timeoutTask = executor.schedule(() -> { + if (!request.future.isDone() && queue.remove(request)) { + // timeout is recorded with Long.MAX_VALUE as the age + recordQueueLatency(Long.MAX_VALUE); + // also record the time in the queue + recordQueueLatency(request.getAgeNanos()); + future.completeExceptionally(new PermitAcquireTimeoutException( + "Permit acquisition timed out")); + // the next request might have smaller permits and that might be processed + processQueue(); + } + }, timeoutMillis, TimeUnit.MILLISECONDS); + request.setTimeoutTask(timeoutTask); + + processQueue(); + return future; + } + + private void recordQueueLatency(long ageNanos) { + if (queueLatencyRecorder != null) { + queueLatencyRecorder.accept(ageNanos); + } + } + + @Override + public CompletableFuture<AsyncSemaphorePermit> update(AsyncSemaphorePermit permit, long newPermits, + BooleanSupplier isCancelled) { + if (newPermits < 0) { + throw new IllegalArgumentException("Invalid permits value: " + newPermits); + } + long oldPermits = permit.getPermits(); + long additionalPermits = newPermits - oldPermits; + // mark the old permits as released without adding the permits to availablePermits + castToImplementation(permit).releasePermits(); + if (additionalPermits > 0) { + return internalAcquire(newPermits, additionalPermits, isCancelled); + } else { 
+ // new permits are less than the old ones, so we return the difference + availablePermits.addAndGet(-additionalPermits); + processQueue(); + // return the new permits immediately + return CompletableFuture.completedFuture(new SemaphorePermit(newPermits)); + } + } + + @Override + public void release(AsyncSemaphorePermit permit) { + availablePermits.addAndGet(castToImplementation(permit).releasePermits()); + processQueue(); + } + + @Override + public long getAvailablePermits() { + return availablePermits.get(); + } + + @Override + public long getAcquiredPermits() { + return maxPermits - availablePermits.get(); + } + + @Override + public int getQueueSize() { + return queue.size(); + } + + private SemaphorePermit castToImplementation(AsyncSemaphorePermit permit) { + if (permit instanceof SemaphorePermit semaphorePermit) { + return semaphorePermit; + } else { + throw new IllegalArgumentException("Invalid permit type"); + } + } + + private void processQueue() { + if (closed.get()) { + return; + } + executor.execute(processQueueRunnable); + } + + private void internalProcessQueue() { Review Comment: Is there a race condition here? Consider the following interleaving: (1) at line 187, `current` is read as 10; (2) the request needs 8 permits; (3) another thread calls update(), reducing availablePermits to 3; (4) at line 205, the check passes (8 <= 10, using the stale `current`); (5) at line 206, availablePermits becomes 3 - 8 = -5 (negative!). This violates the semaphore invariant, allowing more permits to be acquired than maxPermits. ########## pip/pip-442.md: ########## @@ -163,227 +189,409 @@ public interface AsyncDualMemoryLimiter { HEAP_MEMORY, // For heap memory allocation DIRECT_MEMORY // For direct memory allocation } - + /** * Acquire permits for the specified memory size. * Returned future completes when memory permits are available. 
- * It will complete exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout - * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException when queue full + * It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout + * and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full + * + * @param memorySize the size of memory to acquire permits for + * @param limitType the type of memory limit (HEAP_MEMORY or DIRECT_MEMORY) + * @param isCancelled supplier that returns true if acquisition should be cancelled * @return CompletableFuture that completes with permit when available */ - CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, LimitType limitType); + CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, LimitType limitType, + BooleanSupplier isCancelled); /** * Acquire or release permits for previously acquired permits by updating the requested memory size. * Returns a future that completes when permits are available. - * It will complete exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout - * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException when queue full + * It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout + * and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full + * The provided permit is released when the permits are successfully acquired and the returned updated + * permit replaces the old instance. 
+ * + * @param permit the previously acquired permit to update + * @param newMemorySize the new memory size to update to + * @param isCancelled supplier that returns true if update should be cancelled * @return CompletableFuture that completes with permit when available */ - CompletableFuture<AsyncDualMemoryLimiterPermit> update(AsyncDualMemoryLimiterPermit permit, long newMemorySize); - + CompletableFuture<AsyncDualMemoryLimiterPermit> update(AsyncDualMemoryLimiterPermit permit, long newMemorySize, + BooleanSupplier isCancelled); + /** * Release previously acquired permit. * Must be called to prevent memory permit leaks. + * + * @param permit the permit to release */ void release(AsyncDualMemoryLimiterPermit permit); + /** + * Execute the specified function with acquired permits and release the permits after the returned future completes. + * @param memorySize memory size to acquire permits for + * @param limitType memory limit type to acquire permits for + * @param function function to execute with acquired permits + * @return result of the function + * @param <T> type of the CompletableFuture returned by the function + */ + default <T> CompletableFuture<T> withAcquiredPermits(long memorySize, LimitType limitType, + BooleanSupplier isCancelled, + Function<AsyncDualMemoryLimiterPermit, + CompletableFuture<T>> function, + Function<Throwable, CompletableFuture<T>> + permitAcquireErrorHandler) { + return AsyncDualMemoryLimiterUtil.withPermitsFuture(acquire(memorySize, limitType, isCancelled), function, + permitAcquireErrorHandler, this::release); + } + + /** + * Executed the specified function with updated permits and release the permits after the returned future completes. 
+ * @param initialPermit initial permit to update + * @param newMemorySize new memory size to update to + * @param function function to execute with updated permits + * @return result of the function + * @param <T> type of the CompletableFuture returned by the function + */ + default <T> CompletableFuture<T> withUpdatedPermits(AsyncDualMemoryLimiterPermit initialPermit, long newMemorySize, + BooleanSupplier isCancelled, + Function<AsyncDualMemoryLimiterPermit, + CompletableFuture<T>> function, + Function<Throwable, CompletableFuture<T>> + permitAcquireErrorHandler) { + return AsyncDualMemoryLimiterUtil.withPermitsFuture(update(initialPermit, newMemorySize, isCancelled), function, + permitAcquireErrorHandler, this::release); + } +} +``` + +#### AsyncDualMemoryLimiterUtil Helper + +A utility class provides helper methods for common patterns: + +```java +public class AsyncDualMemoryLimiterUtil { + /** + * Execute a function with acquired permits and ensure permits are released after completion. + * This method handles the lifecycle of permits - acquisition, usage, and release, including error cases. 
+ * + * @param permitsFuture Future that will complete with the required permits + * @param function Function to execute once permits are acquired that returns a CompletableFuture + * @param permitAcquireErrorHandler Handler for permit acquisition errors that returns a CompletableFuture + * @param releaser Consumer that handles releasing the permits + * @param <T> The type of result returned by the function + * @return CompletableFuture that completes with the result of the function execution + */ + public static <T> CompletableFuture<T> withPermitsFuture( + CompletableFuture<AsyncDualMemoryLimiterPermit> + permitsFuture, + Function<AsyncDualMemoryLimiterPermit, + CompletableFuture<T>> function, + Function<Throwable, CompletableFuture<T>> + permitAcquireErrorHandler, + Consumer<AsyncDualMemoryLimiterPermit> releaser) { + // implementation omitted from PIP document + } + + /** + * Acquires permits and writes the command as a response to the channel. + * Releases the permits after the response has been written to the socket or if the write fails. + * + * @param ctx the channel handler context used for writing the response + * @param dualMemoryLimiter the memory limiter used to acquire and release memory permits + * @param isCancelled supplier that indicates if the permit acquisition should be cancelled + * @param command the base command to serialize and write to the channel + * @param permitAcquireErrorHandler handler for errors that occur during permit acquisition + * @return a future that completes when the command has been written to the channel's outbound buffer + */ + public static CompletableFuture<Void> acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx, + AsyncDualMemoryLimiter + dualMemoryLimiter, + BooleanSupplier isCancelled, + BaseCommand command, + Consumer<Throwable> + permitAcquireErrorHandler + ) { + // implementation omitted from PIP document + } } ``` #### Integration Points -**1. Heap Memory Limiting (Post-Retrieval)** +**1. 
Heap Memory Limiting (Post-Retrieval) - Broker** -In `ServerCnx.handleGetTopicsOfNamespace`: +In `ServerCnx.handleGetTopicsOfNamespace`, the implementation uses the helper methods: ```java -// Acquire a fixed amount of permits initially since it's not known how much memory will be used -// This will ensure that the operation continues only after it has the initial permits -// It would be possible to use statistics for initial estimate, but this is simpler and sufficient -maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY) - .thenCompose(initialPermit -> { - getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode) - .thenCompose(topics -> { - // Estimate memory after retrieval and update the permits to reflect the actual size - long estimatedSize = topics.stream().mapToInt(String::length).sum(); - return maxTopicListInFlightLimiter - .update(initialPermit, estimatedSize) - .thenApply(permit -> Pair.of(topics, permit)); - }) - .thenAccept(topicsAndPermit -> { - try { - // Process and send response - ... 
- } finally { - maxTopicListInFlightLimiter.release(topicsAndPermit.getRight()); - } + private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId, + CommandGetTopicsOfNamespace.Mode mode, + Optional<String> topicsPattern, Optional<String> topicsHash, + Semaphore lookupSemaphore) { + BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive(); + maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE, + AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { + return getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode) + .thenAccept(topics -> { + long actualSize = topics.stream().mapToInt(String::length).sum(); + maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, + isPermitRequestCancelled, permits -> { + boolean filterTopics = false; + // filter system topic + List<String> filteredTopics = topics; + + if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { + if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { + filterTopics = true; + filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get(), + topicsPatternImplementation); + } else { + log.info("[{}] Subscription pattern provided [{}] was longer than " + + "maximum {}.", remoteAddress, topicsPattern.get(), + maxSubscriptionPatternLength); + } + } + String hash = TopicList.calculateHash(filteredTopics); + boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash); + if (hashUnchanged) { + filteredTopics = Collections.emptyList(); + } + if (log.isDebugEnabled()) { + log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace " + + "[//{}] by {}, size:{}", remoteAddress, namespace, + requestId, + topics.size()); + } + commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, filterTopics, + !hashUnchanged, requestId, ex -> { + 
log.warn("[{}] Failed to acquire direct memory permits for " + + "GetTopicsOfNamespace: {}", remoteAddress, ex.getMessage()); + commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, + "Cannot acquire permits for direct memory"); + }); + return CompletableFuture.completedFuture(null); + }, t -> { + log.warn("[{}] Failed to acquire heap memory permits for " + + "GetTopicsOfNamespace: {}", remoteAddress, t.getMessage()); + writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + return CompletableFuture.completedFuture(null); + }); + }).whenComplete((__, ___) -> { + lookupSemaphore.release(); + }).exceptionally(ex -> { + log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", remoteAddress, + namespace, requestId); + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)), + ex.getMessage()); + return null; + }); + }, t -> { + log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}", + remoteAddress, t.getMessage()); + writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + lookupSemaphore.release(); + return CompletableFuture.completedFuture(null); }); - ... - // For exceptional paths, initialPermit would need to be released +} ``` -**2. Direct Memory Limiting (Pre-Serialization)** +**2. 
Direct Memory Limiting (Pre-Serialization) - Broker** -Modified `CommandSender` implementation: +Modified `PulsarCommandSenderImpl`: ```java @Override public void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash, - boolean filtered, boolean changed, long requestId) { + boolean filtered, boolean changed, long requestId, + Consumer<Throwable> permitAcquireErrorHandler) { BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash, filtered, changed, requestId); safeIntercept(command, cnx); - acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command); + acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(), + command, permitAcquireErrorHandler); } +``` + +The utility method implementation: -private void acquireMaxTopicListInFlightPermitsAndWriteAndFlush(BaseCommand command) { +```java +public static CompletableFuture<Void> acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx, + AsyncDualMemoryLimiter + dualMemoryLimiter, + BooleanSupplier isCancelled, + BaseCommand command, + Consumer<Throwable> + permitAcquireErrorHandler +) { // Calculate serialized size before acquiring permits int serializedSize = command.getSerializedSize(); // Acquire permits - maxTopicListInFlightLimiter.acquire(serializedSize, AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY) - .thenAcceptAsync(permits -> { + return dualMemoryLimiter.acquire(serializedSize, AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY, isCancelled) + .whenComplete((permits, t) -> { + if (t != null) { + permitAcquireErrorHandler.accept(t); + return; + } try { // Serialize the response ByteBuf outBuf = Commands.serializeWithPrecalculatedSerializedSize(command, serializedSize); // Write the response - cnx.ctx().writeAndFlush(outBuf).addListener(future -> { + ctx.writeAndFlush(outBuf).addListener(future -> { // Release permits after the response has been written to the socket - 
maxTopicListInFlightLimiter.release(permits); + dualMemoryLimiter.release(permits); }); - } catch (Exception e) { + } catch (Throwable e) { // Return permits if an exception occurs before writeAndFlush is called successfully - maxTopicListInFlightLimiter.release(permits); + dualMemoryLimiter.release(permits); throw e; } - }, cnx.ctx().executor()); + }).thenApply(__ -> null); } ``` -**3. Watch Command Memory Control** +**3. Watch Command Memory Control - Broker** -Similar memory limiting patterns apply to watch commands: +Similar memory limiting patterns apply to watch commands in `TopicListService`: ```java -public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) { - BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics); - acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command); -} - -public void sendWatchTopicListUpdate(long watcherId, List<String> newTopics, List<String> deletedTopics, String topicsHash) { - BaseCommand command = Commands.newWatchTopicUpdate(watcherId, newTopics, deletedTopics, topicsHash); - acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command); +public void sendTopicListUpdate(long watcherId, String topicsHash, + List<String> deletedTopics, List<String> newTopics) { + connection.getCommandSender().sendWatchTopicListUpdate( + watcherId, newTopics, deletedTopics, topicsHash, + t -> { + log.warn("[{}] Cannot acquire direct memory tokens for sending topic list update", + connection.toString(), t); + }); } ``` -**4. Proxy Reading Memory Control** - -On the Pulsar Proxy side, the problem is slightly different. The problem occurs when the proxy receives a `CommandGetTopicsOfNamespace` command, forwards it to a broker, and receives a response. The proxy needs to deserialize and serialize the response before sending it to the client. -Memory is allocated for both deserialization and serialization. 
- -Solving this requires a slight modification to PulsarDecoder. +**4. Proxy Memory Control** -In `PulsarDecoder.channelRead`, it would be necessary to record the size of the incoming message: +On the Pulsar Proxy side in `LookupProxyHandler`: ```java - // Get a buffer that contains the full frame - ByteBuf buffer = (ByteBuf) msg; - try { - // De-serialize the command - int cmdSize = (int) buffer.readUnsignedInt(); - cmd.parseFrom(buffer, cmdSize); -``` +private void internalPerformGetTopicsOfNamespace(long clientRequestId, String namespaceName, ClientCnx clientCnx, + ByteBuf command, long requestId) { + BooleanSupplier isPermitRequestCancelled = () -> !proxyConnection.ctx().channel().isActive(); + maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE, + AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { + return clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { + if (t != null) { + log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, + t.getMessage()); + writeAndFlush(Commands.newError(clientRequestId, getServerError(t), t.getMessage())); + } else { + long actualSize = + r.getNonPartitionedOrPartitionTopics().stream().mapToInt(String::length).sum(); + maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize, + isPermitRequestCancelled, permits -> { + return handleWritingGetTopicsResponse(clientRequestId, r, isPermitRequestCancelled); + }, t2 -> { + log.warn("[{}] Failed to acquire actual heap memory permits for " + + "GetTopicsOfNamespace: {}", clientAddress, t2.getMessage()); + writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + + return CompletableFuture.completedFuture(null); + }); + } + }).thenApply(__ -> null); + }, t -> { + log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}", + clientAddress, 
+ t.getMessage()); + writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); + + return CompletableFuture.completedFuture(null); + }).exceptionally(ex -> { + writeAndFlush(Commands.newError(clientRequestId, getServerError(ex), ex.getMessage())); + return null; + }); +} -It could be modified to store the `cmdSize` in a field instead of a local variable: -```java - protected int cmdSize; -... - // Get a buffer that contains the full frame - ByteBuf buffer = (ByteBuf) msg; - try { - // De-serialize the command - cmdSize = (int) buffer.readUnsignedInt(); - cmd.parseFrom(buffer, cmdSize); +private CompletableFuture<Void> handleWritingGetTopicsResponse(long clientRequestId, GetTopicsResult r, + BooleanSupplier isCancelled) { + BaseCommand responseCommand = Commands.newGetTopicsOfNamespaceResponseCommand( + r.getNonPartitionedOrPartitionTopics(), r.getTopicsHash(), r.isFiltered(), + r.isChanged(), clientRequestId); + return acquireDirectMemoryPermitsAndWriteAndFlush(proxyConnection.ctx(), maxTopicListInFlightLimiter, + isCancelled, responseCommand, t -> { + log.warn("[{}] Failed to acquire actual direct memory permits for GetTopicsOfNamespace: {}", + clientAddress, t.getMessage()); + writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests, + "Failed due to heap memory limit exceeded")); Review Comment: This handler reports a failure to acquire direct memory permits, so the error message sent to the client should refer to direct memory rather than heap memory: ```java "Failed due to direct memory limit exceeded" ``` ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java: ########## @@ -357,6 +358,62 @@ private void performGetTopicsOfNamespace(long clientRequestId, }); } + private void internalPerformGetTopicsOfNamespace(long clientRequestId, String namespaceName, ClientCnx clientCnx, + ByteBuf command, long requestId) { + BooleanSupplier isPermitRequestCancelled = () -> !proxyConnection.ctx().channel().isActive(); + maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE, + 
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { + return clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { + if (t != null) { + log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName, + t.getMessage()); + writeAndFlush(Commands.newError(clientRequestId, getServerError(t), t.getMessage())); + } else { + long actualSize = + r.getNonPartitionedOrPartitionTopics().stream() + .mapToInt(ByteBufUtil::utf8Bytes) // convert character count to bytes + .map(n -> n + 32) // add 32 bytes overhead for each entry Review Comment: Can be changed to ```java .mapToLong(topic -> ByteBufUtil.utf8Bytes(topic) + 32) // UTF-8 bytes + 32 bytes overhead per entry ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
