This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f2ec1b4 Fix memory leak when running topic compaction. (#6485) f2ec1b4 is described below commit f2ec1b4e2836859b0a6beb9b5a12656e4bcaf8f9 Author: Rolf Arne Corneliussen <rac...@users.noreply.github.com> AuthorDate: Fri Mar 6 07:32:28 2020 +0100 Fix memory leak when running topic compaction. (#6485) Fixes #6482 ### Motivation Prevent topic compaction from leaking direct memory. ### Modifications Several leaks were discovered using Netty leak detection and code review. * `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate through the `Enumeration` and release the underlying buffers. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine). * There were two main sources of leaks in `TwoPhaseCompactor`. The `RawBatchConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the returned `ByteBuf` of `RawBatchConverter.rebatchMessage` was not closed. The first one was easy to fix (release the buffer). To fix the second one and make the code easier to read, I decided not to let `RawBatchConverter.rebatchMessage` close the message read from the topic; instead, the message read from the topic can be closed [...] ### Verifying this change Modified `RawReaderTest.testBatchingRebatch` to show the new contract. One can also run the test described for reproducing the issue to verify that no leak is detected. 
--- .../pulsar/client/impl/RawBatchConverter.java | 5 +- .../pulsar/compaction/CompactedTopicImpl.java | 19 ++-- .../pulsar/compaction/TwoPhaseCompactor.java | 119 +++++++++++---------- .../apache/pulsar/client/impl/RawReaderTest.java | 4 +- 4 files changed, 82 insertions(+), 65 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index e252426..8c21a73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -91,8 +91,7 @@ public class RawBatchConverter { * Take a batched message and a filter, and returns a message with the only the sub-messages * which match the filter. Returns an empty optional if no messages match. * - * This takes ownership of the passes in message, and if the returned optional is not empty, - * the ownership of that message is returned also. + * NOTE: this message does not alter the reference count of the RawMessage argument. 
*/ public static Optional<RawMessage> rebatchMessage(RawMessage msg, BiPredicate<String, MessageId> filter) @@ -161,9 +160,9 @@ public class RawBatchConverter { return Optional.empty(); } } finally { + uncompressedPayload.release(); batchBuffer.release(); metadata.recycle(); - msg.close(); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index b1378b6..22efe8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -164,12 +164,19 @@ public class CompactedTopicImpl implements CompactedTopic { if (rc != BKException.Code.OK) { promise.completeExceptionally(BKException.create(rc)); } else { - try (RawMessage m = RawMessageImpl.deserializeFrom( - seq.nextElement().getEntryBuffer())) { - promise.complete(m.getMessageIdData()); - } catch (NoSuchElementException e) { - log.error("No such entry {} in ledger {}", entryId, lh.getId()); - promise.completeExceptionally(e); + // Need to release buffers for all entries in the sequence + if (seq.hasMoreElements()) { + LedgerEntry entry = seq.nextElement(); + try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) { + entry.getEntryBuffer().release(); + while (seq.hasMoreElements()) { + seq.nextElement().getEntryBuffer().release(); + } + promise.complete(m.getMessageIdData()); + } + } else { + promise.completeExceptionally(new NoSuchElementException( + String.format("No such entry %d in ledger %d", entryId, lh.getId()))); } } }, null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 95f6f1a..a275bb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -212,77 +212,88 @@ public class TwoPhaseCompactor extends Compactor { private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey, LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) { + if (promise.isDone()) { + return; + } reader.readNextAsync().whenCompleteAsync( (m, exception) -> { if (exception != null) { promise.completeExceptionally(exception); return; } else if (promise.isDone()) { + m.close(); return; } - MessageId id = m.getMessageId(); - Optional<RawMessage> messageToAdd = Optional.empty(); - if (RawBatchConverter.isReadableBatch(m)) { - try { - messageToAdd = RawBatchConverter.rebatchMessage( - m, (key, subid) -> latestForKey.get(key).equals(subid)); - } catch (IOException ioe) { - log.info("Error decoding batch for message {}. Whole batch will be included in output", - id, ioe); - messageToAdd = Optional.of(m); - } - } else { - Pair<String,Integer> keyAndSize = extractKeyAndSize(m); - MessageId msg; - if (keyAndSize == null) { // pass through messages without a key - messageToAdd = Optional.of(m); - } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null - && msg.equals(id)) { // consider message only if present into latestForKey map - if (keyAndSize.getRight() <= 0) { - promise.completeExceptionally(new IllegalArgumentException( - "Compaction phase found empty record from sorted key-map")); + try { + MessageId id = m.getMessageId(); + Optional<RawMessage> messageToAdd = Optional.empty(); + if (RawBatchConverter.isReadableBatch(m)) { + try { + messageToAdd = RawBatchConverter.rebatchMessage( + m, (key, subid) -> latestForKey.get(key).equals(subid)); + } catch (IOException ioe) { + log.info("Error decoding batch for message {}. 
Whole batch will be included in output", + id, ioe); + messageToAdd = Optional.of(m); } - messageToAdd = Optional.of(m); } else { - m.close(); + Pair<String,Integer> keyAndSize = extractKeyAndSize(m); + MessageId msg; + if (keyAndSize == null) { // pass through messages without a key + messageToAdd = Optional.of(m); + } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null + && msg.equals(id)) { // consider message only if present into latestForKey map + if (keyAndSize.getRight() <= 0) { + promise.completeExceptionally(new IllegalArgumentException( + "Compaction phase found empty record from sorted key-map")); + } + messageToAdd = Optional.of(m); + } } - } - if (messageToAdd.isPresent()) { - try { - outstanding.acquire(); - CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get()) - .whenComplete((res, exception2) -> { - outstanding.release(); - if (exception2 != null) { - promise.completeExceptionally(exception2); + if (messageToAdd.isPresent()) { + RawMessage message = messageToAdd.get(); + try { + outstanding.acquire(); + CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message) + .whenComplete((res, exception2) -> { + outstanding.release(); + if (exception2 != null) { + promise.completeExceptionally(exception2); + } + }); + if (to.equals(id)) { + addFuture.whenComplete((res, exception2) -> { + if (exception2 == null) { + promise.complete(null); } }); - if (to.equals(id)) { - addFuture.whenComplete((res, exception2) -> { - if (exception2 == null) { - promise.complete(null); - } - }); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(ie); + } finally { + if (message != m) { + message.close(); + } } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(ie); - } - } else if (to.equals(id)) { - // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not - // present under 
latestForKey. Complete the compaction. - try { - // make sure all inflight writes have finished - outstanding.acquire(MAX_OUTSTANDING); - promise.complete(null); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(e); + } else if (to.equals(id)) { + // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, + // not present under latestForKey. Complete the compaction. + try { + // make sure all inflight writes have finished + outstanding.acquire(MAX_OUTSTANDING); + promise.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(e); + } + return; } - return; + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); + } finally { + m.close(); } - phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); }, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index b0c7cd1..5ae4618 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -319,13 +319,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest { } RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); - try { - RawMessage m1 = reader.readNextAsync().get(); + try (RawMessage m1 = reader.readNextAsync().get()) { RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get(); List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2); Assert.assertEquals(idsAndKeys.size(), 1); Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2"); m2.close(); + Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1); } finally { reader.closeAsync().get(); }