This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new c7b1610  ISSUE #1476: LedgerEntry is recycled twice at 
ReadLastConfirmedAndEntryOp
c7b1610 is described below

commit c7b1610bb0dac9d46b80e5f607248b9b391ec267
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Jun 14 02:13:22 2018 -0700

    ISSUE #1476: LedgerEntry is recycled twice at ReadLastConfirmedAndEntryOp
    
    Descriptions of the changes in this PR:
    
    The issue #1476 is caused by speculative reads combined with object 
recycling: the same request object is added to the CompletionObjects multiple 
times with different txnids.  The request-processing logic already takes this 
into account; only one place inside 
`ReadLastConfirmedAndEntryOp.requestComplete` forgets to check `requestComplete` 
before calling `submitCallback`, which in turn calls `request.close`.
    
    ### Motivation
    
    to fix #1476
    
    ### Changes
    
    Check `requestComplete` before calling `submitCallback` in 
`ReadLastConfirmedAndEntryOp.requestComplete`.
    
    Master Issue: #1476
    
    Author: Sijie Guo <[email protected]>
    Author: infodog <[email protected]>
    Author: zhengxiangyang <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
    
    This closes #1509 from infodog/issue1476, closes #1476
    
    (cherry picked from commit 6476fc323fffd328ed9620d9aa39c4c232f8d2be)
    Signed-off-by: Sijie Guo <[email protected]>
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |   4 +
 .../client/ReadLastConfirmedAndEntryOp.java        |  52 +++--
 .../client/impl/LastConfirmedAndEntryImpl.java     |  14 +-
 .../client/ReadLastConfirmedAndEntryOpTest.java    | 252 +++++++++++++++++++++
 4 files changed, 293 insertions(+), 29 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index bd0a12d..1ee0595 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -742,6 +742,10 @@ public class BookKeeper implements 
org.apache.bookkeeper.client.api.BookKeeper {
         }
     }
 
+    boolean shouldReorderReadSequence() {
+        return reorderReadSequence;
+    }
+
     ZooKeeper getZkHandle() {
         return ((ZKMetadataClientDriver) metadataDriver).getZk();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index b9888ba..1b3fa70 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -81,9 +81,9 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long 
lId, long eId) {
             this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = 
lh.distributionSchedule.getWriteSetForLongPoll(eId);
-            if (lh.bk.reorderReadSequence) {
-                this.orderedEnsemble = 
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
+            this.writeSet = 
lh.getDistributionSchedule().getWriteSetForLongPoll(eId);
+            if (lh.getBk().shouldReorderReadSequence()) {
+                this.orderedEnsemble = 
lh.getBk().getPlacementPolicy().reorderReadLACSequence(ensemble,
                         lh.getBookiesHealthInfo(), writeSet.copy());
             } else {
                 this.orderedEnsemble = writeSet.copy();
@@ -118,7 +118,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         boolean complete(int bookieIndex, BookieSocketAddress host, final 
ByteBuf buffer, long entryId) {
             ByteBuf content;
             try {
-                content = lh.macManager.verifyDigestAndReturnData(entryId, 
buffer);
+                content = 
lh.getDigestManager().verifyDigestAndReturnData(entryId, buffer);
             } catch (BKException.BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", 
BKException.Code.DigestMatchException);
                 return false;
@@ -201,7 +201,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("{} while reading entry: {} ledgerId: {} from 
bookie: {}", errMsg, entryImpl.getEntryId(),
-                        lh.ledgerId, host);
+                        lh.getId(), host);
             }
         }
 
@@ -417,7 +417,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 for (int i = 0; i < numReplicasTried; i++) {
                     int slowBookieIndex = orderedEnsemble.get(i);
                     BookieSocketAddress slowBookieSocketAddress = 
ensemble.get(slowBookieIndex);
-                    
lh.bk.placementPolicy.registerSlowBookie(slowBookieSocketAddress, entryId);
+                    
lh.getBk().getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, 
entryId);
                 }
             }
             return completed;
@@ -449,7 +449,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
     }
 
     protected LedgerMetadata getLedgerMetadata() {
-        return lh.metadata;
+        return lh.getLedgerMetadata();
     }
 
     ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
