This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6476fc3 ISSUE #1476: LedgerEntry is recycled twice at
ReadLastConfirmedAndEntryOp
6476fc3 is described below
commit 6476fc323fffd328ed9620d9aa39c4c232f8d2be
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Jun 14 02:13:22 2018 -0700
ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp
Descriptions of the changes in this PR:
Issue #1476 is caused by speculative reads combined with object recycling: the
same request object is added to the CompletionObjects multiple times with
different txnids. The request-processing logic already takes this into
account; only one place inside
`ReadLastConfirmedAndEntryOp.requestComplete` forgets to check `requestComplete`
before calling `submitCallback`, which in turn calls `request.close`.
### Motivation
To fix #1476.
### Changes
Check `requestComplete` before calling `submitCallback` in
`ReadLastConfirmedAndEntryOp.requestComplete`.
Master Issue: #1476
Author: Sijie Guo <[email protected]>
Author: infodog <[email protected]>
Author: zhengxiangyang <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1509 from infodog/issue1476, closes #1476
---
.../org/apache/bookkeeper/client/BookKeeper.java | 4 +
.../client/ReadLastConfirmedAndEntryOp.java | 52 +++--
.../client/impl/LastConfirmedAndEntryImpl.java | 14 +-
.../client/ReadLastConfirmedAndEntryOpTest.java | 252 +++++++++++++++++++++
4 files changed, 293 insertions(+), 29 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 5746994..c3a5728 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -738,6 +738,10 @@ public class BookKeeper implements
org.apache.bookkeeper.client.api.BookKeeper {
}
}
+ boolean shouldReorderReadSequence() {
+ return reorderReadSequence;
+ }
+
ZooKeeper getZkHandle() {
return ((ZKMetadataClientDriver) metadataDriver).getZk();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index b9888ba..1b3fa70 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,9 +81,9 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
- this.writeSet =
lh.distributionSchedule.getWriteSetForLongPoll(eId);
- if (lh.bk.reorderReadSequence) {
- this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
+ this.writeSet =
lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
+ if (lh.getBk().shouldReorderReadSequence()) {
+ this.orderedEnsemble =
lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
lh.getBookiesHealthInfo(), writeSet.copy());
} else {
this.orderedEnsemble = writeSet.copy();
@@ -118,7 +118,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
boolean complete(int bookieIndex, BookieSocketAddress host, final
ByteBuf buffer, long entryId) {
ByteBuf content;
try {
- content = lh.macManager.verifyDigestAndReturnData(entryId,
buffer);
+ content =
lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
} catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
return false;
@@ -201,7 +201,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
if (LOG.isDebugEnabled()) {
LOG.debug("{} while reading entry: {} ledgerId: {} from
bookie: {}", errMsg, entryImpl.getEntryId(),
- lh.ledgerId, host);
+ lh.getId(), host);
}
}
@@ -417,7 +417,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
for (int i = 0; i < numReplicasTried; i++) {
int slowBookieIndex = orderedEnsemble.get(i);
BookieSocketAddress slowBookieSocketAddress =
ensemble.get(slowBookieIndex);
-
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+
lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress,
entryId);
}
}
return completed;
@@ -449,7 +449,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
}
protected LedgerMetadata getLedgerMetadata() {
- return lh.metadata;
+ return lh.getLedgerMetadata();
}
ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
@@ -462,7 +462,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
*/
@Override
public ListenableFuture<Boolean> issueSpeculativeRequest() {
- return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new
Callable<Boolean>() {
+ return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new
Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
if (!requestComplete.get() && !request.isComplete()
@@ -480,14 +480,14 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
public void initiate() {
if (parallelRead) {
- request = new ParallelReadRequest(lh.metadata.currentEnsemble,
lh.ledgerId, prevEntryId + 1);
+ request = new
ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(),
prevEntryId + 1);
} else {
- request = new SequenceReadRequest(lh.metadata.currentEnsemble,
lh.ledgerId, prevEntryId + 1);
+ request = new
SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(),
prevEntryId + 1);
}
request.read();
- if (!parallelRead &&
lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
-
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
this);
+ if (!parallelRead &&
lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
+
lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
this);
}
}
@@ -496,8 +496,8 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
LOG.debug("Calling Read LAC and Entry with {} and long polling
interval {} on Bookie {} - Parallel {}",
prevEntryId, timeOutInMillis, to, parallelRead);
}
- lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
- lh.ledgerId,
+ lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
+ lh.getId(),
BookieProtocol.LAST_ADD_CONFIRMED,
prevEntryId,
timeOutInMillis,
@@ -517,12 +517,12 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
LedgerEntry entry;
if (BKException.Code.OK != rc) {
- lh.bk.getReadLacAndEntryOpLogger()
+ lh.getBk().getReadLacAndEntryOpLogger()
.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
entry = null;
} else {
// could received advanced lac, with no entry
- lh.bk.getReadLacAndEntryOpLogger()
+ lh.getBk().getReadLacAndEntryOpLogger()
.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
if (request.entryImpl.getEntryBuffer() != null) {
entry = new LedgerEntry(request.entryImpl);
@@ -558,18 +558,20 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
buffer.retain();
- if (request.complete(rCtx.getBookieIndex(), bookie, buffer,
entryId)) {
+ if (!requestComplete.get() &&
request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
// callback immediately
if (rCtx.getLacUpdateTimestamp().isPresent()) {
long elapsedMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
- rCtx.getLacUpdateTimestamp().get());
elapsedMicros = Math.max(elapsedMicros, 0);
- lh.bk.getReadLacAndEntryRespLogger()
- .registerSuccessfulEvent(elapsedMicros,
TimeUnit.MICROSECONDS);
+ lh.getBk().getReadLacAndEntryRespLogger()
+ .registerSuccessfulEvent(elapsedMicros,
TimeUnit.MICROSECONDS);
}
- submitCallback(BKException.Code.OK);
- requestComplete.set(true);
+ // if the request has already completed, the buffer is not
going to be used anymore, release it.
+ if (!completeRequest()) {
+ buffer.release();
+ }
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
} else {
buffer.release();
@@ -611,8 +613,9 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
}
}
- private void completeRequest() {
- if (requestComplete.compareAndSet(false, true)) {
+ private boolean completeRequest() {
+ boolean requestCompleted = requestComplete.compareAndSet(false, true);
+ if (requestCompleted) {
if (!hasValidResponse) {
// no success called
submitCallback(request.getFirstError());
@@ -621,11 +624,12 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
submitCallback(BKException.Code.OK);
}
}
+ return requestCompleted;
}
@Override
public String toString() {
- return String.format("ReadLastConfirmedAndEntryOp(lid=%d,
prevEntryId=%d])", lh.ledgerId, prevEntryId);
+ return String.format("ReadLastConfirmedAndEntryOp(lid=%d,
prevEntryId=%d])", lh.getId(), prevEntryId);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 8f1924a..8090214 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -41,11 +41,15 @@ public class LastConfirmedAndEntryImpl implements
LastConfirmedAndEntry {
public static LastConfirmedAndEntryImpl create(long lac,
org.apache.bookkeeper.client.LedgerEntry entry) {
LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
entryImpl.lac = lac;
- entryImpl.entry = LedgerEntryImpl.create(
- entry.getLedgerId(),
- entry.getEntryId(),
- entry.getLength(),
- entry.getEntryBuffer());
+ if (null == entry) {
+ entryImpl.entry = null;
+ } else {
+ entryImpl.entry = LedgerEntryImpl.create(
+ entry.getLedgerId(),
+ entry.getEntryId(),
+ entry.getLength(),
+ entry.getEntryBuffer());
+ }
return entryImpl;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
new file mode 100644
index 0000000..f0a03b0
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import
org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.proto.checksum.DummyDigestManager;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ReadLastConfirmedAndEntryOp} with mocks.
+ */
+@Slf4j
+public class ReadLastConfirmedAndEntryOpTest {
+
+ private static final long LEDGERID = System.currentTimeMillis();
+
+ private final TestStatsProvider testStatsProvider = new
TestStatsProvider();
+ private OpStatsLogger readLacAndEntryOpLogger;
+ private BookieClient mockBookieClient;
+ private BookKeeper mockBk;
+ private LedgerHandle mockLh;
+ private ScheduledExecutorService scheduler;
+ private OrderedScheduler orderedScheduler;
+ private SpeculativeRequestExecutionPolicy speculativePolicy;
+ private LedgerMetadata ledgerMetadata;
+ private DistributionSchedule distributionSchedule;
+ private DigestManager digestManager;
+
+ @Before
+ public void setup() throws Exception {
+ // stats
+ this.readLacAndEntryOpLogger = testStatsProvider
+ .getStatsLogger("").getOpStatsLogger("readLacAndEntry");
+ // policy
+ this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
+ 100, 200, 2);
+ // metadata
+ this.ledgerMetadata =
+ new LedgerMetadata(3, 3, 2, DigestType.CRC32, new byte[0]);
+ ArrayList<BookieSocketAddress> ensemble = new ArrayList<>(3);
+ for (int i = 0; i < 3; i++) {
+ ensemble.add(new BookieSocketAddress("127.0.0.1", 3181 + i));
+ }
+ this.ledgerMetadata.addEnsemble(0L, ensemble);
+ this.distributionSchedule = new RoundRobinDistributionSchedule(3, 2,
3);
+ // schedulers
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ this.orderedScheduler = OrderedScheduler.newSchedulerBuilder()
+ .name("test-ordered-scheduler")
+ .numThreads(1)
+ .build();
+
+ this.mockBookieClient = mock(BookieClient.class);
+
+ this.mockBk = mock(BookKeeper.class);
+
when(mockBk.getReadLACSpeculativeRequestPolicy()).thenReturn(Optional.of(speculativePolicy));
+ when(mockBk.getBookieClient()).thenReturn(mockBookieClient);
+
when(mockBk.getReadLacAndEntryOpLogger()).thenReturn(readLacAndEntryOpLogger);
+ when(mockBk.getMainWorkerPool()).thenReturn(orderedScheduler);
+ EnsemblePlacementPolicy mockPlacementPolicy =
mock(EnsemblePlacementPolicy.class);
+ when(mockBk.getPlacementPolicy()).thenReturn(mockPlacementPolicy);
+ this.mockLh = mock(LedgerHandle.class);
+ when(mockLh.getBk()).thenReturn(mockBk);
+ when(mockLh.getId()).thenReturn(LEDGERID);
+ when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+
when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
+ digestManager = new DummyDigestManager(LEDGERID, false);
+ when(mockLh.getDigestManager()).thenReturn(digestManager);
+ }
+
+ @After
+ public void teardown() {
+ this.scheduler.shutdown();
+ this.orderedScheduler.shutdown();
+ }
+
+ @Data
+ static class ReadLastConfirmedAndEntryHolder {
+
+ private final BookieSocketAddress address;
+ private final ReadEntryCallback callback;
+ private final ReadLastConfirmedAndEntryContext context;
+
+ }
+
+ /**
+ * Test case: handling different speculative responses. one speculative
response might return a valid response
+ * with a read entry, while the other speculative response might return a
valid response without an entry.
+ * {@link ReadLastConfirmedAndEntryOp} should handle both responses well.
+ *
+ * <p>This test case covers {@link
https://github.com/apache/bookkeeper/issues/1476}.
+ */
+ @Test
+ public void testSpeculativeResponses() throws Exception {
+ final long entryId = 2L;
+ final long lac = 1L;
+
+ ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses",
UTF_8);
+ ByteBufList dataWithDigest =
digestManager.computeDigestAndPackageForSending(
+ entryId, lac, data.readableBytes(), data);
+ byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()];
+ assertEquals(bytesWithDigest.length,
dataWithDigest.getBytes(bytesWithDigest));
+
+ final Map<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>
callbacks =
+ Collections.synchronizedMap(new HashMap<>());
+ doAnswer(invocationOnMock -> {
+ BookieSocketAddress address = invocationOnMock.getArgument(0);
+ ReadEntryCallback callback = invocationOnMock.getArgument(6);
+ ReadLastConfirmedAndEntryContext context =
invocationOnMock.getArgument(7);
+
+ ReadLastConfirmedAndEntryHolder holder = new
ReadLastConfirmedAndEntryHolder(address, callback, context);
+
+ log.info("Received read request to bookie {}", address);
+
+ callbacks.put(address, holder);
+ return null;
+ }).when(mockBookieClient).readEntryWaitForLACUpdate(
+ any(BookieSocketAddress.class),
+ anyLong(),
+ anyLong(),
+ anyLong(),
+ anyLong(),
+ anyBoolean(),
+ any(ReadEntryCallback.class),
+ any()
+ );
+
+ CompletableFuture<LastConfirmedAndEntry> resultFuture = new
CompletableFuture<>();
+ LastConfirmedAndEntryCallback resultCallback = (rc, lastAddConfirmed,
entry) -> {
+ if (Code.OK != rc) {
+ FutureUtils.completeExceptionally(resultFuture,
BKException.create(rc));
+ } else {
+ FutureUtils.complete(resultFuture,
LastConfirmedAndEntryImpl.create(lastAddConfirmed, entry));
+ }
+ };
+
+ ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
+ mockLh,
+ resultCallback,
+ 1L,
+ 10000,
+ scheduler
+ );
+ op.initiate();
+
+ // wait until all speculative requests are sent
+ while (callbacks.size() < 3) {
+ log.info("Received {} read requests", callbacks.size());
+ Thread.sleep(100);
+ }
+
+ log.info("All speculative reads are outstanding now.");
+
+ // once all the speculative reads are outstanding. complete the
requests in following sequence:
+
+ // 1) complete one bookie with empty response (OK, entryId =
INVALID_ENTRY_ID)
+ // 2) complete second bookie with valid entry response. this will
trigger double-release bug described in
+ // {@link https://github.com/apache/bookkeeper/issues/1476}
+
+ Iterator<Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>>
iter = callbacks.entrySet().iterator();
+ assertTrue(iter.hasNext());
+ Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>
firstBookieEntry = iter.next();
+ ReadLastConfirmedAndEntryHolder firstBookieHolder =
firstBookieEntry.getValue();
+ ReadLastConfirmedAndEntryContext firstContext =
firstBookieHolder.context;
+ firstContext.setLastAddConfirmed(entryId);
+ firstBookieHolder.getCallback()
+ .readEntryComplete(Code.OK, LEDGERID,
BookieProtocol.INVALID_ENTRY_ID, null, firstContext);
+
+ // readEntryComplete above will release the entry impl back to the
object pools.
+ // we want to make sure after the entry is recycled, it will not be
mutated by any future callbacks.
+ LedgerEntryImpl entry = LedgerEntryImpl.create(LEDGERID,
Long.MAX_VALUE);
+
+ assertTrue(iter.hasNext());
+ Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>
secondBookieEntry = iter.next();
+ ReadLastConfirmedAndEntryHolder secondBookieHolder =
secondBookieEntry.getValue();
+ ReadLastConfirmedAndEntryContext secondContext =
secondBookieHolder.context;
+ secondContext.setLastAddConfirmed(entryId);
+ secondBookieHolder.getCallback().readEntryComplete(
+ Code.OK, LEDGERID, entryId,
Unpooled.wrappedBuffer(bytesWithDigest), secondContext);
+
+ // the recycled entry shouldn't be updated by any future callbacks.
+ assertNull(entry.getEntryBuffer());
+ entry.close();
+
+ // wait for results
+ try (LastConfirmedAndEntry lacAndEntry =
FutureUtils.result(resultFuture)) {
+ assertEquals(entryId, lacAndEntry.getLastAddConfirmed());
+ assertNull(lacAndEntry.getEntry());
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].