This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a562cfd0583a8e455cc6a25e61d57a65aae7ec92
Author: ran <[email protected]>
AuthorDate: Wed Feb 10 05:08:01 2021 +0800

    add new method in ManagedCursor and ReadOnlyCursor to async read entries 
with max size bytes. (#9532)
    
    (cherry picked from commit 34fdb678da4d4008b27eb7d81e8b95707bdfc3ae)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 12 ++++++++
 .../apache/bookkeeper/mledger/ReadOnlyCursor.java  | 11 +++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++++-
 .../mledger/impl/ManagedCursorContainerTest.java   |  6 ++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 36 ++++++++++++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  6 ++--
 6 files changed, 77 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 7a7ac25..62ba829 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -104,6 +104,18 @@ public interface ManagedCursor {
      */
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback 
callback, Object ctx);
 
+
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     */
+    void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, 
ReadEntriesCallback callback,
+                          Object ctx);
+
     /**
      * Get 'N'th entry from the mark delete position in the cursor without 
updating any cursor positions.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
index f70a367..f74075f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java
@@ -50,6 +50,17 @@ public interface ReadOnlyCursor {
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback 
callback, Object ctx);
 
     /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     */
+    void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, 
ReadEntriesCallback callback,
+                          Object ctx);
+
+    /**
      * Get the read position. This points to the next message to be read from 
the cursor.
      *
      * @return the read position
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 861b5ef..a0ba634 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -561,14 +561,22 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void asyncReadEntries(final int numberOfEntriesToRead, final 
ReadEntriesCallback callback,
             final Object ctx) {
+        asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, 
ctx);
+    }
+
+    @Override
+    public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, 
ReadEntriesCallback callback,
+                                 Object ctx) {
         checkArgument(numberOfEntriesToRead > 0);
         if (isClosed()) {
             callback.readEntriesFailed(new ManagedLedgerException("Cursor was 
already closed"), ctx);
             return;
         }
 
+        int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, 
maxSizeBytes);
+
         PENDING_READ_OPS_UPDATER.incrementAndGet(this);
-        OpReadEntry op = OpReadEntry.create(this, readPosition, 
numberOfEntriesToRead, callback, ctx);
+        OpReadEntry op = OpReadEntry.create(this, readPosition, 
numOfEntriesToRead, callback, ctx);
         ledger.asyncReadEntries(op);
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 5733671..f2766c2 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -82,6 +82,12 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public void asyncReadEntries(int numberOfEntriesToRead, long 
maxSizeBytes, ReadEntriesCallback callback,
+                                     Object ctx) {
+            callback.readEntriesComplete(null, ctx);
+        }
+
+        @Override
         public boolean hasMoreEntries() {
             return true;
         }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 8bf0e6d..bcdbeca 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -462,6 +462,42 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testAsyncReadWithMaxSizeByte() throws Exception {
+        ManagedLedger ledger = factory.open("testAsyncReadWithMaxSizeByte");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        for (int i = 0; i < 100; i++) {
+            ledger.addEntry(new byte[1024]);
+        }
+
+        // First time, since we don't have info, we'll get 1 single entry
+        readAndCheck(cursor, 10, 3 * 1024, 1);
+        // We should only return 3 entries, based on the max size
+        readAndCheck(cursor, 20, 3 * 1024, 3);
+        // If maxSize is < avg, we should get 1 entry
+        readAndCheck(cursor, 10, 500, 1);
+    }
+
+    private void readAndCheck(ManagedCursor cursor, int numEntriesToRead,
+                              long maxSizeBytes, int expectedNumRead) throws 
InterruptedException {
+        CountDownLatch counter = new CountDownLatch(1);
+        cursor.asyncReadEntries(numEntriesToRead, maxSizeBytes, new 
ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                Assert.assertEquals(entries.size(), expectedNumRead);
+                entries.forEach(e -> e.release());
+                counter.countDown();
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                fail(exception.getMessage());
+            }
+        }, null);
+        counter.await();
+    }
+
+    @Test(timeOut = 20000)
     void markDeleteWithErrors() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
         ManagedCursor cursor = ledger.openCursor("c1");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index c31b51a..32fd972 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -241,7 +241,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
-        TopicStats topicStats = persistentTopics.getStats(testTenant, 
testNamespace, testLocalTopicName, true, true);
+        TopicStats topicStats = persistentTopics.getStats(testTenant, 
testNamespace, testLocalTopicName, true, true, true);
         long msgBacklog = 
topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog;
         System.out.println("Message back log for " + SUB_EARLIEST + " is :" + 
msgBacklog);
         Assert.assertEquals(msgBacklog, numberOfMessages);
@@ -254,7 +254,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        topicStats = persistentTopics.getStats(testTenant, testNamespace, 
testLocalTopicName, true, true);
+        topicStats = persistentTopics.getStats(testTenant, testNamespace, 
testLocalTopicName, true, true, true);
         msgBacklog = topicStats.subscriptions.get(SUB_LATEST).msgBacklog;
         System.out.println("Message back log for " + SUB_LATEST + " is :" + 
msgBacklog);
         Assert.assertEquals(msgBacklog, 0);
@@ -267,7 +267,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        topicStats = persistentTopics.getStats(testTenant, testNamespace, 
testLocalTopicName, true, true);
+        topicStats = persistentTopics.getStats(testTenant, testNamespace, 
testLocalTopicName, true, true, true);
         msgBacklog = 
topicStats.subscriptions.get(SUB_NONE_MESSAGE_ID).msgBacklog;
         System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " 
is :" + msgBacklog);
         Assert.assertEquals(msgBacklog, 0);

Reply via email to