This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new faaf041ac5 Fix ByteBuf memory leak problem when setExplicitLac (#3577)
faaf041ac5 is described below
commit faaf041ac5422ab1d4e46c123084202f48ea6aa8
Author: wenbingshen <[email protected]>
AuthorDate: Wed Nov 2 00:24:35 2022 +0800
Fix ByteBuf memory leak problem when setExplicitLac (#3577)
* Fix setExplicitLac not releasing the entry
---
.../org/apache/bookkeeper/bookie/BookieImpl.java | 12 ++-
.../apache/bookkeeper/bookie/BookieImplTest.java | 101 +++++++++++++++++++++
2 files changed, 111 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
index a485b3bf31..6b8e482256 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java
@@ -32,6 +32,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -1005,7 +1006,8 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
}
}
- private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac)
{
+ @VisibleForTesting
+ public ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
ByteBuf bb = allocator.directBuffer(8 + 8 + 4 +
explicitLac.capacity());
bb.writeLong(ledgerId);
bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
@@ -1016,6 +1018,7 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback,
Object ctx, byte[] masterKey)
throws IOException, InterruptedException, BookieException {
+ ByteBuf explicitLACEntry = null;
try {
long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
@@ -1023,12 +1026,17 @@ public class BookieImpl extends BookieCriticalThread
implements Bookie {
entry.markReaderIndex();
handle.setExplicitLac(entry);
entry.resetReaderIndex();
- ByteBuf explicitLACEntry = createExplicitLACEntry(ledgerId,
entry);
+ explicitLACEntry = createExplicitLACEntry(ledgerId, entry);
getJournal(ledgerId).logAddEntry(explicitLACEntry, false /*
ackBeforeSync */, writeCallback, ctx);
}
} catch (NoWritableLedgerDirException e) {
stateManager.transitionToReadOnlyMode();
throw new IOException(e);
+ } finally {
+ ReferenceCountUtil.safeRelease(entry);
+ if (explicitLACEntry != null) {
+ ReferenceCountUtil.safeRelease(explicitLACEntry);
+ }
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
new file mode 100644
index 0000000000..5cd1f49c42
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieImplTest.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.nio.charset.StandardCharsets;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.util.PortManager;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Checks that BookieImpl#setExplicitLac releases its buffers. */
+public class BookieImplTest extends BookKeeperClusterTestCase {
+    private static final Logger log = 
LoggerFactory.getLogger(BookieImplTest.class);
+
+    private static final int bookiePort = PortManager.nextFreePort();
+
+    public BookieImplTest() {
+        super(0);
+    }
+    /** Both buffers must end at refCnt 0 after setExplicitLac returns. */
+    @Test
+    public void testWriteLac() throws Exception {
+        final String metadataServiceUri = zkUtil.getMetadataServiceUri();
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setMetadataServiceUri(metadataServiceUri);
+        // Build a TestBookieImpl from this configuration and start it.
+        MetadataBookieDriver metadataDriver = 
BookieResources.createMetadataDriver(
+                conf, NullStatsLogger.INSTANCE);
+        RegistrationManager rm = metadataDriver.createRegistrationManager();
+        TestBookieImpl.Resources resources = new 
TestBookieImpl.ResourceBuilder(conf)
+                
.withMetadataDriver(metadataDriver).withRegistrationManager(rm).build();
+        BookieImpl b = new TestBookieImpl(resources);
+        b.start();
+        // Spy so createExplicitLACEntry can be stubbed further down.
+        final BookieImpl spyBookie = spy(b);
+
+        final long ledgerId = 10;
+        final long lac = 23;
+        // Package the LAC with a CRC32 digest via DigestManager.
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, 
"".getBytes(StandardCharsets.UTF_8),
+                
BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.CRC32), 
UnpooledByteBufAllocator.DEFAULT,
+                baseClientConf.getUseV2WireProtocol());
+
+        final ByteBufList toSend = 
digestManager.computeDigestAndPackageForSendingLac(lac);
+        ByteString body = UnsafeByteOperations.unsafeWrap(toSend.array(), 
toSend.arrayOffset(), toSend.readableBytes());
+        // Wrap the packaged bytes in a ByteBuf to pass as the entry.
+        final ByteBuf lacToAdd = 
Unpooled.wrappedBuffer(body.asReadOnlyByteBuffer());
+        final byte[] masterKey = 
ByteString.copyFrom("masterKey".getBytes()).toByteArray();
+        // Pre-build the entry buffer so its refCnt can be checked later.
+        final ByteBuf explicitLACEntry = b.createExplicitLACEntry(ledgerId, 
lacToAdd);
+        lacToAdd.resetReaderIndex();
+        // Make the spy hand back that same buffer instead of a new one.
+        doReturn(explicitLACEntry)
+                .when(spyBookie)
+                .createExplicitLACEntry(eq(ledgerId), eq(lacToAdd));
+
+        spyBookie.setExplicitLac(lacToAdd, null, null, masterKey);
+        // A refCnt of 0 means the buffer was fully released.
+        assertEquals(0, lacToAdd.refCnt());
+        assertEquals(0, explicitLACEntry.refCnt());
+
+        b.shutdown();
+
+    }
+}