This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 99fdca8af67 [fix][proxy] Fix memory leaks in ParserProxyHandler
(#25142)
99fdca8af67 is described below
commit 99fdca8af6742dd36bec93ab0038fb572e6eeb0e
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 15 10:39:11 2026 +0200
[fix][proxy] Fix memory leaks in ParserProxyHandler (#25142)
---
.../pulsar/proxy/server/DirectProxyHandler.java | 9 +-
.../pulsar/proxy/server/ParserProxyHandler.java | 126 ++++++++++++++-------
.../apache/pulsar/proxy/server/ProxyService.java | 12 +-
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 58 ++++++++++
4 files changed, 155 insertions(+), 50 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 5f4456d356e..f707abbc06a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -426,26 +426,27 @@ public class DirectProxyHandler {
} else {
// Enable parsing feature, proxyLogLevel(1 or 2)
// Add parser handler
+ ParserProxyHandler.Context parserContext =
ParserProxyHandler.createContext();
if (connected.hasMaxMessageSize()) {
FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(),
connected.getMaxMessageSize());
FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(),
connected.getMaxMessageSize());
inboundChannel.pipeline().addBefore("handler",
"inboundParser",
- new ParserProxyHandler(service,
+ new ParserProxyHandler(parserContext, service,
ParserProxyHandler.FRONTEND_CONN,
connected.getMaxMessageSize(),
outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service,
+ new ParserProxyHandler(parserContext, service,
ParserProxyHandler.BACKEND_CONN,
connected.getMaxMessageSize(),
inboundChannel.id()));
} else {
inboundChannel.pipeline().addBefore("handler",
"inboundParser",
- new ParserProxyHandler(service,
+ new ParserProxyHandler(parserContext, service,
ParserProxyHandler.FRONTEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE,
outboundChannel.id()));
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
- new ParserProxyHandler(service,
+ new ParserProxyHandler(parserContext, service,
ParserProxyHandler.BACKEND_CONN,
Commands.DEFAULT_MAX_MESSAGE_SIZE,
inboundChannel.id()));
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
index 3a98311eb15..d3de3a2ff0a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java
@@ -31,11 +31,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.Getter;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.StringInterner;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,27 +55,41 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
private final int maxMessageSize;
private final ChannelId peerChannelId;
+ @Getter
+ private final Context context;
private final ProxyService service;
- /**
- * producerid + channelid as key.
- */
- private static final Map<String, String> producerHashMap = new
ConcurrentHashMap<>();
+ public static class Context {
+ /**
+ * producerid as key.
+ */
+ @Getter
+ private final Map<Long, String> producerIdToTopicName = new
ConcurrentHashMap<>();
- /**
- * consumerid + channelid as key.
- */
- private static final Map<String, String> consumerHashMap = new
ConcurrentHashMap<>();
+ /**
+ * consumerid as key.
+ */
+ @Getter
+ private final Map<Long, String> consumerIdToTopicName = new
ConcurrentHashMap<>();
- public ParserProxyHandler(ProxyService service, String type, int
maxMessageSize,
+ private Context() {
+ }
+ }
+
+ public ParserProxyHandler(Context context, ProxyService service, String
type, int maxMessageSize,
ChannelId peerChannelId) {
+ this.context = context;
this.service = service;
this.connType = type;
this.maxMessageSize = maxMessageSize;
this.peerChannelId = peerChannelId;
}
+ public static Context createContext() {
+ return new Context();
+ }
+
private void logging(Channel conn, BaseCommand.Type cmdtype, String info,
List<RawMessage> messages) {
if (messages != null) {
@@ -115,64 +131,86 @@ public class ParserProxyHandler extends
ChannelInboundHandlerAdapter {
switch (cmd.getType()) {
case PRODUCER:
-
ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + ","
+ ctx.channel().id(),
- cmd.getProducer().getTopic());
+ topicName =
StringInterner.intern(cmd.getProducer().getTopic());
+
context.producerIdToTopicName.put(cmd.getProducer().getProducerId(), topicName);
String producerName = "";
if (cmd.getProducer().hasProducerName()){
producerName = cmd.getProducer().getProducerName();
}
logging(ctx.channel(), cmd.getType(), "{producer:" +
producerName
- + ",topic:" + cmd.getProducer().getTopic() + "}",
null);
+ + ",topic:" + topicName + "}", null);
+ break;
+ case CLOSE_PRODUCER:
+
context.producerIdToTopicName.remove(cmd.getCloseProducer().getProducerId());
+ logging(ctx.channel(), cmd.getType(), "", null);
break;
-
case SEND:
if (service.getProxyLogLevel() != 2) {
logging(ctx.channel(), cmd.getType(), "", null);
break;
}
- topicName =
TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
- cmd.getSend().getProducerId() + "," +
ctx.channel().id()));
- MutableLong msgBytes = new MutableLong(0);
- MessageParser.parseMessage(topicName, -1L,
- -1L, buffer, (message) -> {
- messages.add(message);
-
msgBytes.add(message.getData().readableBytes());
- }, maxMessageSize);
- // update topic stats
- TopicStats topicStats =
this.service.getTopicStats().computeIfAbsent(topicName,
- topic -> new TopicStats());
-
topicStats.getMsgInRate().recordMultipleEvents(messages.size(),
msgBytes.longValue());
- logging(ctx.channel(), cmd.getType(), "", messages);
+ long producerId = cmd.getSend().getProducerId();
+ String topicForProducer =
context.producerIdToTopicName.get(producerId);
+ if (topicForProducer != null) {
+ topicName =
TopicName.toFullTopicName(topicForProducer);
+ MutableLong msgBytes = new MutableLong(0);
+ MessageParser.parseMessage(topicName, -1L,
+ -1L, buffer, (message) -> {
+ messages.add(message);
+
msgBytes.add(message.getData().readableBytes());
+ }, maxMessageSize);
+ // update topic stats
+ TopicStats topicStats =
this.service.getTopicStats().computeIfAbsent(topicName,
+ topic -> new TopicStats());
+
topicStats.getMsgInRate().recordMultipleEvents(messages.size(),
msgBytes.longValue());
+ logging(ctx.channel(), cmd.getType(), "", messages);
+ } else {
+ logging(ctx.channel(), cmd.getType(),
+ "Cannot find topic name for producerId " +
producerId, null);
+ }
break;
case SUBSCRIBE:
-
ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
- + ctx.channel().id(),
cmd.getSubscribe().getTopic());
+ topicName =
StringInterner.intern(cmd.getSubscribe().getTopic());
+
context.consumerIdToTopicName.put(cmd.getSubscribe().getConsumerId(),
topicName);
logging(ctx.channel(), cmd.getType(), "{consumer:" +
cmd.getSubscribe().getConsumerName()
- + ",topic:" + cmd.getSubscribe().getTopic() + "}",
null);
+ + ",topic:" + topicName + "}", null);
+ break;
+ case CLOSE_CONSUMER:
+
context.consumerIdToTopicName.remove(cmd.getCloseConsumer().getConsumerId());
+ logging(ctx.channel(), cmd.getType(), "", null);
+ break;
+ case UNSUBSCRIBE:
+
context.consumerIdToTopicName.remove(cmd.getUnsubscribe().getConsumerId());
+ logging(ctx.channel(), cmd.getType(), "", null);
break;
-
case MESSAGE:
if (service.getProxyLogLevel() != 2) {
logging(ctx.channel(), cmd.getType(), "", null);
break;
}
- topicName =
TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
- cmd.getMessage().getConsumerId() + "," +
peerChannelId));
-
- msgBytes = new MutableLong(0);
- MessageParser.parseMessage(topicName, -1L,
- -1L, buffer, (message) -> {
- messages.add(message);
-
msgBytes.add(message.getData().readableBytes());
- }, maxMessageSize);
- // update topic stats
- topicStats =
this.service.getTopicStats().computeIfAbsent(topicName.toString(),
- topic -> new TopicStats());
-
topicStats.getMsgOutRate().recordMultipleEvents(messages.size(),
msgBytes.longValue());
- logging(ctx.channel(), cmd.getType(), "", messages);
+ long consumerId = cmd.getMessage().getConsumerId();
+ String topicForConsumer =
context.consumerIdToTopicName.get(consumerId);
+ if (topicForConsumer != null) {
+ topicName =
TopicName.toFullTopicName(topicForConsumer);
+
+ MutableLong msgBytes = new MutableLong(0);
+ MessageParser.parseMessage(topicName, -1L,
+ -1L, buffer, (message) -> {
+ messages.add(message);
+
msgBytes.add(message.getData().readableBytes());
+ }, maxMessageSize);
+ // update topic stats
+ TopicStats topicStats =
this.service.getTopicStats().computeIfAbsent(topicName.toString(),
+ topic -> new TopicStats());
+
topicStats.getMsgOutRate().recordMultipleEvents(messages.size(),
msgBytes.longValue());
+ logging(ctx.channel(), cmd.getType(), "", messages);
+ } else {
+ logging(ctx.channel(), cmd.getType(), "Cannot find
topic name for consumerId " + consumerId,
+ null);
+ }
break;
default:
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 251e25a0e6b..670ac08dee9 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -55,7 +55,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
-import lombok.Setter;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -116,7 +115,6 @@ public class ProxyService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@Getter
- @Setter
protected int proxyLogLevel;
@Getter
@@ -590,4 +588,14 @@ public class ProxyService implements Closeable {
public void setGracefulShutdown(boolean gracefulShutdown) {
this.gracefulShutdown = gracefulShutdown;
}
+
+ public void setProxyLogLevel(int proxyLogLevel) {
+ this.proxyLogLevel = proxyLogLevel;
+ // clear the topic stats when proxy log level is changed to < 2
+ // this is a way to avoid the proxy consuming too much memory when
there are a lot of topics and log level
+ // has been temporarily set to 2
+ if (proxyLogLevel < 2) {
+ topicStats.clear();
+ }
+ }
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 17dac99d632..b6a0758b528 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.proxy.server;
import static java.util.Objects.requireNonNull;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.List;
@@ -214,6 +216,62 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
consumer.close();
consumer2.close();
+
+ // check that topic stats are cleared after setting proxy log level to 0
+ assertFalse(proxyService.getTopicStats().isEmpty());
+ proxyService.setProxyLogLevel(0);
+ assertTrue(proxyService.getTopicStats().isEmpty());
+ }
+
+ @Test
+ public void testMemoryLeakFixed() throws Exception {
+ proxyService.setProxyLogLevel(2);
+ final String topicName = "persistent://sample/test/local/topic-stats";
+ final String topicName2 =
"persistent://sample/test/local/topic-stats-2";
+
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
+ Producer<byte[]> producer1 =
client.newProducer(Schema.BYTES).topic(topicName).enableBatching(false)
+
.producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+ Producer<byte[]> producer2 =
client.newProducer(Schema.BYTES).topic(topicName2).enableBatching(false)
+
.producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+
+ Consumer<byte[]> consumer =
client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+ Consumer<byte[]> consumer2 =
client.newConsumer().topic(topicName2).subscriptionName("my-sub")
+ .subscribe();
+
+ int totalMessages = 10;
+ for (int i = 0; i < totalMessages; i++) {
+ producer1.send("test".getBytes());
+ producer2.send("test".getBytes());
+ }
+
+ for (int i = 0; i < totalMessages; i++) {
+ Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
+ requireNonNull(msg);
+ consumer.acknowledge(msg);
+ msg = consumer2.receive(1, TimeUnit.SECONDS);
+ }
+
+ ParserProxyHandler.Context context =
proxyService.getClientCnxs().stream().map(proxyConnection -> {
+ ParserProxyHandler parserProxyHandler =
proxyConnection.ctx().pipeline().get(ParserProxyHandler.class);
+ return parserProxyHandler != null ?
parserProxyHandler.getContext() : null;
+ }).filter(c -> c != null &&
!c.getConsumerIdToTopicName().isEmpty()).findFirst().get();
+
+ assertEquals(context.getConsumerIdToTopicName().size(), 2);
+ assertEquals(context.getProducerIdToTopicName().size(), 2);
+
+
+ consumer.close();
+ assertEquals(context.getConsumerIdToTopicName().size(), 1);
+ consumer2.close();
+ assertEquals(context.getConsumerIdToTopicName().size(), 0);
+
+ producer1.close();
+ assertEquals(context.getProducerIdToTopicName().size(), 1);
+ producer2.close();
+ assertEquals(context.getProducerIdToTopicName().size(), 0);
}
/**