This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b783ef  PendingReadOp: Fix ledgerEntryImpl reuse problem (#3110)
9b783ef is described below

commit 9b783ef8d8dcdfdc21a56f4afb873447517b5aaf
Author: congbo <[email protected]>
AuthorDate: Thu Mar 24 16:22:02 2022 +0800

    PendingReadOp: Fix ledgerEntryImpl reuse problem (#3110)
---
 .../apache/bookkeeper/client/PendingReadOp.java    | 11 ++-
 .../apache/bookkeeper/client/TestParallelRead.java | 84 +++++++++++++++++++++-
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index b48761e..60f76f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -110,6 +110,11 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
 
         @Override
         public void close() {
+            // this request has succeeded before, can't recycle writeSet again
+            if (complete.compareAndSet(false, true)) {
+                rc = BKException.Code.UnexpectedConditionException;
+                writeSet.recycle();
+            }
             entryImpl.close();
         }
 
@@ -169,7 +174,6 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
             if (complete.compareAndSet(false, true)) {
                 this.rc = rc;
                 submitCallback(rc);
-                writeSet.recycle();
                 return true;
             } else {
                 return false;
@@ -563,6 +567,10 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         }
     }
 
+    private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) {
+        return new ReadContext(bookieIndex, to, entry);
+    }
+
     void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException {
         if (lh.throttler != null) {
             lh.throttler.acquire();
@@ -592,6 +600,7 @@ class PendingReadOp implements ReadEntryCallback, SafeRunnable {
         heardFromHostsBitSet.set(rctx.bookieIndex, true);
 
         buffer.retain();
+        // if entry has completed don't handle twice
         if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
             if (!isRecoveryRead) {
                 // do not advance LastAddConfirmed for recovery reads
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index e4801cf..eae7a10 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -24,18 +24,27 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.List;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -238,4 +247,77 @@ public class TestParallelRead extends BookKeeperClusterTestCase {
         newBk.close();
     }
 
+    @Test
+    public void testLedgerEntryRequestComplete() throws Exception {
+        LedgerHandle lh = mock(LedgerHandle.class);
+        LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+        ClientContext clientContext = mock(ClientContext.class);
+        ClientInternalConf clientInternalConf = mock(ClientInternalConf.class);
+        doReturn(clientInternalConf).when(clientContext).getConf();
+        BookKeeperClientStats bookKeeperClientStats = mock(BookKeeperClientStats.class);
+        doReturn(bookKeeperClientStats).when(clientContext).getClientStats();
+        OpStatsLogger opStatsLogger = mock(OpStatsLogger.class);
+        doReturn(opStatsLogger).when(bookKeeperClientStats).getReadOpLogger();
+        doReturn(ledgerMetadata).when(lh).getLedgerMetadata();
+        doReturn(2).when(ledgerMetadata).getWriteQuorumSize();
+        doReturn(1).when(ledgerMetadata).getAckQuorumSize();
+        doReturn(new TreeMap<>()).when(ledgerMetadata).getAllEnsembles();
+        DistributionSchedule.WriteSet writeSet = mock(DistributionSchedule.WriteSet.class);
+        doReturn(writeSet).when(lh).getWriteSetForReadOperation(anyLong());
+        PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false);
+        pendingReadOp.parallelRead(true);
+        pendingReadOp.initiate();
+        PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0);
+        PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1);
+
+        pendingReadOp.submitCallback(-105);
+
+        // pendingReadOp.submitCallback(-105) will close all ledgerEntryImpl
+        assertEquals(-1, first.entryImpl.getEntryId());
+        assertEquals(-1, first.entryImpl.getLedgerId());
+        assertEquals(-1, first.entryImpl.getLength());
+        assertNull(first.entryImpl.getEntryBuffer());
+        assertTrue(first.complete.get());
+
+        assertEquals(-1, second.entryImpl.getEntryId());
+        assertEquals(-1, second.entryImpl.getLedgerId());
+        assertEquals(-1, second.entryImpl.getLength());
+        assertNull(second.entryImpl.getEntryBuffer());
+        assertTrue(second.complete.get());
+
+        // Mock ledgerEntryImpl reuse
+        Method method = PendingReadOp.class.getDeclaredMethod("createReadContext",
+                int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class);
+        method.setAccessible(true);
+
+        ByteBuf byteBuf = Unpooled.buffer(10);
+        pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10),
+                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+
+        // byteBuf has been release
+        assertEquals(byteBuf.refCnt(), 1);
+        // entryBuffer is not replaced
+        assertNull(first.entryImpl.getEntryBuffer());
+        // ledgerEntryRequest has been complete
+        assertTrue(first.complete.get());
+
+        pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false);
+        pendingReadOp.parallelRead(true);
+        pendingReadOp.initiate();
+
+        // read entry failed twice, will not close twice
+        pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10),
+                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+
+        pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10),
+                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+
+        // will not complete twice when completed
+        byteBuf = Unpooled.buffer(10);
+        pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10),
+                method.invoke(pendingReadOp, 1, BookieId.parse("test"), first));
+        assertEquals(1, byteBuf.refCnt());
+
+    }
+
 }

Reply via email to