eolivelli commented on code in PR #17241:
URL: https://github.com/apache/pulsar/pull/17241#discussion_r956144230
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java:
##########
@@ -256,6 +276,130 @@ public void asyncReadEntry(ReadHandle lh, long
firstEntry, long lastEntry, boole
}
}
+ @AllArgsConstructor
+ private static final class ReadEntriesCallbackWithContext {
+ final ReadEntriesCallback callback;
+ final Object ctx;
+ final long startEntry;
+ final long endEntry;
+ }
+
+ private CachedPendingRead findBestCandidate(PendingReadKey key,
Map<PendingReadKey, CachedPendingRead> ledgerCache,
+ AtomicBoolean created) {
+ synchronized (ledgerCache) {
+ CachedPendingRead existing = ledgerCache.get(key);
+ if (existing != null) {
+ return existing;
+ }
+ for (Map.Entry<PendingReadKey, CachedPendingRead> entry :
ledgerCache.entrySet()) {
+ if (entry.getKey().includes(key)) {
+ return entry.getValue();
+ }
+ }
+ created.set(true);
+ CachedPendingRead newRead = new CachedPendingRead(key,
ledgerCache);
+ ledgerCache.put(key, newRead);
+ return newRead;
+ }
+ }
+
+ private class CachedPendingRead {
+ final PendingReadKey key;
+ final Map<PendingReadKey, CachedPendingRead> ledgerCache;
+ final List<ReadEntriesCallbackWithContext> callbacks = new
ArrayList<>(1);
+ boolean completed = false;
+
+ public CachedPendingRead(PendingReadKey key,
+ Map<PendingReadKey, CachedPendingRead>
ledgerCache) {
+ this.key = key;
+ this.ledgerCache = ledgerCache;
+ }
+
+ private List<EntryImpl> keepEntries(List<EntryImpl> list, long
startEntry, long endEntry) {
+ List<EntryImpl> result = new ArrayList<>((int) (endEntry -
startEntry));
+ for (EntryImpl entry : list) {
+ long entryId = entry.getEntryId();
+ if (startEntry <= entryId && entryId <= endEntry) {
+ result.add(entry);
+ } else {
+ entry.release();
+ }
+ }
+ return result;
+ }
+
+ public void attach(CompletableFuture<List<EntryImpl>> handle) {
+ // when the future is done remove this from the map
+ // new reads will go to a new instance
+ // this is required because we are going to do refcount management
+ // on the results of the callback
+ handle.whenComplete((___, error) -> {
+ synchronized (CachedPendingRead.this) {
+ completed = true;
+ synchronized (ledgerCache) {
+ ledgerCache.remove(key, this);
+ }
+ }
+ });
+
+ handle.thenAcceptAsync(entriesToReturn -> {
+ synchronized (CachedPendingRead.this) {
+ if (callbacks.size() == 1) {
+ ReadEntriesCallbackWithContext first =
callbacks.get(0);
+ if (first.startEntry == key.startEntry
+ && first.endEntry == key.endEntry) {
+ // perfect match, no copy, this is the most common
case
+ first.callback.readEntriesComplete((List)
entriesToReturn,
+ first.ctx);
+ } else {
+ first.callback.readEntriesComplete(
+ (List) keepEntries(entriesToReturn,
first.startEntry, first.endEntry),
+ first.ctx);
+ }
+ } else {
+ for (ReadEntriesCallbackWithContext callback :
callbacks) {
+ List<EntryImpl> copy = new
ArrayList<>(entriesToReturn.size());
+ long callbackStartEntry = callback.startEntry;
+ long callbackEndEntry = callback.endEntry;
+ for (EntryImpl entry : entriesToReturn) {
+ long entryId = entry.getEntryId();
+ if (callbackStartEntry <= entryId && entryId
<= callbackEndEntry) {
+ EntryImpl entryCopy =
EntryImpl.create(entry);
+ copy.add(entryCopy);
+ }
+ }
+ callback.callback.readEntriesComplete((List) copy,
callback.ctx);
+ }
+ for (EntryImpl entry : entriesToReturn) {
+ entry.release();
+ }
+ }
+ }
+ },
ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+ synchronized (CachedPendingRead.this) {
+ for (ReadEntriesCallbackWithContext callback : callbacks) {
+ if (exception instanceof BKException
Review Comment:
sure, thanks. previous code had a "invalidateLedger" that has been moved
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]