This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9843ad62c9 Add ByteBuf refCnt test for AddEntry and use pooled ByteBuf reduce heap usage (#3598)
9843ad62c9 is described below

commit 9843ad62c95cc6d975da8a0be9bccc8e2850bf3f
Author: wenbingshen <[email protected]>
AuthorDate: Wed Nov 2 23:40:58 2022 +0800

    Add ByteBuf refCnt test for AddEntry and use pooled ByteBuf reduce heap usage (#3598)
    
    * Fix ByteBuf memory leak problem when addEntryInternal
    
    * fix checkstyle
    
    * use pooled ByteBuf
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   | 31 +++++---
 .../java/org/apache/bookkeeper/bookie/Journal.java |  5 --
 .../apache/bookkeeper/bookie/BookieImplTest.java   | 83 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 16 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index 6b8e482256..5822fecc10 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -931,6 +931,17 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
         return journals.get(MathUtils.signSafeMod(ledgerId, journals.size()));
     }
 
+    @VisibleForTesting
+    public ByteBuf createMasterKeyEntry(long ledgerId, byte[] masterKey) {
+        // new handle, we should add the key to journal ensure we can rebuild
+        ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + masterKey.length);
+        bb.writeLong(ledgerId);
+        bb.writeLong(METAENTRY_ID_LEDGER_KEY);
+        bb.writeInt(masterKey.length);
+        bb.writeBytes(masterKey);
+        return bb;
+    }
+
     /**
      * Add an entry to a ledger as specified by handle.
      */
@@ -948,15 +959,13 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
             // Force the load into masterKey cache
             byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey);
             if (oldValue == null) {
-                // new handle, we should add the key to journal ensure we can rebuild
-                ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length);
-                bb.putLong(ledgerId);
-                bb.putLong(METAENTRY_ID_LEDGER_KEY);
-                bb.putInt(masterKey.length);
-                bb.put(masterKey);
-                bb.flip();
-
-                getJournal(ledgerId).logAddEntry(bb, false /* ackBeforeSync */, new NopWriteCallback(), null);
+                ByteBuf masterKeyEntry = createMasterKeyEntry(ledgerId, masterKey);
+                try {
+                    getJournal(ledgerId).logAddEntry(
+                            masterKeyEntry, false /* ackBeforeSync */, new NopWriteCallback(), null);
+                } finally {
+                    ReferenceCountUtil.safeRelease(masterKeyEntry);
+                }
             }
         }
 
@@ -1002,7 +1011,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
                 bookieStats.getAddBytesStats().registerFailedValue(entrySize);
             }
 
-            entry.release();
+            ReferenceCountUtil.safeRelease(entry);
         }
     }
 
@@ -1096,7 +1105,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
                 bookieStats.getAddBytesStats().registerFailedValue(entrySize);
             }
 
-            entry.release();
+            ReferenceCountUtil.safeRelease(entry);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 93c58277b9..1a986707a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -960,11 +960,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         }
     }
 
-    public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, WriteCallback cb, Object ctx)
-            throws InterruptedException {
-        logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
-    }
-
     /**
      * record an add entry operation in journal.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
index 5cd1f49c42..a93b5ac3a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -31,16 +32,20 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.PortManager;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +55,9 @@ public class BookieImplTest extends BookKeeperClusterTestCase {
 
     private static final int bookiePort = PortManager.nextFreePort();
 
+    private static final int ADD = 0;
+    private static final int RECOVERY_ADD = 1;
+
     public BookieImplTest() {
         super(0);
     }
@@ -98,4 +106,79 @@ public class BookieImplTest extends BookKeeperClusterTestCase {
         b.shutdown();
 
     }
+
+    @Test
+    public void testAddEntry() throws Exception {
+        mockAddEntryReleased(ADD);
+    }
+
+    @Test
+    public void testRecoveryAddEntry() throws Exception {
+        mockAddEntryReleased(RECOVERY_ADD);
+    }
+
+    public void mockAddEntryReleased(int flag) throws Exception {
+        final String metadataServiceUri = zkUtil.getMetadataServiceUri();
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setMetadataServiceUri(metadataServiceUri);
+
+        MetadataBookieDriver metadataDriver = BookieResources.createMetadataDriver(
+                conf, NullStatsLogger.INSTANCE);
+        RegistrationManager rm = metadataDriver.createRegistrationManager();
+        TestBookieImpl.Resources resources = new TestBookieImpl.ResourceBuilder(conf)
+                .withMetadataDriver(metadataDriver).withRegistrationManager(rm).build();
+        BookieImpl b = new TestBookieImpl(resources);
+        b.start();
+
+        final BookieImpl spyBookie = spy(b);
+
+        final long ledgerId = 10;
+
+        final byte[] masterKey = ByteString.copyFrom("masterKey".getBytes()).toByteArray();
+
+        final ByteBuf masterKeyEntry = b.createMasterKeyEntry(ledgerId, masterKey);
+
+        doReturn(masterKeyEntry)
+                .when(spyBookie)
+                .createMasterKeyEntry(eq(ledgerId), eq(masterKey));
+
+        final ByteBuf entry = generateEntry(ledgerId, 0);
+
+        AtomicBoolean complete = new AtomicBoolean(false);
+        final BookkeeperInternalCallbacks.WriteCallback writeCallback =
+                new BookkeeperInternalCallbacks.WriteCallback() {
+                    @Override
+                    public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
+                        complete.set(true);
+                    }
+                };
+
+        switch (flag) {
+            case ADD:
+                spyBookie.addEntry(entry, false, writeCallback, null, masterKey);
+                break;
+            case RECOVERY_ADD:
+                spyBookie.recoveryAddEntry(entry, writeCallback, null, masterKey);
+                break;
+            default:
+                throw new IllegalArgumentException("Only support ADD and RECOVERY_ADD flag.");
+        }
+
+        Awaitility.await().untilAsserted(() -> assertTrue(complete.get()));
+
+        assertEquals(0, entry.refCnt());
+        assertEquals(0, masterKeyEntry.refCnt());
+
+        b.shutdown();
+
+    }
+
+    private ByteBuf generateEntry(long ledger, long entry) {
+        byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
+        ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        bb.writeBytes(data);
+        return bb;
+    }
 }

Reply via email to