This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9b783ef PendingReadOp: Fix ledgerEntryImpl reuse problem (#3110)
9b783ef is described below
commit 9b783ef8d8dcdfdc21a56f4afb873447517b5aaf
Author: congbo <[email protected]>
AuthorDate: Thu Mar 24 16:22:02 2022 +0800
PendingReadOp: Fix ledgerEntryImpl reuse problem (#3110)
---
.../apache/bookkeeper/client/PendingReadOp.java | 11 ++-
.../apache/bookkeeper/client/TestParallelRead.java | 84 +++++++++++++++++++++-
2 files changed, 93 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index b48761e..60f76f8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -110,6 +110,11 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
@Override
public void close() {
+ // recycle writeSet here only if this request has not already completed
+ if (complete.compareAndSet(false, true)) {
+ rc = BKException.Code.UnexpectedConditionException;
+ writeSet.recycle();
+ }
entryImpl.close();
}
@@ -169,7 +174,6 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
if (complete.compareAndSet(false, true)) {
this.rc = rc;
submitCallback(rc);
- writeSet.recycle();
return true;
} else {
return false;
@@ -563,6 +567,10 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
}
}
+ private static ReadContext createReadContext(int bookieIndex, BookieId to,
LedgerEntryRequest entry) {
+ return new ReadContext(bookieIndex, to, entry);
+ }
+
void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry)
throws InterruptedException {
if (lh.throttler != null) {
lh.throttler.acquire();
@@ -592,6 +600,7 @@ class PendingReadOp implements ReadEntryCallback,
SafeRunnable {
heardFromHostsBitSet.set(rctx.bookieIndex, true);
buffer.retain();
+ // if the entry has already completed, don't handle it twice
if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
if (!isRecoveryRead) {
// do not advance LastAddConfirmed for recovery reads
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index e4801cf..eae7a10 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -24,18 +24,27 @@ import static
org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
@@ -238,4 +247,77 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
newBk.close();
}
+ @Test
+ public void testLedgerEntryRequestComplete() throws Exception {
+ LedgerHandle lh = mock(LedgerHandle.class);
+ LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
+ ClientContext clientContext = mock(ClientContext.class);
+ ClientInternalConf clientInternalConf = mock(ClientInternalConf.class);
+ doReturn(clientInternalConf).when(clientContext).getConf();
+ BookKeeperClientStats bookKeeperClientStats =
mock(BookKeeperClientStats.class);
+ doReturn(bookKeeperClientStats).when(clientContext).getClientStats();
+ OpStatsLogger opStatsLogger = mock(OpStatsLogger.class);
+ doReturn(opStatsLogger).when(bookKeeperClientStats).getReadOpLogger();
+ doReturn(ledgerMetadata).when(lh).getLedgerMetadata();
+ doReturn(2).when(ledgerMetadata).getWriteQuorumSize();
+ doReturn(1).when(ledgerMetadata).getAckQuorumSize();
+ doReturn(new TreeMap<>()).when(ledgerMetadata).getAllEnsembles();
+ DistributionSchedule.WriteSet writeSet =
mock(DistributionSchedule.WriteSet.class);
+ doReturn(writeSet).when(lh).getWriteSetForReadOperation(anyLong());
+ PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1,
2, false);
+ pendingReadOp.parallelRead(true);
+ pendingReadOp.initiate();
+ PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0);
+ PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1);
+
+ pendingReadOp.submitCallback(-105);
+
+ // pendingReadOp.submitCallback(-105) will close all ledgerEntryImpl
+ assertEquals(-1, first.entryImpl.getEntryId());
+ assertEquals(-1, first.entryImpl.getLedgerId());
+ assertEquals(-1, first.entryImpl.getLength());
+ assertNull(first.entryImpl.getEntryBuffer());
+ assertTrue(first.complete.get());
+
+ assertEquals(-1, second.entryImpl.getEntryId());
+ assertEquals(-1, second.entryImpl.getLedgerId());
+ assertEquals(-1, second.entryImpl.getLength());
+ assertNull(second.entryImpl.getEntryBuffer());
+ assertTrue(second.complete.get());
+
+ // Mock ledgerEntryImpl reuse
+ Method method =
PendingReadOp.class.getDeclaredMethod("createReadContext",
+ int.class, BookieId.class,
PendingReadOp.LedgerEntryRequest.class);
+ method.setAccessible(true);
+
+ ByteBuf byteBuf = Unpooled.buffer(10);
+ pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1,
Unpooled.buffer(10),
+ method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+
+ // byteBuf has been released
+ assertEquals(byteBuf.refCnt(), 1);
+ // entryBuffer is not replaced
+ assertNull(first.entryImpl.getEntryBuffer());
+ // ledgerEntryRequest has been completed
+ assertTrue(first.complete.get());
+
+ pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false);
+ pendingReadOp.parallelRead(true);
+ pendingReadOp.initiate();
+
+ // read entry failed twice, will not close twice
+
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1,
1, Unpooled.buffer(10),
+ method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+
+
pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1,
1, Unpooled.buffer(10),
+ method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+
+ // will not complete twice when completed
+ byteBuf = Unpooled.buffer(10);
+ pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10),
+ method.invoke(pendingReadOp, 1, BookieId.parse("test"),
first));
+ assertEquals(1, byteBuf.refCnt());
+
+ }
+
}