This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 4a2ca66  [Branch-2.7]Forbid to read other topic's data in 
managedLedger layer (#11913)
4a2ca66 is described below

commit 4a2ca6679a3080a69dba83ad6388fd25e662166a
Author: Hang Chen <[email protected]>
AuthorDate: Thu Sep 23 18:06:11 2021 +0800

    [Branch-2.7]Forbid to read other topic's data in managedLedger layer 
(#11913)
    
    * forbid to read other topic's data in managedLedger layer
    
    * update exception type
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 +++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 51 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5a5fe49..44e9d67 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1657,6 +1657,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, 
position.getLedgerId(), position.getEntryId());
         }
+        if (!ledgers.containsKey(position.getLedgerId())) {
+            log.error("[{}] Failed to get message with ledger {}:{} the 
ledgerId does not belong to this topic.",
+                name, position.getLedgerId(), position.getEntryId());
+            callback.readEntryFailed(new 
ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+                + "the ledgerId does not belong to this topic or has been 
deleted"), ctx);
+            return;
+        }
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
         } else {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 13fd4db..56f6ca3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2966,4 +2966,55 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         cursor3.close();
         ledger.close();
     }
+
+    @Test(timeOut = 30000)
+    public void testReadOtherManagedLedgersEntry() throws Exception {
+        ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) 
factory.open("my_test_ledger_a");
+        ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) 
factory.open("my_test_ledger_b");
+
+        PositionImpl pa = (PositionImpl) 
managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
+        PositionImpl pb = (PositionImpl) 
managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));
+
+        // read managedLegerA's entry using managedLedgerA
+        CompletableFuture<byte[]> completableFutureA = new 
CompletableFuture<>();
+        managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                completableFutureA.complete(entry.getData());
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                completableFutureA.completeExceptionally(exception.getCause());
+            }
+        }, null);
+
+        assertEquals("dummy-entry-a".getBytes(Encoding), 
completableFutureA.get());
+
+        // read managedLedgerB's entry using managedLedgerA
+        CompletableFuture<byte[]> completableFutureB = new 
CompletableFuture<>();
+        managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                completableFutureB.complete(entry.getData());
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                completableFutureB.completeExceptionally(exception);
+            }
+        }, null);
+
+        try {
+            completableFutureB.get();
+            Assert.fail();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(),
+                "Message not found, the ledgerId does not belong to this topic 
or has been deleted");
+        }
+
+        managedLedgerA.close();
+        managedLedgerB.close();
+
+    }
 }

Reply via email to