This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46cacffd882 [fix][broker] Fix direct memory leak in RawReaderImpl
(#18928)
46cacffd882 is described below
commit 46cacffd882259712271d6d01ba3c10d3355b112
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 20 22:37:33 2022 +0200
[fix][broker] Fix direct memory leak in RawReaderImpl (#18928)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 60 +++++++++++++++++-----
1 file changed, 47 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 255df22580b..9a1c972b2cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -166,6 +166,32 @@ public class RawReaderImpl implements RawReader {
}
}
+ @Override
+ protected CompletableFuture<Void> failPendingReceive() {
+ if (internalPinnedExecutor.isShutdown()) {
+ failPendingRawReceives();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ try {
+ failPendingRawReceives();
+ } finally {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
+ }
+
+ private void failPendingRawReceives() {
+ List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
+ while (!pendingRawReceives.isEmpty()) {
+ toError.add(pendingRawReceives.remove());
+ }
+ toError.forEach((f) -> f.cancel(false));
+ }
+
CompletableFuture<RawMessage> receiveRawAsync() {
CompletableFuture<RawMessage> result = new CompletableFuture<>();
pendingRawReceives.add(result);
@@ -174,19 +200,22 @@ public class RawReaderImpl implements RawReader {
}
private void reset() {
- List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
- synchronized (this) {
- while (!pendingRawReceives.isEmpty()) {
- toError.add(pendingRawReceives.remove());
- }
- RawMessageAndCnx m = incomingRawMessages.poll();
- while (m != null) {
- m.msg.close();
- m = incomingRawMessages.poll();
- }
- incomingRawMessages.clear();
+ failPendingRawReceives();
+ clearIncomingRawMessages();
+ }
+
+ private void clearIncomingRawMessages() {
+ RawMessageAndCnx m = incomingRawMessages.poll();
+ while (m != null) {
+ m.msg.close();
+ m = incomingRawMessages.poll();
}
- toError.forEach((f) -> f.cancel(false));
+ }
+
+ @Override
+ protected void clearIncomingMessages() {
+ super.clearIncomingMessages();
+ clearIncomingRawMessages();
}
@Override
@@ -203,12 +232,17 @@ public class RawReaderImpl implements RawReader {
@Override
public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> closeFuture = super.closeAsync();
reset();
- return super.closeAsync();
+ return closeFuture;
}
@Override
void messageReceived(CommandMessage commandMessage, ByteBuf
headersAndPayload, ClientCnx cnx) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
+ return;
+ }
MessageIdData messageId = commandMessage.getMessageId();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic,
subscription,