This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8bf800068bd5a1ad926544ec908a1501d425e78c Author: Hang Chen <[email protected]> AuthorDate: Fri Sep 3 23:59:52 2021 +0800 Forbid to read other topic's data in managedLedger layer (#11912) * forbid to read other topic's data in managedLedger layer * format code * update exception type * fix test (cherry picked from commit a7bdc5e25778ada8c781e9b3f9cc4694e5fd3f58) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 60 +++++++++++++++++++--- .../broker/admin/impl/PersistentTopicsBase.java | 7 --- .../pulsar/broker/admin/PersistentTopicsTest.java | 18 +++++-- 4 files changed, 75 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a1e93a8..5a070e7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1821,6 +1821,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId()); } + if (!ledgers.containsKey(position.getLedgerId())) { + log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic " + + "or has been deleted.", name, position.getLedgerId(), position.getEntryId()); + callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, " + + "the ledgerId does not belong to this topic or has been deleted"), ctx); + return; + } if (position.getLedgerId() == currentLedger.getId()) { asyncReadEntry(currentLedger, position, callback, ctx); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index ee9ff41..ba6e877 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -118,16 +119,12 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.awaitility.Awaitility; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class ManagedLedgerTest extends MockedBookKeeperTestCase { - - private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class); - private static final Charset Encoding = Charsets.UTF_8; @DataProvider(name = "checkOwnershipFlag") @@ -3033,7 +3030,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { Assert.assertEquals(finalManagedLedger.getTotalSize(), 0); }); } - + @Test(timeOut = 20000) public void testAsyncTruncateLedgerRetention() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -3207,4 +3204,55 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE); verify(ledgerOffloader, times(1)).getOffloadPolicies(); } + + @Test(timeOut = 30000) + public void testReadOtherManagedLedgersEntry() throws Exception { + ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a"); + ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b"); + + PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding)); + PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding)); + + // read managedLegerA's entry using managedLedgerA + CompletableFuture<byte[]> completableFutureA = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureA.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureA.completeExceptionally(exception.getCause()); + } + }, null); + + assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get()); + + // read managedLedgerB's entry using managedLedgerA + CompletableFuture<byte[]> completableFutureB = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureB.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureB.completeExceptionally(exception); + } + }, null); + + try { + completableFutureB.get(); + Assert.fail(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), + "Message not found, the ledgerId does not belong to this topic or has been deleted"); + } + + managedLedgerA.close(); + managedLedgerB.close(); + + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c7edc77..f63d318 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2298,13 +2298,6 @@ public class PersistentTopicsBase extends AdminResource { } PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - if (null == ledger.getLedgerInfo(ledgerId).get()) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, " - + "the ledgerId does not belong to this topic.", - clientAppId(), ledgerId, entryId, topicName); - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Message not found, the ledgerId does not belong to this topic")); - } ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index dd74400..2377b9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -883,10 +883,20 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId()); Assert.assertEquals(message2.getData(), data2.getBytes()); - Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId()); - Assert.assertNull(message3); + Message<byte[]> message3 = null; + try { + message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId()); + Assert.fail(); + } catch (Exception e) { + Assert.assertNull(message3); + } - Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId()); - Assert.assertNull(message4); + Message<byte[]> message4 = null; + try { + message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId()); + Assert.fail(); + } catch (Exception e) { + Assert.assertNull(message4); + } } }
