This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 4a2ca66 [Branch-2.7]Forbid to read other topic's data in
managedLedger layer (#11913)
4a2ca66 is described below
commit 4a2ca6679a3080a69dba83ad6388fd25e662166a
Author: Hang Chen <[email protected]>
AuthorDate: Thu Sep 23 18:06:11 2021 +0800
[Branch-2.7]Forbid to read other topic's data in managedLedger layer
(#11913)
* forbid to read other topic's data in managedLedger layer
* update exception type
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 51 ++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5a5fe49..44e9d67 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1657,6 +1657,13 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name,
position.getLedgerId(), position.getEntryId());
}
+ if (!ledgers.containsKey(position.getLedgerId())) {
+ log.error("[{}] Failed to get message with ledger {}:{} the
ledgerId does not belong to this topic.",
+ name, position.getLedgerId(), position.getEntryId());
+ callback.readEntryFailed(new
ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ + "the ledgerId does not belong to this topic or has been
deleted"), ctx);
+ return;
+ }
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
} else {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 13fd4db..56f6ca3 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2966,4 +2966,55 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
cursor3.close();
ledger.close();
}
+
+ @Test(timeOut = 30000)
+ public void testReadOtherManagedLedgersEntry() throws Exception {
+ ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl)
factory.open("my_test_ledger_a");
+ ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl)
factory.open("my_test_ledger_b");
+
+ PositionImpl pa = (PositionImpl)
managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
+ PositionImpl pb = (PositionImpl)
managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));
+
+ // read managedLegerA's entry using managedLedgerA
+ CompletableFuture<byte[]> completableFutureA = new
CompletableFuture<>();
+ managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ completableFutureA.complete(entry.getData());
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ completableFutureA.completeExceptionally(exception.getCause());
+ }
+ }, null);
+
+ assertEquals("dummy-entry-a".getBytes(Encoding),
completableFutureA.get());
+
+ // read managedLedgerB's entry using managedLedgerA
+ CompletableFuture<byte[]> completableFutureB = new
CompletableFuture<>();
+ managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ completableFutureB.complete(entry.getData());
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
+ completableFutureB.completeExceptionally(exception);
+ }
+ }, null);
+
+ try {
+ completableFutureB.get();
+ Assert.fail();
+ } catch (Exception e) {
+ assertEquals(e.getCause().getMessage(),
+ "Message not found, the ledgerId does not belong to this topic
or has been deleted");
+ }
+
+ managedLedgerA.close();
+ managedLedgerB.close();
+
+ }
}