This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f2ec1b4 Fix memory leak when running topic compaction. (#6485)
f2ec1b4 is described below
commit f2ec1b4e2836859b0a6beb9b5a12656e4bcaf8f9
Author: Rolf Arne Corneliussen <[email protected]>
AuthorDate: Fri Mar 6 07:32:28 2020 +0100
Fix memory leak when running topic compaction. (#6485)
Fixes #6482
### Motivation
Prevent topic compaction from leaking direct memory
### Modifications
Several leaks were discovered using Netty leak detection and code review.
* `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of
`LedgerEntry`, but did not release the underlying buffers. Fix: iterate through
the `Enumeration` and release the underlying buffers. Instead of logging the case
where the `Enumeration` did not contain any elements, complete the future
exceptionally with the message (will be logged by Caffeine).
* Two main sources of leaks in `TwoPhaseCompactor`. The
`RawBatchConverter.rebatchMessage` method failed to close/release a `ByteBuf`
(`uncompressedPayload`). Also, the `ByteBuf` returned by
`RawBatchConverter.rebatchMessage` was not closed. The first one was easy to
fix (release the buffer). To fix the second one and make the code easier to read, I
decided not to let `RawBatchConverter.rebatchMessage` close the message read
from the topic; instead, the message read from the topic can be closed [...]
### Verifying this change
Modified `RawReaderTest.testBatchingRebatch` to show the new contract.
One can run the test described to reproduce the issue and to verify that no
leak is detected.
---
.../pulsar/client/impl/RawBatchConverter.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 19 ++--
.../pulsar/compaction/TwoPhaseCompactor.java | 119 +++++++++++----------
.../apache/pulsar/client/impl/RawReaderTest.java | 4 +-
4 files changed, 82 insertions(+), 65 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index e252426..8c21a73 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -91,8 +91,7 @@ public class RawBatchConverter {
* Take a batched message and a filter, and returns a message with the
only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
- * This takes ownership of the passes in message, and if the returned
optional is not empty,
- * the ownership of that message is returned also.
+ * NOTE: this message does not alter the reference count of the
RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String,
MessageId> filter)
@@ -161,9 +160,9 @@ public class RawBatchConverter {
return Optional.empty();
}
} finally {
+ uncompressedPayload.release();
batchBuffer.release();
metadata.recycle();
- msg.close();
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b1378b6..22efe8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -164,12 +164,19 @@ public class CompactedTopicImpl implements CompactedTopic
{
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
- try (RawMessage m =
RawMessageImpl.deserializeFrom(
-
seq.nextElement().getEntryBuffer())) {
- promise.complete(m.getMessageIdData());
- } catch (NoSuchElementException e) {
- log.error("No such entry {} in ledger
{}", entryId, lh.getId());
- promise.completeExceptionally(e);
+ // Need to release buffers for all entries
in the sequence
+ if (seq.hasMoreElements()) {
+ LedgerEntry entry = seq.nextElement();
+ try (RawMessage m =
RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
+ entry.getEntryBuffer().release();
+ while (seq.hasMoreElements()) {
+
seq.nextElement().getEntryBuffer().release();
+ }
+
promise.complete(m.getMessageIdData());
+ }
+ } else {
+ promise.completeExceptionally(new
NoSuchElementException(
+ String.format("No such entry
%d in ledger %d", entryId, lh.getId())));
}
}
}, null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 95f6f1a..a275bb5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -212,77 +212,88 @@ public class TwoPhaseCompactor extends Compactor {
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding,
CompletableFuture<Void> promise) {
+ if (promise.isDone()) {
+ return;
+ }
reader.readNextAsync().whenCompleteAsync(
(m, exception) -> {
if (exception != null) {
promise.completeExceptionally(exception);
return;
} else if (promise.isDone()) {
+ m.close();
return;
}
- MessageId id = m.getMessageId();
- Optional<RawMessage> messageToAdd = Optional.empty();
- if (RawBatchConverter.isReadableBatch(m)) {
- try {
- messageToAdd = RawBatchConverter.rebatchMessage(
- m, (key, subid) ->
latestForKey.get(key).equals(subid));
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}.
Whole batch will be included in output",
- id, ioe);
- messageToAdd = Optional.of(m);
- }
- } else {
- Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
- MessageId msg;
- if (keyAndSize == null) { // pass through messages
without a key
- messageToAdd = Optional.of(m);
- } else if ((msg =
latestForKey.get(keyAndSize.getLeft())) != null
- && msg.equals(id)) { // consider message only
if present into latestForKey map
- if (keyAndSize.getRight() <= 0) {
- promise.completeExceptionally(new
IllegalArgumentException(
- "Compaction phase found empty record
from sorted key-map"));
+ try {
+ MessageId id = m.getMessageId();
+ Optional<RawMessage> messageToAdd = Optional.empty();
+ if (RawBatchConverter.isReadableBatch(m)) {
+ try {
+ messageToAdd =
RawBatchConverter.rebatchMessage(
+ m, (key, subid) ->
latestForKey.get(key).equals(subid));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}.
Whole batch will be included in output",
+ id, ioe);
+ messageToAdd = Optional.of(m);
}
- messageToAdd = Optional.of(m);
} else {
- m.close();
+ Pair<String,Integer> keyAndSize =
extractKeyAndSize(m);
+ MessageId msg;
+ if (keyAndSize == null) { // pass through messages
without a key
+ messageToAdd = Optional.of(m);
+ } else if ((msg =
latestForKey.get(keyAndSize.getLeft())) != null
+ && msg.equals(id)) { // consider message
only if present into latestForKey map
+ if (keyAndSize.getRight() <= 0) {
+ promise.completeExceptionally(new
IllegalArgumentException(
+ "Compaction phase found empty
record from sorted key-map"));
+ }
+ messageToAdd = Optional.of(m);
+ }
}
- }
- if (messageToAdd.isPresent()) {
- try {
- outstanding.acquire();
- CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, messageToAdd.get())
- .whenComplete((res, exception2) -> {
- outstanding.release();
- if (exception2 != null) {
-
promise.completeExceptionally(exception2);
+ if (messageToAdd.isPresent()) {
+ RawMessage message = messageToAdd.get();
+ try {
+ outstanding.acquire();
+ CompletableFuture<Void> addFuture =
addToCompactedLedger(lh, message)
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+
promise.completeExceptionally(exception2);
+ }
+ });
+ if (to.equals(id)) {
+ addFuture.whenComplete((res, exception2)
-> {
+ if (exception2 == null) {
+ promise.complete(null);
}
});
- if (to.equals(id)) {
- addFuture.whenComplete((res, exception2) -> {
- if (exception2 == null) {
- promise.complete(null);
- }
- });
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(ie);
+ } finally {
+ if (message != m) {
+ message.close();
+ }
}
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(ie);
- }
- } else if (to.equals(id)) {
- // Reached to last-id and phase-one found it
deleted-message while iterating on ledger so, not
- // present under latestForKey. Complete the compaction.
- try {
- // make sure all inflight writes have finished
- outstanding.acquire(MAX_OUTSTANDING);
- promise.complete(null);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(e);
+ } else if (to.equals(id)) {
+ // Reached to last-id and phase-one found it
deleted-message while iterating on ledger so,
+ // not present under latestForKey. Complete the
compaction.
+ try {
+ // make sure all inflight writes have finished
+ outstanding.acquire(MAX_OUTSTANDING);
+ promise.complete(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(e);
+ }
+ return;
}
- return;
+ phaseTwoLoop(reader, to, latestForKey, lh,
outstanding, promise);
+ } finally {
+ m.close();
}
- phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise);
}, scheduler);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b0c7cd1..5ae4618 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -319,13 +319,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
- try {
- RawMessage m1 = reader.readNextAsync().get();
+ try (RawMessage m1 = reader.readNextAsync().get()) {
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) ->
key.equals("key2")).get();
List<ImmutablePair<MessageId,String>> idsAndKeys =
RawBatchConverter.extractIdsAndKeys(m2);
Assert.assertEquals(idsAndKeys.size(), 1);
Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
m2.close();
+ Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
} finally {
reader.closeAsync().get();
}