This is an automated email from the ASF dual-hosted git repository.

lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new faaf041ac5 Fix ByteBuf memory leak problem when setExplicitLac (#3577)
faaf041ac5 is described below

commit faaf041ac5422ab1d4e46c123084202f48ea6aa8
Author: wenbingshen <[email protected]>
AuthorDate: Wed Nov 2 00:24:35 2022 +0800

    Fix ByteBuf memory leak problem when setExplicitLac (#3577)
    
    * fix set explicitLac no released entry
---
 .../org/apache/bookkeeper/bookie/BookieImpl.java   |  12 ++-
 .../apache/bookkeeper/bookie/BookieImplTest.java   | 101 +++++++++++++++++++++
 2 files changed, 111 insertions(+), 2 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index a485b3bf31..6b8e482256 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -32,6 +32,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -1005,7 +1006,8 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
         }
     }
 
-    private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) 
{
+    @VisibleForTesting
+    public ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
         ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + 
explicitLac.capacity());
         bb.writeLong(ledgerId);
         bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
@@ -1016,6 +1018,7 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
 
     public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, 
Object ctx, byte[] masterKey)
             throws IOException, InterruptedException, BookieException {
+        ByteBuf explicitLACEntry = null;
         try {
             long ledgerId = entry.getLong(entry.readerIndex());
             LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
@@ -1023,12 +1026,17 @@ public class BookieImpl extends BookieCriticalThread 
implements Bookie {
                 entry.markReaderIndex();
                 handle.setExplicitLac(entry);
                 entry.resetReaderIndex();
-                ByteBuf explicitLACEntry = createExplicitLACEntry(ledgerId, 
entry);
+                explicitLACEntry = createExplicitLACEntry(ledgerId, entry);
                 getJournal(ledgerId).logAddEntry(explicitLACEntry, false /* 
ackBeforeSync */, writeCallback, ctx);
             }
         } catch (NoWritableLedgerDirException e) {
             stateManager.transitionToReadOnlyMode();
             throw new IOException(e);
+        } finally {
+            ReferenceCountUtil.safeRelease(entry);
+            if (explicitLACEntry != null) {
+                ReferenceCountUtil.safeRelease(explicitLACEntry);
+            }
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
new file mode 100644
index 0000000000..5cd1f49c42
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.nio.charset.StandardCharsets;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.PortManager;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookieImplTest extends BookKeeperClusterTestCase {
+    private static final Logger log = 
LoggerFactory.getLogger(BookieImplTest.class);
+
+    private static final int bookiePort = PortManager.nextFreePort();
+
+    public BookieImplTest() {
+        super(0);
+    }
+
+    @Test
+    public void testWriteLac() throws Exception {
+        final String metadataServiceUri = zkUtil.getMetadataServiceUri();
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setMetadataServiceUri(metadataServiceUri);
+
+        MetadataBookieDriver metadataDriver = 
BookieResources.createMetadataDriver(
+                conf, NullStatsLogger.INSTANCE);
+        RegistrationManager rm = metadataDriver.createRegistrationManager();
+        TestBookieImpl.Resources resources = new 
TestBookieImpl.ResourceBuilder(conf)
+                
.withMetadataDriver(metadataDriver).withRegistrationManager(rm).build();
+        BookieImpl b = new TestBookieImpl(resources);
+        b.start();
+
+        final BookieImpl spyBookie = spy(b);
+
+        final long ledgerId = 10;
+        final long lac = 23;
+
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, 
"".getBytes(StandardCharsets.UTF_8),
+                
BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.CRC32), 
UnpooledByteBufAllocator.DEFAULT,
+                baseClientConf.getUseV2WireProtocol());
+
+        final ByteBufList toSend = 
digestManager.computeDigestAndPackageForSendingLac(lac);
+        ByteString body = UnsafeByteOperations.unsafeWrap(toSend.array(), 
toSend.arrayOffset(), toSend.readableBytes());
+
+        final ByteBuf lacToAdd = 
Unpooled.wrappedBuffer(body.asReadOnlyByteBuffer());
+        final byte[] masterKey = 
ByteString.copyFrom("masterKey".getBytes()).toByteArray();
+
+        final ByteBuf explicitLACEntry = b.createExplicitLACEntry(ledgerId, 
lacToAdd);
+        lacToAdd.resetReaderIndex();
+
+        doReturn(explicitLACEntry)
+                .when(spyBookie)
+                .createExplicitLACEntry(eq(ledgerId), eq(lacToAdd));
+
+        spyBookie.setExplicitLac(lacToAdd, null, null, masterKey);
+
+        assertEquals(0, lacToAdd.refCnt());
+        assertEquals(0, explicitLACEntry.refCnt());
+
+        b.shutdown();
+
+    }
+}

Reply via email to