This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8740314944c [fix][broker] Fix shadow topics cannot be consumed when
the entry is not cached (#23147)
8740314944c is described below
commit 8740314944cc97bb586b333ebf98762ffc9be5dc
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 15 11:57:52 2024 +0800
[fix][broker] Fix shadow topics cannot be consumed when the entry is not
cached (#23147)
For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages
from the source topic when the entry is not cached. However, it uses the
`readAsync` API, which validates the requested entry range against the
`lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field is never
updated, so `readAsync` can fail immediately. See `LedgerHandle#readAsync`:
```java
if (lastEntry > lastAddConfirmed) {
LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{}
lastEntry:{} lastAddConfirmed:{}",
ledgerId, firstEntry, lastEntry, lastAddConfirmed);
return FutureUtils.exception(new BKReadException());
}
```
This bug was not exposed by the existing tests because:
1. `PulsarMockReadHandle` does not maintain a LAC field.
2. The case for cache miss is never tested.
Replace `readAsync` with `readUnconfirmedAsync` and compare the entry range
with the `ManagedLedger#getLastConfirmedEntry`. The managed ledger already
maintains a `lastConfirmedEntry` to limit the last entry. See
`ManagedLedgerImpl#internalReadFromLedger`:
```java
Position lastPosition = lastConfirmedEntry;
if (ledger.getId() == lastPosition.getLedgerId()) {
lastEntryInLedger = lastPosition.getEntryId();
```
Add `ShadowTopicRealBkTest` to cover the two changed code paths:
`RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.
As an exception, when the ledger does not exist in the managed ledger, compare
the entry range with the LAC of the ledger handle instead. This is needed because
`ReadOnlyManagedLedgerImpl` can read a ledger that belongs to another managed ledger.
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
PR in forked repository: https://github.com/BewareMyPower/pulsar/pull/33
<!--
After opening this PR, the build in apache/pulsar will fail and
instructions will
be provided for opening a PR in the PR author's forked repository.
apache/pulsar pull requests should be first tested in your own fork since
the
apache/pulsar CI based on GitHub Actions has constrained resources and
quota.
GitHub Actions provides separate quota for pull requests that are executed
in
a forked repository.
The tests will be run in the forked repository until all PR review comments
have
been handled, the tests pass and the PR is approved by a reviewer.
-->
(cherry picked from commit 15b88d250818bada5c1a94f5c54ef7806f88a500)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +
.../mledger/impl/cache/EntryCacheDisabled.java | 4 +-
.../mledger/impl/cache/RangeEntryCacheImpl.java | 4 +-
.../mledger/impl/cache/ReadEntryUtils.java | 54 ++++++
.../mledger/impl/EntryCacheManagerTest.java | 7 +-
.../bookkeeper/mledger/impl/EntryCacheTest.java | 186 +++++++++------------
.../mledger/impl/OffloadPrefixReadTest.java | 2 +-
.../service/persistent/ShadowTopicRealBkTest.java | 109 ++++++++++++
8 files changed, 252 insertions(+), 116 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b91cefb1520..1b1bc03f4bc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4054,6 +4054,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
public static ManagedLedgerException
createManagedLedgerException(Throwable t) {
if (t instanceof org.apache.bookkeeper.client.api.BKException) {
return
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)
t).getCode());
+ } else if (t instanceof ManagedLedgerException) {
+ return (ManagedLedgerException) t;
} else if (t instanceof CompletionException
&& !(t.getCause() instanceof CompletionException) /* check to
avoid stackoverlflow */) {
return createManagedLedgerException(t.getCause());
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index d1050e00628..64595dfe47e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -79,7 +79,7 @@ public class EntryCacheDisabled implements EntryCache {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry,
boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback
callback, Object ctx) {
- lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+ ReadEntryUtils.readAsync(ml, lh, firstEntry,
lastEntry).thenAcceptAsync(
ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
@@ -107,7 +107,7 @@ public class EntryCacheDisabled implements EntryCache {
@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position,
AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
- lh.readAsync(position.getEntryId(),
position.getEntryId()).whenCompleteAsync(
+ ReadEntryUtils.readAsync(ml, lh, position.getEntryId(),
position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 27aec6f178e..21eb62e5a8c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -248,7 +248,7 @@ public class RangeEntryCacheImpl implements EntryCache {
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
- lh.readAsync(position.getEntryId(),
position.getEntryId()).thenAcceptAsync(
+ ReadEntryUtils.readAsync(ml, lh, position.getEntryId(),
position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator =
ledgerEntries.iterator();
@@ -428,7 +428,7 @@ public class RangeEntryCacheImpl implements EntryCache {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long
lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
- CompletableFuture<List<EntryImpl>> readResult =
lh.readAsync(firstEntry, lastEntry)
+ CompletableFuture<List<EntryImpl>> readResult =
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
new file mode 100644
index 00000000000..5cf5f053f0c
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+
+class ReadEntryUtils {
+
+ static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml,
ReadHandle handle, long firstEntry,
+ long lastEntry) {
+ if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
+ // The read handle comes from another managed ledger, in this
case, we can only compare the entry range with
+ // the LAC of that read handle. Specifically, it happens when this
method is called by a
+ // ReadOnlyManagedLedgerImpl object.
+ return handle.readAsync(firstEntry, lastEntry);
+ }
+ // Compare the entry range with the lastConfirmedEntry maintained by
the managed ledger because the entry cache
+ // of `ShadowManagedLedgerImpl` reads entries via
`ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
+ final var lastConfirmedEntry = ml.getLastConfirmedEntry();
+ if (lastConfirmedEntry == null) {
+ return CompletableFuture.failedFuture(new ManagedLedgerException(
+ "LastConfirmedEntry is null when reading ledger " +
handle.getId()));
+ }
+ if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
+ return CompletableFuture.failedFuture(new
ManagedLedgerException("LastConfirmedEntry is "
+ + lastConfirmedEntry + " when reading ledger " +
handle.getId()));
+ }
+ if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry >
lastConfirmedEntry.getEntryId()) {
+ return CompletableFuture.failedFuture(new
ManagedLedgerException("LastConfirmedEntry is "
+ + lastConfirmedEntry + " when reading entry " +
lastEntry));
+ }
+ return handle.readUnconfirmedAsync(firstEntry, lastEntry);
+ }
+}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 1ab3198498a..de8c6f5d7d0 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -43,6 +44,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -390,6 +392,9 @@ public class EntryCacheManagerTest extends
MockedBookKeeperTestCase {
EntryCache entryCache = cacheManager.getEntryCache(ml1);
final CountDownLatch counter = new CountDownLatch(1);
+ when(ml1.getLastConfirmedEntry()).thenReturn(new PositionImpl(1L, 1L));
+
when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock(
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
entryCache.asyncReadEntry(lh, new PositionImpl(1L,1L), new
AsyncCallbacks.ReadEntryCallback() {
public void readEntryComplete(Entry entry, Object ctx) {
Assert.assertNotEquals(entry, null);
@@ -404,7 +409,7 @@ public class EntryCacheManagerTest extends
MockedBookKeeperTestCase {
}, null);
counter.await();
- verify(lh).readAsync(anyLong(), anyLong());
+ verify(lh).readUnconfirmedAsync(anyLong(), anyLong());
}
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index c8338798f27..3964f62e8e6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -25,14 +25,16 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.Unpooled;
-
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -45,8 +47,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.testng.Assert;
import org.testng.annotations.Test;
public class EntryCacheTest extends MockedBookKeeperTestCase {
@@ -60,6 +62,8 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
when(ml.getExecutor()).thenReturn(executor);
when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
when(ml.getConfig()).thenReturn(new ManagedLedgerConfig());
+ when(ml.getOptionalLedgerInfo(0L)).thenReturn(Optional.of(mock(
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
}
@Test(timeOut = 5000)
@@ -76,22 +80,13 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
entryCache.insert(EntryImpl.create(0, i, data));
}
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- assertEquals(entries.size(), 10);
- entries.forEach(Entry::release);
- counter.countDown();
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- Assert.fail("should not have failed");
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+ assertEquals(entries.size(), 10);
+ entries.forEach(Entry::release);
// Verify no entries were read from bookkeeper
+ verify(lh, never()).readUnconfirmedAsync(anyLong(), anyLong());
verify(lh, never()).readAsync(anyLong(), anyLong());
}
@@ -109,19 +104,9 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
entryCache.insert(EntryImpl.create(0, i, data));
}
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- assertEquals(entries.size(), 10);
- counter.countDown();
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- Assert.fail("should not have failed");
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+ assertEquals(entries.size(), 10);
}
@Test(timeOut = 5000)
@@ -138,19 +123,9 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
entryCache.insert(EntryImpl.create(0, i, data));
}
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- assertEquals(entries.size(), 10);
- counter.countDown();
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- Assert.fail("should not have failed");
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+ assertEquals(entries.size(), 10);
}
@Test(timeOut = 5000)
@@ -168,19 +143,9 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
entryCache.insert(EntryImpl.create(0, 8, data));
entryCache.insert(EntryImpl.create(0, 9, data));
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- assertEquals(entries.size(), 10);
- counter.countDown();
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- Assert.fail("should not have failed");
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+ assertEquals(entries.size(), 10);
}
@Test(timeOut = 5000)
@@ -198,19 +163,9 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
entryCache.insert(EntryImpl.create(0, 5, data));
entryCache.insert(EntryImpl.create(0, 8, data));
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- assertEquals(entries.size(), 10);
- counter.countDown();
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- Assert.fail("should not have failed");
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ final var entries = readEntry(entryCache, lh, 0, 9, false, null);
+ assertEquals(entries.size(), 10);
}
@Test
@@ -222,19 +177,25 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);
- CompletableFuture<List<Entry>> cacheMissFutureEntries = new
CompletableFuture<>();
-
- entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- cacheMissFutureEntries.complete(entries);
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- cacheMissFutureEntries.completeExceptionally(exception);
- }
- }, null);
-
- List<Entry> cacheMissEntries = cacheMissFutureEntries.get();
+ readEntry(entryCache, lh, 0, 1, true, e -> {
+ assertTrue(e instanceof ManagedLedgerException);
+ assertTrue(e.getMessage().contains("LastConfirmedEntry is null
when reading ledger 0"));
+ });
+
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(-1, -1));
+ readEntry(entryCache, lh, 0, 1, true, e -> {
+ assertTrue(e instanceof ManagedLedgerException);
+ assertTrue(e.getMessage().contains("LastConfirmedEntry is -1:-1
when reading ledger 0"));
+ });
+
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 0));
+ readEntry(entryCache, lh, 0, 1, true, e -> {
+ assertTrue(e instanceof ManagedLedgerException);
+ assertTrue(e.getMessage().contains("LastConfirmedEntry is 0:0 when
reading entry 1"));
+ });
+
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 1));
+ List<Entry> cacheMissEntries = readEntry(entryCache, lh, 0, 1, true,
null);
// Ensure first entry is 0 and
assertEquals(cacheMissEntries.size(), 2);
assertEquals(cacheMissEntries.get(0).getEntryId(), 0);
@@ -243,19 +204,7 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
// Move the reader index to simulate consumption
cacheMissEntries.get(0).getDataBuffer().readerIndex(10);
- CompletableFuture<List<Entry>> cacheHitFutureEntries = new
CompletableFuture<>();
-
- entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- cacheHitFutureEntries.complete(entries);
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- cacheHitFutureEntries.completeExceptionally(exception);
- }
- }, null);
-
- List<Entry> cacheHitEntries = cacheHitFutureEntries.get();
+ List<Entry> cacheHitEntries = readEntry(entryCache, lh, 0, 1, true,
null);
assertEquals(cacheHitEntries.get(0).getEntryId(), 0);
assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0);
}
@@ -269,7 +218,7 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
CompletableFuture<LedgerEntries> future = new
CompletableFuture<>();
future.completeExceptionally(new
BKNoSuchLedgerExistsException());
return future;
- }).when(lh).readAsync(anyLong(), anyLong());
+ }).when(lh).readUnconfirmedAsync(anyLong(), anyLong());
EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
@@ -278,18 +227,9 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
byte[] data = new byte[10];
entryCache.insert(EntryImpl.create(0, 2, data));
- final CountDownLatch counter = new CountDownLatch(1);
-
- entryCache.asyncReadEntry(lh, 0, 9, false, new ReadEntriesCallback() {
- public void readEntriesComplete(List<Entry> entries, Object ctx) {
- Assert.fail("should not complete");
- }
-
- public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
- counter.countDown();
- }
- }, null);
- counter.await();
+ when(ml.getLastConfirmedEntry()).thenReturn(new PositionImpl(0, 9));
+ readEntry(entryCache, lh, 0, 9, false, e ->
+ assertTrue(e instanceof
ManagedLedgerException.LedgerNotExistException));
}
static ReadHandle getLedgerHandle() {
@@ -306,9 +246,35 @@ public class EntryCacheTest extends
MockedBookKeeperTestCase {
LedgerEntries ledgerEntries = mock(LedgerEntries.class);
doAnswer((invocation2) ->
entries.iterator()).when(ledgerEntries).iterator();
return CompletableFuture.completedFuture(ledgerEntries);
- }).when(lh).readAsync(anyLong(), anyLong());
+ }).when(lh).readUnconfirmedAsync(anyLong(), anyLong());
return lh;
}
+ private List<Entry> readEntry(EntryCache entryCache, ReadHandle lh, long
firstEntry, long lastEntry,
+ boolean shouldCacheEntry,
Consumer<Throwable> assertion)
+ throws InterruptedException {
+ final var future = new CompletableFuture<List<Entry>>();
+ entryCache.asyncReadEntry(lh, firstEntry, lastEntry, shouldCacheEntry,
new ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ future.complete(entries);
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ try {
+ final var entries = future.get();
+ assertNull(assertion);
+ return entries;
+ } catch (ExecutionException e) {
+ if (assertion != null) {
+ assertion.accept(e.getCause());
+ }
+ return List.of();
+ }
+ }
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index cd224e33e27..7201d76a6e5 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -313,7 +313,7 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long
firstEntry, long lastEntry) {
- return unsupported();
+ return readAsync(firstEntry, lastEntry);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
new file mode 100644
index 00000000000..9d810b06a7c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicRealBkTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.Lists;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.PortManager;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ShadowTopicRealBkTest {
+
+ private static final String cluster = "test";
+ private final int zkPort = PortManager.nextLockedFreePort();
+ private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2,
zkPort, PortManager::nextLockedFreePort);
+ private PulsarService pulsar;
+ private PulsarAdmin admin;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ bk.start();
+ final var config = new ServiceConfiguration();
+ config.setClusterName(cluster);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setMetadataStoreUrl("zk:localhost:" + zkPort);
+ pulsar = new PulsarService(config);
+ pulsar.start();
+ admin = pulsar.getAdminClient();
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress())
+ .brokerServiceUrl(pulsar.getBrokerServiceUrl()).build());
+ admin.tenants().createTenant("public",
TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
+ admin.namespaces().createNamespace("public/default");
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ if (pulsar != null) {
+ pulsar.close();
+ }
+ bk.stop();
+ }
+
+ @Test
+ public void testReadFromStorage() throws Exception {
+ final var sourceTopic =
TopicName.get("test-read-from-source").toString();
+ final var shadowTopic = sourceTopic + "-shadow";
+
+ admin.topics().createNonPartitionedTopic(sourceTopic);
+ admin.topics().createShadowTopic(shadowTopic, sourceTopic);
+ admin.topics().setShadowTopics(sourceTopic,
Lists.newArrayList(shadowTopic));
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()->{
+ final var sourcePersistentTopic = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopicIfExists(sourceTopic).get().orElseThrow();
+ final var replicator = (ShadowReplicator)
sourcePersistentTopic.getShadowReplicators().get(shadowTopic);
+ Assert.assertNotNull(replicator);
+ Assert.assertEquals(String.valueOf(replicator.getState()),
"Started");
+ });
+
+ final var client = pulsar.getClient();
+ // When the message was sent, there is no cursor, so it will read from
the cache
+ final var producer = client.newProducer().topic(sourceTopic).create();
+ producer.send("message".getBytes());
+ // 1. Verify RangeEntryCacheImpl#readFromStorage
+ final var consumer =
client.newConsumer().topic(shadowTopic).subscriptionName("sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ final var msg = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(msg.getValue(), "message".getBytes());
+
+ // 2. Verify EntryCache#asyncReadEntry
+ final var shadowManagedLedger = ((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get()
+ .orElseThrow()).getManagedLedger();
+ Assert.assertTrue(shadowManagedLedger instanceof
ShadowManagedLedgerImpl);
+ shadowManagedLedger.getEarliestMessagePublishTimeInBacklog().get(3,
TimeUnit.SECONDS);
+ }
+}