This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit f6f827f12fc2e587f4ee77c0566059897a85a939 Author: congbo <[email protected]> AuthorDate: Thu Mar 24 16:22:02 2022 +0800 PendingReadOp: Fix ledgerEntryImpl reuse problem (#3110) (cherry picked from commit 9b783ef8d8dcdfdc21a56f4afb873447517b5aaf) --- .../apache/bookkeeper/client/PendingReadOp.java | 11 ++- .../apache/bookkeeper/client/TestParallelRead.java | 84 +++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index b48761e..60f76f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -110,6 +110,11 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { @Override public void close() { + // this request has succeeded before, can't recycle writeSet again + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.UnexpectedConditionException; + writeSet.recycle(); + } entryImpl.close(); } @@ -169,7 +174,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { if (complete.compareAndSet(false, true)) { this.rc = rc; submitCallback(rc); - writeSet.recycle(); return true; } else { return false; @@ -563,6 +567,10 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { } } + private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { + return new ReadContext(bookieIndex, to, entry); + } + void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException { if (lh.throttler != null) { lh.throttler.acquire(); @@ -592,6 +600,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable { heardFromHostsBitSet.set(rctx.bookieIndex, true); buffer.retain(); + // if entry has completed don't handle twice if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { if (!isRecoveryRead) { // do not advance LastAddConfirmed for recovery reads diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java index e4801cf..eae7a10 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java @@ -24,18 +24,27 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.junit.Test; import org.slf4j.Logger; @@ -238,4 +247,77 @@ public class TestParallelRead extends BookKeeperClusterTestCase { newBk.close(); } + @Test + public void testLedgerEntryRequestComplete() throws Exception { + LedgerHandle lh = mock(LedgerHandle.class); + LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class); + ClientContext clientContext = mock(ClientContext.class); + ClientInternalConf clientInternalConf = mock(ClientInternalConf.class); + doReturn(clientInternalConf).when(clientContext).getConf(); + BookKeeperClientStats bookKeeperClientStats = mock(BookKeeperClientStats.class); + doReturn(bookKeeperClientStats).when(clientContext).getClientStats(); + OpStatsLogger opStatsLogger = mock(OpStatsLogger.class); + doReturn(opStatsLogger).when(bookKeeperClientStats).getReadOpLogger(); + doReturn(ledgerMetadata).when(lh).getLedgerMetadata(); + doReturn(2).when(ledgerMetadata).getWriteQuorumSize(); + doReturn(1).when(ledgerMetadata).getAckQuorumSize(); + doReturn(new TreeMap<>()).when(ledgerMetadata).getAllEnsembles(); + DistributionSchedule.WriteSet writeSet = mock(DistributionSchedule.WriteSet.class); + doReturn(writeSet).when(lh).getWriteSetForReadOperation(anyLong()); + PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false); + pendingReadOp.parallelRead(true); + pendingReadOp.initiate(); + PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0); + PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1); + + pendingReadOp.submitCallback(-105); + + // pendingReadOp.submitCallback(-105) will close all ledgerEntryImpl + assertEquals(-1, first.entryImpl.getEntryId()); + assertEquals(-1, first.entryImpl.getLedgerId()); + assertEquals(-1, first.entryImpl.getLength()); + assertNull(first.entryImpl.getEntryBuffer()); + assertTrue(first.complete.get()); + + assertEquals(-1, second.entryImpl.getEntryId()); + assertEquals(-1, second.entryImpl.getLedgerId()); + assertEquals(-1, second.entryImpl.getLength()); + assertNull(second.entryImpl.getEntryBuffer()); + assertTrue(second.complete.get()); + + // Mock ledgerEntryImpl reuse + Method method = PendingReadOp.class.getDeclaredMethod("createReadContext", + int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class); + method.setAccessible(true); + + ByteBuf byteBuf = Unpooled.buffer(10); + pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10), + method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + + // byteBuf has been release + assertEquals(byteBuf.refCnt(), 1); + // entryBuffer is not replaced + assertNull(first.entryImpl.getEntryBuffer()); + // ledgerEntryRequest has been complete + assertTrue(first.complete.get()); + + pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false); + pendingReadOp.parallelRead(true); + pendingReadOp.initiate(); + + // read entry failed twice, will not close twice + pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), + method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + + pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), + method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + + // will not complete twice when completed + byteBuf = Unpooled.buffer(10); + pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10), + method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + assertEquals(1, byteBuf.refCnt()); + + } + }
