This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cfdf6fc  Estimate the size of a managed ledger suffix from a position 
(#1901)
cfdf6fc is described below

commit cfdf6fcbb96eea255a2f49de182411e6f8aa3544
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Jun 4 21:51:35 2018 +0200

    Estimate the size of a managed ledger suffix from a position (#1901)
    
    This patch adds the ability to estimate the size of a suffix of a
    managed ledger by passing in a position. The size is an estimate. For
    complete ledgers after the position, the size can be calculated
    exactly, but for the first ledger, if it is partial, we can only get
    an estimate without having to make an RPC. The estimate for this
    ledger is based on the total size of that ledger and the number of
    entries in that ledger.
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  7 ++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 +++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++++++++++
 .../mledger/impl/ManagedCursorContainerTest.java   |  5 +++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 25 +++++++++++++++++++++-
 5 files changed, 59 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 186a450..584a376 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -529,6 +529,13 @@ public interface ManagedCursor {
     int getTotalNonContiguousDeletedMessagesRange();
 
     /**
+     * Returns the estimated size, in bytes, of the unacknowledged backlog for this cursor.
+     *
+     * @return the estimated size, in bytes, of the entries after the cursor's mark delete position
+     */
+    long getEstimatedSizeSinceMarkDeletePosition();
+
+    /**
      * Returns cursor throttle mark-delete rate.
      *
      * @return
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 194e8c0..898ac8b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -661,6 +661,11 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
+    public long getEstimatedSizeSinceMarkDeletePosition() {
+        return ledger.estimateBacklogFromPosition(markDeletePosition); // the ledger computes the estimate
+    }
+
+    @Override
     public long getNumberOfEntriesInBacklog() {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Consumer {} cursor ml-entries: {} -- 
deleted-counter: {} other counters: mdPos {} rdPos {}",
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1a606b7..68bc801 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -930,6 +930,24 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    /** Estimates the size, in bytes, of the managed ledger content after {@code pos} (exact for whole ledgers). */
+    long estimateBacklogFromPosition(PositionImpl pos) {
+        synchronized (this) {
+            LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+            if (ledgerInfo == null) {
+                return getTotalSize(); // position no longer in managed ledger, so return total size
+            }
+            long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                .mapToLong(LedgerInfo::getSize).sum();
+            long size = getTotalSize() - sizeBeforePosLedger;
+            // the open ledger is measured with the live counters; the null check covers "no ledger currently open"
+            if (currentLedger != null && pos.getLedgerId() == currentLedger.getId()) {
+                return size - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId());
+            }
+            return size - consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId());
+        }
+    }
+
     private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long 
consumedEntries) {
         if (ledgerEntries <= 0) {
             return 0;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index c9021ae..ecf6acf 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -278,6 +278,11 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
+        public long getEstimatedSizeSinceMarkDeletePosition() {
+            return 0L; // stub value: always reports a size of zero
+        }
+
+        @Override
         public void setThrottleMarkDelete(double throttleMarkDelete) {
         }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index da15f15..a705d15 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2684,6 +2684,29 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(readPosition, cursor.getReadPosition());
         assertEquals(markDeletePosition, cursor.getMarkDeletedPosition());
     }
-    
+
+    /** Checks the cursor's estimated backlog size before and after a mark-delete on same-sized entries. */
+    @Test
+    public void testEstimatedUnackedSize() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        ManagedLedger ledger = factory.open("my_test_ledger", config);
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        byte[] entryData = new byte[5];
+
+        // write 15 entries (more than the configured 10 entries per ledger), saving position of 5th
+        for (int i = 0; i < 4; i++) { ledger.addEntry(entryData); }
+        Position deleteAt = ledger.addEntry(entryData);
+        for (int i = 0; i < 10; i++) { ledger.addEntry(entryData); }
+
+        assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 15 * entryData.length);
+
+        cursor.markDelete(deleteAt);
+
+        // it's not an estimate if all entries are the same size
+        assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to