This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9843ad62c9 Add ByteBuf refCnt test for AddEntry and use pooled ByteBuf
to reduce heap usage (#3598)
9843ad62c9 is described below
commit 9843ad62c95cc6d975da8a0be9bccc8e2850bf3f
Author: wenbingshen <[email protected]>
AuthorDate: Wed Nov 2 23:40:58 2022 +0800
Add ByteBuf refCnt test for AddEntry and use pooled ByteBuf to reduce heap
usage (#3598)
* Fix ByteBuf memory leak in addEntryInternal
* Fix checkstyle
* Use pooled ByteBuf
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 31 +++++---
.../java/org/apache/bookkeeper/bookie/Journal.java | 5 --
.../apache/bookkeeper/bookie/BookieImplTest.java | 83 ++++++++++++++++++++++
3 files changed, 103 insertions(+), 16 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 6b8e482256..5822fecc10 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -931,6 +931,17 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
}
+ @VisibleForTesting
+ public ByteBuf createMasterKeyEntry(long ledgerId, byte[] masterKey) {
+ // new handle, we should add the key to journal ensure we can rebuild
+ ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + masterKey.length);
+ bb.writeLong(ledgerId);
+ bb.writeLong(METAENTRY_ID_LEDGER_KEY);
+ bb.writeInt(masterKey.length);
+ bb.writeBytes(masterKey);
+ return bb;
+ }
+
/**
* Add an entry to a ledger as specified by handle.
*/
@@ -948,15 +959,13 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
// Force the load into masterKey cache
byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
if (oldValue == null) {
- // new handle, we should add the key to journal ensure we can
rebuild
- ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 +
masterKey.length);
- bb.putLong(ledgerId);
- bb.putLong(METAENTRY_ID_LEDGER_KEY);
- bb.putInt(masterKey.length);
- bb.put(masterKey);
- bb.flip();
-
- getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync
*/, new NopWriteCallback(), null);
+ ByteBuf masterKeyEntry = createMasterKeyEntry(ledgerId,
masterKey);
+ try {
+ getJournal(ledgerId).logAddEntry(
+ masterKeyEntry, false /* ackBeforeSync */, new
NopWriteCallback(), null);
+ } finally {
+ ReferenceCountUtil.safeRelease(masterKeyEntry);
+ }
}
}
@@ -1002,7 +1011,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
bookieStats.getAddBytesStats().registerFailedValue(entrySize);
}
- entry.release();
+ ReferenceCountUtil.safeRelease(entry);
}
}
@@ -1096,7 +1105,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
bookieStats.getAddBytesStats().registerFailedValue(entrySize);
}
- entry.release();
+ ReferenceCountUtil.safeRelease(entry);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 93c58277b9..1a986707a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -960,11 +960,6 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
- public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx)
- throws InterruptedException {
- logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
- }
-
/**
* record an add entry operation in journal.
*/
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
index 5cd1f49c42..a93b5ac3a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -31,16 +32,20 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.PortManager;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +55,9 @@ public class BookieImplTest extends BookKeeperClusterTestCase
{
private static final int bookiePort = PortManager.nextFreePort();
+ private static final int ADD = 0;
+ private static final int RECOVERY_ADD = 1;
+
public BookieImplTest() {
super(0);
}
@@ -98,4 +106,79 @@ public class BookieImplTest extends
BookKeeperClusterTestCase {
b.shutdown();
}
+
+ @Test
+ public void testAddEntry() throws Exception {
+ mockAddEntryReleased(ADD);
+ }
+
+ @Test
+ public void testRecoveryAddEntry() throws Exception {
+ mockAddEntryReleased(RECOVERY_ADD);
+ }
+
+ public void mockAddEntryReleased(int flag) throws Exception {
+ final String metadataServiceUri = zkUtil.getMetadataServiceUri();
+ ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
+ conf.setMetadataServiceUri(metadataServiceUri);
+
+ MetadataBookieDriver metadataDriver =
BookieResources.createMetadataDriver(
+ conf, NullStatsLogger.INSTANCE);
+ RegistrationManager rm = metadataDriver.createRegistrationManager();
+ TestBookieImpl.Resources resources = new
TestBookieImpl.ResourceBuilder(conf)
+
.withMetadataDriver(metadataDriver).withRegistrationManager(rm).build();
+ BookieImpl b = new TestBookieImpl(resources);
+ b.start();
+
+ final BookieImpl spyBookie = spy(b);
+
+ final long ledgerId = 10;
+
+ final byte[] masterKey =
ByteString.copyFrom("masterKey".getBytes()).toByteArray();
+
+ final ByteBuf masterKeyEntry = b.createMasterKeyEntry(ledgerId,
masterKey);
+
+ doReturn(masterKeyEntry)
+ .when(spyBookie)
+ .createMasterKeyEntry(eq(ledgerId), eq(masterKey));
+
+ final ByteBuf entry = generateEntry(ledgerId, 0);
+
+ AtomicBoolean complete = new AtomicBoolean(false);
+ final BookkeeperInternalCallbacks.WriteCallback writeCallback =
+ new BookkeeperInternalCallbacks.WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long
entryId, BookieId addr, Object ctx) {
+ complete.set(true);
+ }
+ };
+
+ switch (flag) {
+ case ADD:
+ spyBookie.addEntry(entry, false, writeCallback, null,
masterKey);
+ break;
+ case RECOVERY_ADD:
+ spyBookie.recoveryAddEntry(entry, writeCallback, null,
masterKey);
+ break;
+ default:
+ throw new IllegalArgumentException("Only support ADD and
RECOVERY_ADD flag.");
+ }
+
+ Awaitility.await().untilAsserted(() -> assertTrue(complete.get()));
+
+ assertEquals(0, entry.refCnt());
+ assertEquals(0, masterKeyEntry.refCnt());
+
+ b.shutdown();
+
+ }
+
+ private ByteBuf generateEntry(long ledger, long entry) {
+ byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+ ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+ bb.writeLong(ledger);
+ bb.writeLong(entry);
+ bb.writeBytes(data);
+ return bb;
+ }
}