This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new cfdf6fc Estimate the size of a managed ledger suffix from a position (#1901) cfdf6fc is described below commit cfdf6fcbb96eea255a2f49de182411e6f8aa3544 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon Jun 4 21:51:35 2018 +0200 Estimate the size of a managed ledger suffix from a position (#1901) This patch adds the ability to estimate the size of a suffix of a managed ledger, given the position at which that suffix starts. The size is an estimate. For complete ledgers after the position, the size can be calculated exactly, but for the first ledger, if the position falls partway through it, we can only estimate the remaining size unless we make an RPC. The estimate for this ledger is based on the total size of that ledger and the number of entries in that ledger. --- .../apache/bookkeeper/mledger/ManagedCursor.java | 7 ++++++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 +++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++++++++++ .../mledger/impl/ManagedCursorContainerTest.java | 5 +++++ .../bookkeeper/mledger/impl/ManagedCursorTest.java | 25 +++++++++++++++++++++- 5 files changed, 59 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 186a450..584a376 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -529,6 +529,13 @@ public interface ManagedCursor { int getTotalNonContiguousDeletedMessagesRange(); /** + * Returns the estimated size of the unacknowledged backlog for this cursor + * + * @return the estimated size from the mark delete position of the cursor + */ + long getEstimatedSizeSinceMarkDeletePosition(); + + /** * Returns cursor throttle mark-delete rate. 
* * @return diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 194e8c0..898ac8b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -661,6 +661,11 @@ public class ManagedCursorImpl implements ManagedCursor { } @Override + public long getEstimatedSizeSinceMarkDeletePosition() { + return ledger.estimateBacklogFromPosition(markDeletePosition); + } + + @Override public long getNumberOfEntriesInBacklog() { if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}", diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1a606b7..68bc801 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -930,6 +930,24 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } + long estimateBacklogFromPosition(PositionImpl pos) { + synchronized (this) { + LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId()); + if (ledgerInfo == null) { + return getTotalSize(); // position no longer in managed ledger, so return total size + } + long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId()) + .mapToLong(li -> li.getSize()).sum(); + long size = getTotalSize() - sizeBeforePosLedger; + + if (pos.getLedgerId() == currentLedger.getId()) { + return size - consumedLedgerSize(currentLedgerSize, currentLedgerEntries, pos.getEntryId()); + } else { + return size - 
consumedLedgerSize(ledgerInfo.getSize(), ledgerInfo.getEntries(), pos.getEntryId()); + } + } + } + private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consumedEntries) { if (ledgerEntries <= 0) { return 0; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index c9021ae..ecf6acf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -278,6 +278,11 @@ public class ManagedCursorContainerTest { } @Override + public long getEstimatedSizeSinceMarkDeletePosition() { + return 0L; + } + + @Override public void setThrottleMarkDelete(double throttleMarkDelete) { } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index da15f15..a705d15 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2684,6 +2684,29 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(readPosition, cursor.getReadPosition()); assertEquals(markDeletePosition, cursor.getMarkDeletedPosition()); } - + + @Test + public void testEstimatedUnackedSize() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + + ManagedCursor cursor = ledger.openCursor("c1"); + + byte[] entryData = new byte[5]; + + // write 15 entries, saving position of 5th + for (int i = 0; i < 4; i++) { ledger.addEntry(entryData); } + Position deleteAt = 
ledger.addEntry(entryData); + for (int i = 0; i < 10; i++) { ledger.addEntry(entryData); } + + assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 15 * entryData.length); + + cursor.markDelete(deleteAt); + + // it's not an estimate if all entries are the same size + assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } -- To stop receiving notification emails like this one, please contact si...@apache.org.