@@ -462,7 +462,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
      */
     @Override
     public ListenableFuture<Boolean> issueSpeculativeRequest() {
-        return lh.bk.getMainWorkerPool().submitOrdered(lh.getId(), new 
Callable<Boolean>() {
+        return lh.getBk().getMainWorkerPool().submitOrdered(lh.getId(), new 
Callable<Boolean>() {
             @Override
             public Boolean call() throws Exception {
                 if (!requestComplete.get() && !request.isComplete()
@@ -480,14 +480,14 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
 
     public void initiate() {
         if (parallelRead) {
-            request = new ParallelReadRequest(lh.metadata.currentEnsemble, 
lh.ledgerId, prevEntryId + 1);
+            request = new 
ParallelReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), 
prevEntryId + 1);
         } else {
-            request = new SequenceReadRequest(lh.metadata.currentEnsemble, 
lh.ledgerId, prevEntryId + 1);
+            request = new 
SequenceReadRequest(lh.getLedgerMetadata().currentEnsemble, lh.getId(), 
prevEntryId + 1);
         }
         request.read();
 
-        if (!parallelRead && 
lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
-            
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
 this);
+        if (!parallelRead && 
lh.getBk().getReadLACSpeculativeRequestPolicy().isPresent()) {
+            
lh.getBk().getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
 this);
         }
     }
 
@@ -496,8 +496,8 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
             LOG.debug("Calling Read LAC and Entry with {} and long polling 
interval {} on Bookie {} - Parallel {}",
                     prevEntryId, timeOutInMillis, to, parallelRead);
         }
-        lh.bk.getBookieClient().readEntryWaitForLACUpdate(to,
-            lh.ledgerId,
+        lh.getBk().getBookieClient().readEntryWaitForLACUpdate(to,
+            lh.getId(),
             BookieProtocol.LAST_ADD_CONFIRMED,
             prevEntryId,
             timeOutInMillis,
@@ -517,12 +517,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
         LedgerEntry entry;
         if (BKException.Code.OK != rc) {
-            lh.bk.getReadLacAndEntryOpLogger()
+            lh.getBk().getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
             entry = null;
         } else {
             // could received advanced lac, with no entry
-            lh.bk.getReadLacAndEntryOpLogger()
+            lh.getBk().getReadLacAndEntryOpLogger()
                 .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
             if (request.entryImpl.getEntryBuffer() != null) {
                 entry = new LedgerEntry(request.entryImpl);
@@ -558,18 +558,20 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
 
             if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
                 buffer.retain();
-                if (request.complete(rCtx.getBookieIndex(), bookie, buffer, 
entryId)) {
+                if (!requestComplete.get() && 
request.complete(rCtx.getBookieIndex(), bookie, buffer, entryId)) {
                     // callback immediately
                     if (rCtx.getLacUpdateTimestamp().isPresent()) {
                         long elapsedMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()
                                 - rCtx.getLacUpdateTimestamp().get());
                         elapsedMicros = Math.max(elapsedMicros, 0);
-                        lh.bk.getReadLacAndEntryRespLogger()
-                            .registerSuccessfulEvent(elapsedMicros, 
TimeUnit.MICROSECONDS);
+                        lh.getBk().getReadLacAndEntryRespLogger()
+                                .registerSuccessfulEvent(elapsedMicros, 
TimeUnit.MICROSECONDS);
                     }
 
-                    submitCallback(BKException.Code.OK);
-                    requestComplete.set(true);
+                    // if the request has already completed, the buffer is not 
going to be used anymore, release it.
+                    if (!completeRequest()) {
+                        buffer.release();
+                    }
                     heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
                 } else {
                     buffer.release();
@@ -611,8 +613,9 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         }
     }
 
-    private void completeRequest() {
-        if (requestComplete.compareAndSet(false, true)) {
+    private boolean completeRequest() {
+        boolean requestCompleted = requestComplete.compareAndSet(false, true);
+        if (requestCompleted) {
             if (!hasValidResponse) {
                 // no success called
                 submitCallback(request.getFirstError());
@@ -621,11 +624,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 submitCallback(BKException.Code.OK);
             }
         }
+        return requestCompleted;
     }
 
     @Override
     public String toString() {
-        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, 
prevEntryId=%d])", lh.ledgerId, prevEntryId);
+        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, 
prevEntryId=%d])", lh.getId(), prevEntryId);
     }
 
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 8f1924a..8090214 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -41,11 +41,15 @@ public class LastConfirmedAndEntryImpl implements 
LastConfirmedAndEntry {
     public static LastConfirmedAndEntryImpl create(long lac, 
org.apache.bookkeeper.client.LedgerEntry entry) {
         LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
         entryImpl.lac = lac;
-        entryImpl.entry = LedgerEntryImpl.create(
-            entry.getLedgerId(),
-            entry.getEntryId(),
-            entry.getLength(),
-            entry.getEntryBuffer());
+        if (null == entry) {
+            entryImpl.entry = null;
+        } else {
+            entryImpl.entry = LedgerEntryImpl.create(
+                entry.getLedgerId(),
+                entry.getEntryId(),
+                entry.getLength(),
+                entry.getEntryBuffer());
+        }
         return entryImpl;
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
new file mode 100644
index 0000000..f0a03b0
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import 
org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.proto.checksum.DummyDigestManager;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ReadLastConfirmedAndEntryOp} with mocks.
+ */
+@Slf4j
+public class ReadLastConfirmedAndEntryOpTest {
+
+    private static final long LEDGERID = System.currentTimeMillis();
+
+    private final TestStatsProvider testStatsProvider = new 
TestStatsProvider();
+    private OpStatsLogger readLacAndEntryOpLogger;
+    private BookieClient mockBookieClient;
+    private BookKeeper mockBk;
+    private LedgerHandle mockLh;
+    private ScheduledExecutorService scheduler;
+    private OrderedScheduler orderedScheduler;
+    private SpeculativeRequestExecutionPolicy speculativePolicy;
+    private LedgerMetadata ledgerMetadata;
+    private DistributionSchedule distributionSchedule;
+    private DigestManager digestManager;
+
+    @Before
+    public void setup() throws Exception {
+        // stats
+        this.readLacAndEntryOpLogger = testStatsProvider
+            .getStatsLogger("").getOpStatsLogger("readLacAndEntry");
+        // policy
+        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
+            100, 200, 2);
+        // metadata
+        this.ledgerMetadata =
+            new LedgerMetadata(3, 3, 2, DigestType.CRC32, new byte[0]);
+        ArrayList<BookieSocketAddress> ensemble = new ArrayList<>(3);
+        for (int i = 0; i < 3; i++) {
+            ensemble.add(new BookieSocketAddress("127.0.0.1", 3181 + i));
+        }
+        this.ledgerMetadata.addEnsemble(0L, ensemble);
+        this.distributionSchedule = new RoundRobinDistributionSchedule(3, 2, 
3);
+        // schedulers
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        this.orderedScheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-ordered-scheduler")
+            .numThreads(1)
+            .build();
+
+        this.mockBookieClient = mock(BookieClient.class);
+
+        this.mockBk = mock(BookKeeper.class);
+        
when(mockBk.getReadLACSpeculativeRequestPolicy()).thenReturn(Optional.of(speculativePolicy));
+        when(mockBk.getBookieClient()).thenReturn(mockBookieClient);
+        
when(mockBk.getReadLacAndEntryOpLogger()).thenReturn(readLacAndEntryOpLogger);
+        when(mockBk.getMainWorkerPool()).thenReturn(orderedScheduler);
+        EnsemblePlacementPolicy mockPlacementPolicy = 
mock(EnsemblePlacementPolicy.class);
+        when(mockBk.getPlacementPolicy()).thenReturn(mockPlacementPolicy);
+        this.mockLh = mock(LedgerHandle.class);
+        when(mockLh.getBk()).thenReturn(mockBk);
+        when(mockLh.getId()).thenReturn(LEDGERID);
+        when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
+        
when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
+        digestManager = new DummyDigestManager(LEDGERID, false);
+        when(mockLh.getDigestManager()).thenReturn(digestManager);
+    }
+
+    @After
+    public void teardown() {
+        this.scheduler.shutdown();
+        this.orderedScheduler.shutdown();
+    }
+
+    @Data
+    static class ReadLastConfirmedAndEntryHolder {
+
+        private final BookieSocketAddress address;
+        private final ReadEntryCallback callback;
+        private final ReadLastConfirmedAndEntryContext context;
+
+    }
+
+    /**
+     * Test case: handling different speculative responses. one speculative 
response might return a valid response
+     * with a read entry, while the other speculative response might return a 
valid response without an entry.
+     * {@link ReadLastConfirmedAndEntryOp} should handle both responses well.
+     *
+     * <p>This test case covers {@link 
https://github.com/apache/bookkeeper/issues/1476}.
+     */
+    @Test
+    public void testSpeculativeResponses() throws Exception {
+        final long entryId = 2L;
+        final long lac = 1L;
+
+        ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses", 
UTF_8);
+        ByteBufList dataWithDigest = 
digestManager.computeDigestAndPackageForSending(
+            entryId, lac, data.readableBytes(), data);
+        byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()];
+        assertEquals(bytesWithDigest.length, 
dataWithDigest.getBytes(bytesWithDigest));
+
+        final Map<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> 
callbacks =
+            Collections.synchronizedMap(new HashMap<>());
+        doAnswer(invocationOnMock -> {
+            BookieSocketAddress address = invocationOnMock.getArgument(0);
+            ReadEntryCallback callback = invocationOnMock.getArgument(6);
+            ReadLastConfirmedAndEntryContext context = 
invocationOnMock.getArgument(7);
+
+            ReadLastConfirmedAndEntryHolder holder = new 
ReadLastConfirmedAndEntryHolder(address, callback, context);
+
+            log.info("Received read request to bookie {}", address);
+
+            callbacks.put(address, holder);
+            return null;
+        }).when(mockBookieClient).readEntryWaitForLACUpdate(
+            any(BookieSocketAddress.class),
+            anyLong(),
+            anyLong(),
+            anyLong(),
+            anyLong(),
+            anyBoolean(),
+            any(ReadEntryCallback.class),
+            any()
+        );
+
+        CompletableFuture<LastConfirmedAndEntry> resultFuture = new 
CompletableFuture<>();
+        LastConfirmedAndEntryCallback resultCallback = (rc, lastAddConfirmed, 
entry) -> {
+            if (Code.OK != rc) {
+                FutureUtils.completeExceptionally(resultFuture, 
BKException.create(rc));
+            } else {
+                FutureUtils.complete(resultFuture, 
LastConfirmedAndEntryImpl.create(lastAddConfirmed, entry));
+            }
+        };
+
+        ReadLastConfirmedAndEntryOp op = new ReadLastConfirmedAndEntryOp(
+            mockLh,
+            resultCallback,
+            1L,
+            10000,
+            scheduler
+        );
+        op.initiate();
+
+        // wait until all speculative requests are sent
+        while (callbacks.size() < 3) {
+            log.info("Received {} read requests", callbacks.size());
+            Thread.sleep(100);
+        }
+
+        log.info("All speculative reads are outstanding now.");
+
+        // once all the speculative reads are outstanding. complete the 
requests in following sequence:
+
+        // 1) complete one bookie with empty response (OK, entryId = 
INVALID_ENTRY_ID)
+        // 2) complete second bookie with valid entry response. this will 
trigger double-release bug described in
+        //    {@link https://github.com/apache/bookkeeper/issues/1476}
+
+        Iterator<Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder>> 
iter = callbacks.entrySet().iterator();
+        assertTrue(iter.hasNext());
+        Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> 
firstBookieEntry = iter.next();
+        ReadLastConfirmedAndEntryHolder firstBookieHolder = 
firstBookieEntry.getValue();
+        ReadLastConfirmedAndEntryContext firstContext = 
firstBookieHolder.context;
+        firstContext.setLastAddConfirmed(entryId);
+        firstBookieHolder.getCallback()
+            .readEntryComplete(Code.OK, LEDGERID, 
BookieProtocol.INVALID_ENTRY_ID, null, firstContext);
+
+        // readEntryComplete above will release the entry impl back to the 
object pools.
+        // we want to make sure after the entry is recycled, it will not be 
mutated by any future callbacks.
+        LedgerEntryImpl entry = LedgerEntryImpl.create(LEDGERID, 
Long.MAX_VALUE);
+
+        assertTrue(iter.hasNext());
+        Entry<BookieSocketAddress, ReadLastConfirmedAndEntryHolder> 
secondBookieEntry = iter.next();
+        ReadLastConfirmedAndEntryHolder secondBookieHolder = 
secondBookieEntry.getValue();
+        ReadLastConfirmedAndEntryContext secondContext = 
secondBookieHolder.context;
+        secondContext.setLastAddConfirmed(entryId);
+        secondBookieHolder.getCallback().readEntryComplete(
+            Code.OK, LEDGERID, entryId, 
Unpooled.wrappedBuffer(bytesWithDigest), secondContext);
+
+        // the recycled entry shouldn't be updated by any future callbacks.
+        assertNull(entry.getEntryBuffer());
+        entry.close();
+
+        // wait for results
+        try (LastConfirmedAndEntry lacAndEntry = 
FutureUtils.result(resultFuture)) {
+            assertEquals(entryId, lacAndEntry.getLastAddConfirmed());
+            assertNull(lacAndEntry.getEntry());
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to