This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a562cfd0583a8e455cc6a25e61d57a65aae7ec92 Author: ran <[email protected]> AuthorDate: Wed Feb 10 05:08:01 2021 +0800 add new method in ManagedCursor and ReadOnlyCursor to async read entries with max size bytes. (#9532) (cherry picked from commit 34fdb678da4d4008b27eb7d81e8b95707bdfc3ae) --- .../apache/bookkeeper/mledger/ManagedCursor.java | 12 ++++++++ .../apache/bookkeeper/mledger/ReadOnlyCursor.java | 11 +++++++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++++- .../mledger/impl/ManagedCursorContainerTest.java | 6 ++++ .../bookkeeper/mledger/impl/ManagedCursorTest.java | 36 ++++++++++++++++++++++ .../pulsar/broker/admin/PersistentTopicsTest.java | 6 ++-- 6 files changed, 77 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 7a7ac25..62ba829 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -104,6 +104,18 @@ public interface ManagedCursor { */ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); + + /** + * Asynchronously read entries from the ManagedLedger. + * + * @param numberOfEntriesToRead maximum number of entries to return + * @param maxSizeBytes max size in bytes of the entries to return + * @param callback callback object + * @param ctx opaque context + */ + void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback, + Object ctx); + /** * Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java index f70a367..f74075f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java @@ -50,6 +50,17 @@ public interface ReadOnlyCursor { void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); /** + * Asynchronously read entries from the ManagedLedger. + * + * @param numberOfEntriesToRead maximum number of entries to return + * @param maxSizeBytes max size in bytes of the entries to return + * @param callback callback object + * @param ctx opaque context + */ + void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback, + Object ctx); + + /** * Get the read position. This points to the next message to be read from the cursor. * * @return the read position diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 861b5ef..a0ba634 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -561,14 +561,22 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback, final Object ctx) { + asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx); + } + + @Override + public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback, + Object ctx) { checkArgument(numberOfEntriesToRead > 0); if (isClosed()) { callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; } + int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes); + PENDING_READ_OPS_UPDATER.incrementAndGet(this); - OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx); + OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx); ledger.asyncReadEntries(op); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 5733671..f2766c2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -82,6 +82,12 @@ public class ManagedCursorContainerTest { } @Override + public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback, + Object ctx) { + callback.readEntriesComplete(null, ctx); + } + + @Override public boolean hasMoreEntries() { return true; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8bf0e6d..bcdbeca 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -462,6 +462,42 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { } @Test(timeOut = 20000) + void testAsyncReadWithMaxSizeByte() throws Exception { + ManagedLedger ledger = factory.open("testAsyncReadWithMaxSizeByte"); + ManagedCursor cursor = ledger.openCursor("c1"); + + for (int i = 0; i < 100; i++) { + ledger.addEntry(new byte[1024]); + } + + // First time, since we don't have info, we'll get 1 single entry + readAndCheck(cursor, 10, 3 * 1024, 1); + // We should only return 3 entries, based on the max size + readAndCheck(cursor, 20, 3 * 1024, 3); + // If maxSize is < avg, we should get 1 entry + readAndCheck(cursor, 10, 500, 1); + } + + private void readAndCheck(ManagedCursor cursor, int numEntriesToRead, + long maxSizeBytes, int expectedNumRead) throws InterruptedException { + CountDownLatch counter = new CountDownLatch(1); + cursor.asyncReadEntries(numEntriesToRead, maxSizeBytes, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + Assert.assertEquals(entries.size(), expectedNumRead); + entries.forEach(e -> e.release()); + counter.countDown(); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, null); + counter.await(); + } + + @Test(timeOut = 20000) void markDeleteWithErrors() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); ManagedCursor cursor = ledger.openCursor("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index c31b51a..32fd972 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -241,7 +241,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - TopicStats topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + TopicStats topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true, true); long msgBacklog = topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog; System.out.println("Message back log for " + SUB_EARLIEST + " is :" + msgBacklog); Assert.assertEquals(msgBacklog, numberOfMessages); @@ -254,7 +254,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true, true); msgBacklog = topicStats.subscriptions.get(SUB_LATEST).msgBacklog; System.out.println("Message back log for " + SUB_LATEST + " is :" + msgBacklog); Assert.assertEquals(msgBacklog, 0); @@ -267,7 +267,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true, true); msgBacklog = topicStats.subscriptions.get(SUB_NONE_MESSAGE_ID).msgBacklog; System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog); Assert.assertEquals(msgBacklog, 0);
