This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f909e1c87fe3b4f3068f97db3fafb82de223f6f7 Author: Lari Hotari <[email protected]> AuthorDate: Thu Jan 15 10:39:11 2026 +0200 [fix][proxy] Fix memory leaks in ParserProxyHandler (#25142) (cherry picked from commit 99fdca8af6742dd36bec93ab0038fb572e6eeb0e) --- .../pulsar/proxy/server/DirectProxyHandler.java | 9 +- .../pulsar/proxy/server/ParserProxyHandler.java | 126 ++++++++++++++------- .../apache/pulsar/proxy/server/ProxyService.java | 12 +- .../apache/pulsar/proxy/server/ProxyStatsTest.java | 58 ++++++++++ 4 files changed, 155 insertions(+), 50 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 5f4456d356e..f707abbc06a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -426,26 +426,27 @@ public class DirectProxyHandler { } else { // Enable parsing feature, proxyLogLevel(1 or 2) // Add parser handler + ParserProxyHandler.Context parserContext = ParserProxyHandler.createContext(); if (connected.hasMaxMessageSize()) { FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(), connected.getMaxMessageSize()); FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(), connected.getMaxMessageSize()); inboundChannel.pipeline().addBefore("handler", "inboundParser", - new ParserProxyHandler(service, + new ParserProxyHandler(parserContext, service, ParserProxyHandler.FRONTEND_CONN, connected.getMaxMessageSize(), outboundChannel.id())); outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", - new ParserProxyHandler(service, + new ParserProxyHandler(parserContext, service, ParserProxyHandler.BACKEND_CONN, connected.getMaxMessageSize(), inboundChannel.id())); } else { inboundChannel.pipeline().addBefore("handler", "inboundParser", - new ParserProxyHandler(service, + new ParserProxyHandler(parserContext, service, ParserProxyHandler.FRONTEND_CONN, Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id())); outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", - new ParserProxyHandler(service, + new ParserProxyHandler(parserContext, service, ParserProxyHandler.BACKEND_CONN, Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id())); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java index 3a98311eb15..d3de3a2ff0a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java @@ -31,11 +31,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.StringInterner; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,27 +55,41 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { private final int maxMessageSize; private final ChannelId peerChannelId; + @Getter + private final Context context; private final ProxyService service; - /** - * producerid + channelid as key. - */ - private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>(); + public static class Context { + /** + * producerid as key. + */ + @Getter + private final Map<Long, String> producerIdToTopicName = new ConcurrentHashMap<>(); - /** - * consumerid + channelid as key. - */ - private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>(); + /** + * consumerid as key. + */ + @Getter + private final Map<Long, String> consumerIdToTopicName = new ConcurrentHashMap<>(); - public ParserProxyHandler(ProxyService service, String type, int maxMessageSize, + private Context() { + } + } + + public ParserProxyHandler(Context context, ProxyService service, String type, int maxMessageSize, ChannelId peerChannelId) { + this.context = context; this.service = service; this.connType = type; this.maxMessageSize = maxMessageSize; this.peerChannelId = peerChannelId; } + public static Context createContext() { + return new Context(); + } + private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) { if (messages != null) { @@ -115,64 +131,86 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter { switch (cmd.getType()) { case PRODUCER: - ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(), - cmd.getProducer().getTopic()); + topicName = StringInterner.intern(cmd.getProducer().getTopic()); + context.producerIdToTopicName.put(cmd.getProducer().getProducerId(), topicName); String producerName = ""; if (cmd.getProducer().hasProducerName()){ producerName = cmd.getProducer().getProducerName(); } logging(ctx.channel(), cmd.getType(), "{producer:" + producerName - + ",topic:" + cmd.getProducer().getTopic() + "}", null); + + ",topic:" + topicName + "}", null); + break; + case CLOSE_PRODUCER: + context.producerIdToTopicName.remove(cmd.getCloseProducer().getProducerId()); + logging(ctx.channel(), cmd.getType(), "", null); break; - case SEND: if (service.getProxyLogLevel() != 2) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get( - cmd.getSend().getProducerId() + "," + ctx.channel().id())); - MutableLong msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L, buffer, (message) -> { - messages.add(message); - msgBytes.add(message.getData().readableBytes()); - }, maxMessageSize); - // update topic stats - TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName, - topic -> new TopicStats()); - topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); - logging(ctx.channel(), cmd.getType(), "", messages); + long producerId = cmd.getSend().getProducerId(); + String topicForProducer = context.producerIdToTopicName.get(producerId); + if (topicForProducer != null) { + topicName = TopicName.toFullTopicName(topicForProducer); + MutableLong msgBytes = new MutableLong(0); + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { + messages.add(message); + msgBytes.add(message.getData().readableBytes()); + }, maxMessageSize); + // update topic stats + TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName, + topic -> new TopicStats()); + topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); + logging(ctx.channel(), cmd.getType(), "", messages); + } else { + logging(ctx.channel(), cmd.getType(), + "Cannot find topic name for producerId " + producerId, null); + } break; case SUBSCRIBE: - ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + "," - + ctx.channel().id(), cmd.getSubscribe().getTopic()); + topicName = StringInterner.intern(cmd.getSubscribe().getTopic()); + context.consumerIdToTopicName.put(cmd.getSubscribe().getConsumerId(), topicName); logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName() - + ",topic:" + cmd.getSubscribe().getTopic() + "}", null); + + ",topic:" + topicName + "}", null); + break; + case CLOSE_CONSUMER: + context.consumerIdToTopicName.remove(cmd.getCloseConsumer().getConsumerId()); + logging(ctx.channel(), cmd.getType(), "", null); + break; + case UNSUBSCRIBE: + context.consumerIdToTopicName.remove(cmd.getUnsubscribe().getConsumerId()); + logging(ctx.channel(), cmd.getType(), "", null); break; - case MESSAGE: if (service.getProxyLogLevel() != 2) { logging(ctx.channel(), cmd.getType(), "", null); break; } - topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get( - cmd.getMessage().getConsumerId() + "," + peerChannelId)); - - msgBytes = new MutableLong(0); - MessageParser.parseMessage(topicName, -1L, - -1L, buffer, (message) -> { - messages.add(message); - msgBytes.add(message.getData().readableBytes()); - }, maxMessageSize); - // update topic stats - topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), - topic -> new TopicStats()); - topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); - logging(ctx.channel(), cmd.getType(), "", messages); + long consumerId = cmd.getMessage().getConsumerId(); + String topicForConsumer = context.consumerIdToTopicName.get(consumerId); + if (topicForConsumer != null) { + topicName = TopicName.toFullTopicName(topicForConsumer); + + MutableLong msgBytes = new MutableLong(0); + MessageParser.parseMessage(topicName, -1L, + -1L, buffer, (message) -> { + messages.add(message); + msgBytes.add(message.getData().readableBytes()); + }, maxMessageSize); + // update topic stats + TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(), + topic -> new TopicStats()); + topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue()); + logging(ctx.channel(), cmd.getType(), "", messages); + } else { + logging(ctx.channel(), cmd.getType(), "Cannot find topic name for consumerId " + consumerId, + null); + } break; default: diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 251e25a0e6b..670ac08dee9 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -55,7 +55,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; -import lombok.Setter; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -116,7 +115,6 @@ public class ProxyService implements Closeable { protected final AtomicReference<Semaphore> lookupRequestSemaphore; @Getter - @Setter protected int proxyLogLevel; @Getter @@ -590,4 +588,14 @@ public class ProxyService implements Closeable { public void setGracefulShutdown(boolean gracefulShutdown) { this.gracefulShutdown = gracefulShutdown; } + + public void setProxyLogLevel(int proxyLogLevel) { + this.proxyLogLevel = proxyLogLevel; + // clear the topic stats when proxy log level is changed to < 2 + // this is a way to avoid the proxy consuming too much memory when there are a lot of topics and log level + // has been temporarily set to 2 + if (proxyLogLevel < 2) { + topicStats.clear(); + } + } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 17dac99d632..b6a0758b528 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -21,7 +21,9 @@ package org.apache.pulsar.proxy.server; import static java.util.Objects.requireNonNull; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.util.List; @@ -214,6 +216,62 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { consumer.close(); consumer2.close(); + + // check that topic stats are cleared after setting proxy log level to 0 + assertFalse(proxyService.getTopicStats().isEmpty()); + proxyService.setProxyLogLevel(0); + assertTrue(proxyService.getTopicStats().isEmpty()); + } + + @Test + public void testMemoryLeakFixed() throws Exception { + proxyService.setProxyLogLevel(2); + final String topicName = "persistent://sample/test/local/topic-stats"; + final String topicName2 = "persistent://sample/test/local/topic-stats-2"; + + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build(); + Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic(topicName).enableBatching(false) + .producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic(topicName2).enableBatching(false) + .producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); + Consumer<byte[]> consumer2 = client.newConsumer().topic(topicName2).subscriptionName("my-sub") + .subscribe(); + + int totalMessages = 10; + for (int i = 0; i < totalMessages; i++) { + producer1.send("test".getBytes()); + producer2.send("test".getBytes()); + } + + for (int i = 0; i < totalMessages; i++) { + Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS); + requireNonNull(msg); + consumer.acknowledge(msg); + msg = consumer2.receive(1, TimeUnit.SECONDS); + } + + ParserProxyHandler.Context context = proxyService.getClientCnxs().stream().map(proxyConnection -> { + ParserProxyHandler parserProxyHandler = proxyConnection.ctx().pipeline().get(ParserProxyHandler.class); + return parserProxyHandler != null ? parserProxyHandler.getContext() : null; + }).filter(c -> c != null && !c.getConsumerIdToTopicName().isEmpty()).findFirst().get(); + + assertEquals(context.getConsumerIdToTopicName().size(), 2); + assertEquals(context.getProducerIdToTopicName().size(), 2); + + + consumer.close(); + assertEquals(context.getConsumerIdToTopicName().size(), 1); + consumer2.close(); + assertEquals(context.getConsumerIdToTopicName().size(), 0); + + producer1.close(); + assertEquals(context.getProducerIdToTopicName().size(), 1); + producer2.close(); + assertEquals(context.getProducerIdToTopicName().size(), 0); } /**
