This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8740314944c [fix][broker] Fix shadow topics cannot be consumed when 
the entry is not cached (#23147)
8740314944c is described below

commit 8740314944cc97bb586b333ebf98762ffc9be5dc
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 15 11:57:52 2024 +0800

    [fix][broker] Fix shadow topics cannot be consumed when the entry is not 
cached (#23147)
    
    For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages 
from the source topic when the entry is not cached. However, it uses the 
`readAsync` API, which validates the requested range against the 
`lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field is never 
updated, so `readAsync` can fail immediately. See `LedgerHandle#readAsync`:
    
    ```java
    if (lastEntry > lastAddConfirmed) {
        LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} 
lastEntry:{} lastAddConfirmed:{}",
                ledgerId, firstEntry, lastEntry, lastAddConfirmed);
        return FutureUtils.exception(new BKReadException());
    }
    ```
    
    This bug was not exposed by the existing tests because:
    1. `PulsarMockReadHandle` does not maintain a LAC field.
    2. The cache-miss case was never tested.
    
    Replace `readAsync` with `readUnconfirmedAsync` and compare the entry range 
with `ManagedLedger#getLastConfirmedEntry` instead. The managed ledger already 
maintains a `lastConfirmedEntry` that bounds the last readable entry. See 
`ManagedLedgerImpl#internalReadFromLedger`:
    
    ```java
    Position lastPosition = lastConfirmedEntry;
    
    if (ledger.getId() == lastPosition.getLedgerId()) {
        lastEntryInLedger = lastPosition.getEntryId();
    ```
    
    Add `ShadowTopicRealBkTest` to cover the two changed code paths: 
`RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.
    
    As an exception, when the ledger does not exist in the managed ledger, 
compare the entry range with the LAC of the ledger handle instead, because 
`ReadOnlyManagedLedgerImpl` can read a ledger that belongs to another managed 
ledger.
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc` <!-- Your PR contains doc changes. -->
    - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
    - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
    - [ ] `doc-complete` <!-- Docs have been already added -->
    
    PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/33
    
    <!--
    After opening this PR, the build in apache/pulsar will fail and 
instructions will
    be provided for opening a PR in the PR author's forked repository.
    
    apache/pulsar pull requests should be first tested in your own fork since 
the
    apache/pulsar CI based on GitHub Actions has constrained resources and 
quota.
    GitHub Actions provides separate quota for pull requests that are executed 
in
    a forked repository.
    
    The tests will be run in the forked repository until all PR review comments 
have
    been handled, the tests pass and the PR is approved by a reviewer.
    -->
    
    (cherry picked from commit 15b88d250818bada5c1a94f5c54ef7806f88a500)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   2 +
 .../mledger/impl/cache/EntryCacheDisabled.java     |   4 +-
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |   4 +-
 .../mledger/impl/cache/ReadEntryUtils.java         |  54 ++++++
 .../mledger/impl/EntryCacheManagerTest.java        |   7 +-
 .../bookkeeper/mledger/impl/EntryCacheTest.java    | 186 +++++++++------------
 .../mledger/impl/OffloadPrefixReadTest.java        |   2 +-
 .../service/persistent/ShadowTopicRealBkTest.java  | 109 ++++++++++++
 8 files changed, 252 insertions(+), 116 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b91cefb1520..1b1bc03f4bc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4054,6 +4054,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     public static ManagedLedgerException 
createManagedLedgerException(Throwable t) {
         if (t instanceof org.apache.bookkeeper.client.api.BKException) {
             return 
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) 
t).getCode());
+        } else if (t instanceof ManagedLedgerException) {
+            return (ManagedLedgerException) t;
         } else if (t instanceof CompletionException
                 && !(t.getCause() instanceof CompletionException) /* check to 
avoid stackoverlflow */) {
             return createManagedLedgerException(t.getCause());
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index d1050e00628..64595dfe47e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -79,7 +79,7 @@ public class EntryCacheDisabled implements EntryCache {
     @Override
     public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, 
boolean isSlowestReader,
                                final AsyncCallbacks.ReadEntriesCallback 
callback, Object ctx) {
-        lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+        ReadEntryUtils.readAsync(ml, lh, firstEntry, 
lastEntry).thenAcceptAsync(
                 ledgerEntries -> {
                     List<Entry> entries = new ArrayList<>();
                     long totalSize = 0;
@@ -107,7 +107,7 @@ public class EntryCacheDisabled implements EntryCache {
     @Override
     public void asyncReadEntry(ReadHandle lh, PositionImpl position, 
AsyncCallbacks.ReadEntryCallback callback,
                                Object ctx) {
-        lh.readAsync(position.getEntryId(), 
position.getEntryId()).whenCompleteAsync(
+        ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), 
position.getEntryId()).whenCompleteAsync(
                 (ledgerEntries, exception) -> {
                     if (exception != null) {
                         ml.invalidateLedgerHandle(lh);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 27aec6f178e..21eb62e5a8c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -248,7 +248,7 @@ public class RangeEntryCacheImpl implements EntryCache {
             manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
             callback.readEntryComplete(cachedEntry, ctx);
         } else {
-            lh.readAsync(position.getEntryId(), 
position.getEntryId()).thenAcceptAsync(
+            ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), 
position.getEntryId()).thenAcceptAsync(
                     ledgerEntries -> {
                         try {
                             Iterator<LedgerEntry> iterator = 
ledgerEntries.iterator();
@@ -428,7 +428,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
                                                        long firstEntry, long 
lastEntry, boolean shouldCacheEntry) {
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
-        CompletableFuture<List<EntryImpl>> readResult = 
lh.readAsync(firstEntry, lastEntry)
+        CompletableFuture<List<EntryImpl>> readResult = 
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
                 .thenApply(
                         ledgerEntries -> {
                             requireNonNull(ml.getName());
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
new file mode 100644
index 00000000000..5cf5f053f0c
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+
+class ReadEntryUtils {
+
+    static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, 
ReadHandle handle, long firstEntry,
+                                                      long lastEntry) {
+        if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
+            // The read handle comes from another managed ledger, in this 
case, we can only compare the entry range with
+            // the LAC of that read handle. Specifically, it happens when this 
method is called by a
+            // ReadOnlyManagedLedgerImpl object.
+            return handle.readAsync(firstEntry, lastEntry);
+        }
+        // Compare the entry range with the lastConfirmedEntry maintained by 
the managed ledger because the entry cache
+        // of `ShadowManagedLedgerImpl` reads entries via 
`ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
+        final var lastConfirmedEntry = ml.getLastConfirmedEntry();
+        if (lastConfirmedEntry == null) {
+            return CompletableFuture.failedFuture(new ManagedLedgerException(
+                    "LastConfirmedEntry is null when reading ledger " + 
handle.getId()));
+        }
+        if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
+            return CompletableFuture.failedFuture(new 
ManagedLedgerException("LastConfirmedEntry is "
+                    + lastConfirmedEntry + " when reading ledger " + 
handle.getId()));
+        }
+        if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > 
lastConfirmedEntry.getEntryId()) {
+            return CompletableFuture.failedFuture(new 
ManagedLedgerException("LastConfirmedEntry is "
+                    + lastConfirmedEntry + " when reading entry " + 
lastEntry));
+        }
+        return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+    }
+}
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 1ab3198498a..de8c6f5d7d0 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +44,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -390,6 +392,9 @@ public class EntryCacheManagerTest extends 
MockedBookKeeperTestCase {
         EntryCache entryCache = cacheManager.getEntryCache(ml1);
 
         final CountDownLatch counter = new CountDownLatch(1);
+        when(ml1.getLastConfirmedEntry()).thenReturn(new PositionImpl(1L, 1L));
+        
when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock(
+                MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
         entryCache.asyncReadEntry(lh, new PositionImpl(1L,1L), new 
AsyncCallbacks.ReadEntryCallback() {
             public void readEntryComplete(Entry entry, Object ctx) {
                 Assert.assertNotEquals(entry, null);
@@ -404,7 +409,7 @@ public class EntryCacheManagerTest extends 
MockedBookKeeperTestCase {
         }, null);
         counter.await();
 
-        verify(lh).readAsync(anyLong(), anyLong());
+        verify(lh).readUnconfirmedAsync(anyLong(), anyLong());
     }
 
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index c8338798f27..3964f62e8e6 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -25,14 +25,16 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 import io.netty.buffer.Unpooled;
-
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -45,8 +47,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class EntryCacheTest extends MockedBookKeeperTestCase {
@@ -60,6 +62,8 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
         when(ml.getExecutor()).thenReturn(executor);
         when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
         when(ml.getConfig()).thenReturn(new ManagedLedgerConfig());
+        when(ml.getOptionalLedgerInfo(0L)).thenReturn(Optional.of(mock(
+                MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
     }
 
     @Test(timeOut = 5000)
@@ -76,22 +80,13 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
             entryCache.insert(EntryImpl.create(0, i, data));
         }
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                assertEquals(entries.size(), 10);
-                entries.forEach(Entry::release);
-                counter.countDown();
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                Assert.fail("should not have failed");
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+        assertEquals(entries.size(), 10);
+        entries.forEach(Entry::release);
 
         // Verify no entries were read from bookkeeper
+        verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong());
         verify(lh, never()).readAsync(anyLong(), anyLong());
     }
 
@@ -109,19 +104,9 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
             entryCache.insert(EntryImpl.create(0, i, data));
         }
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                assertEquals(entries.size(), 10);
-                counter.countDown();
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                Assert.fail("should not have failed");
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+        assertEquals(entries.size(), 10);
     }
 
     @Test(timeOut = 5000)
@@ -138,19 +123,9 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
             entryCache.insert(EntryImpl.create(0, i, data));
         }
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                assertEquals(entries.size(), 10);
-                counter.countDown();
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                Assert.fail("should not have failed");
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+        assertEquals(entries.size(), 10);
     }
 
     @Test(timeOut = 5000)
@@ -168,19 +143,9 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
         entryCache.insert(EntryImpl.create(0, 8, data));
         entryCache.insert(EntryImpl.create(0, 9, data));
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                assertEquals(entries.size(), 10);
-                counter.countDown();
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                Assert.fail("should not have failed");
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+        assertEquals(entries.size(), 10);
     }
 
     @Test(timeOut = 5000)
@@ -198,19 +163,9 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
         entryCache.insert(EntryImpl.create(0, 5, data));
         entryCache.insert(EntryImpl.create(0, 8, data));
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                assertEquals(entries.size(), 10);
-                counter.countDown();
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                Assert.fail("should not have failed");
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+        assertEquals(entries.size(), 10);
     }
 
     @Test
@@ -222,19 +177,25 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
         @Cleanup(value = "clear")
         EntryCache entryCache = cacheManager.getEntryCache(ml);
 
-        CompletableFuture<List<Entry>> cacheMissFutureEntries = new 
CompletableFuture<>();
-
-        entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                cacheMissFutureEntries.complete(entries);
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                cacheMissFutureEntries.completeExceptionally(exception);
-            }
-        }, null);
-
-        List<Entry> cacheMissEntries = cacheMissFutureEntries.get();
+        readEntry(entryCache, lh, 0, 1, true, e -> {
+            assertTrue(e instanceof ManagedLedgerException);
+            assertTrue(e.getMessage().contains("LastConfirmedEntry is null 
when reading ledger 0"));
+        });
+
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(-1, -1));
+        readEntry(entryCache, lh, 0, 1, true, e -> {
+            assertTrue(e instanceof ManagedLedgerException);
+            assertTrue(e.getMessage().contains("LastConfirmedEntry is -1:-1 
when reading ledger 0"));
+        });
+
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 0));
+        readEntry(entryCache, lh, 0, 1, true, e -> {
+            assertTrue(e instanceof ManagedLedgerException);
+            assertTrue(e.getMessage().contains("LastConfirmedEntry is 0:0 when 
reading entry 1"));
+        });
+
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 1));
+        List<Entry> cacheMissEntries = readEntry(entryCache, lh, 0, 1, true, 
null);
         // Ensure first entry is 0 and
         assertEquals(cacheMissEntries.size(), 2);
         assertEquals(cacheMissEntries.get(0).getEntryId(), 0);
@@ -243,19 +204,7 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
         // Move the reader index to simulate consumption
         cacheMissEntries.get(0).getDataBuffer().readerIndex(10);
 
-        CompletableFuture<List<Entry>> cacheHitFutureEntries = new 
CompletableFuture<>();
-
-        entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                cacheHitFutureEntries.complete(entries);
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                cacheHitFutureEntries.completeExceptionally(exception);
-            }
-        }, null);
-
-        List<Entry> cacheHitEntries = cacheHitFutureEntries.get();
+        List<Entry> cacheHitEntries = readEntry(entryCache, lh, 0, 1, true, 
null);
         assertEquals(cacheHitEntries.get(0).getEntryId(), 0);
         assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0);
     }
@@ -269,7 +218,7 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
                 CompletableFuture<LedgerEntries> future = new 
CompletableFuture<>();
                 future.completeExceptionally(new 
BKNoSuchLedgerExistsException());
                 return future;
-            }).when(lh).readAsync(anyLong(), anyLong());
+            }).when(lh).readUnconfirmedAsync(anyLong(), anyLong());
 
         EntryCacheManager cacheManager = factory.getEntryCacheManager();
         @Cleanup(value = "clear")
@@ -278,18 +227,9 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
         byte[] data = new byte[10];
         entryCache.insert(EntryImpl.create(0, 2, data));
 
-        final CountDownLatch counter = new CountDownLatch(1);
-
-        entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                Assert.fail("should not complete");
-            }
-
-            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
-                counter.countDown();
-            }
-        }, null);
-        counter.await();
+        when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+        readEntry(entryCache, lh, 0, 9, false, e ->
+                assertTrue(e instanceof 
ManagedLedgerException.LedgerNotExistException));
     }
 
     static ReadHandle getLedgerHandle() {
@@ -306,9 +246,35 @@ public class EntryCacheTest extends 
MockedBookKeeperTestCase {
                 LedgerEntries ledgerEntries = mock(LedgerEntries.class);
                 doAnswer((invocation2) -> 
entries.iterator()).when(ledgerEntries).iterator();
                 return CompletableFuture.completedFuture(ledgerEntries);
-            }).when(lh).readAsync(anyLong(), anyLong());
+            }).when(lh).readUnconfirmedAsync(anyLong(), anyLong());
 
         return lh;
     }
 
+    private List<Entry> readEntry(EntryCache entryCache, ReadHandle lh, long 
firstEntry, long lastEntry,
+                                  boolean shouldCacheEntry, 
Consumer<Throwable> assertion)
+            throws InterruptedException {
+        final var future = new CompletableFuture<List<Entry>>();
+        entryCache.asyncReadEntry(lh, firstEntry, lastEntry, shouldCacheEntry, 
new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                future.complete(entries);
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+        try {
+            final var entries = future.get();
+            assertNull(assertion);
+            return entries;
+        } catch (ExecutionException e) {
+            if (assertion != null) {
+                assertion.accept(e.getCause());
+            }
+            return List.of();
+        }
+    }
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index cd224e33e27..7201d76a6e5 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -313,7 +313,7 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
 
         @Override
         public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
-            return unsupported();
+            return readAsync(firstEntry, lastEntry);
         }
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
new file mode 100644
index 00000000000..9d810b06a7c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Lists;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ShadowTopicRealBkTest {
+
+    private static final String cluster = "test";
+    private final int zkPort = PortManager.nextLockedFreePort();
+    private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, 
zkPort, PortManager::nextLockedFreePort);
+    private PulsarService pulsar;
+    private PulsarAdmin admin;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        bk.start();
+        final var config = new ServiceConfiguration();
+        config.setClusterName(cluster);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+        pulsar = new PulsarService(config);
+        pulsar.start();
+        admin = pulsar.getAdminClient();
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress())
+                .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build());
+        admin.tenants().createTenant("public", 
TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
+        admin.namespaces().createNamespace("public/default");
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        if (pulsar != null) {
+            pulsar.close();
+        }
+        bk.stop();
+    }
+
+    @Test
+    public void testReadFromStorage() throws Exception {
+        final var sourceTopic = 
TopicName.get("test-read-from-source").toString();
+        final var shadowTopic = sourceTopic + "-shadow";
+
+        admin.topics().createNonPartitionedTopic(sourceTopic);
+        admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+        admin.topics().setShadowTopics(sourceTopic, 
Lists.newArrayList(shadowTopic));
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()->{
+            final var sourcePersistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                    .getTopicIfExists(sourceTopic).get().orElseThrow();
+            final var replicator = (ShadowReplicator) 
sourcePersistentTopic.getShadowReplicators().get(shadowTopic);
+            Assert.assertNotNull(replicator);
+            Assert.assertEquals(String.valueOf(replicator.getState()), 
"Started");
+        });
+
+        final var client = pulsar.getClient();
+        // When the message was sent, there is no cursor, so it will read from 
the cache
+        final var producer = client.newProducer().topic(sourceTopic).create();
+        producer.send("message".getBytes());
+        // 1. Verify RangeEntryCacheImpl#readFromStorage
+        final var consumer = 
client.newConsumer().topic(shadowTopic).subscriptionName("sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        final var msg = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        Assert.assertEquals(msg.getValue(), "message".getBytes());
+
+        // 2. Verify EntryCache#asyncReadEntry
+        final var shadowManagedLedger = ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get()
+                .orElseThrow()).getManagedLedger();
+        Assert.assertTrue(shadowManagedLedger instanceof 
ShadowManagedLedgerImpl);
+        shadowManagedLedger.getEarliestMessagePublishTimeInBacklog().get(3, 
TimeUnit.SECONDS);
+    }
+}

Reply via email to