merlimat commented on code in PR #24623: URL: https://github.com/apache/pulsar/pull/24623#discussion_r2306184644
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java: ########## @@ -150,12 +154,43 @@ <R> R withWriteLock(Function<RangeCacheEntryWrapper, R> function) { } } + <R> R withOptimisticReadLock(Function<RangeCacheEntryWrapper, R> function) { Review Comment: This seems to be unused. Nit: we should also make clear that the passed function can be invoked > 1 time. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReferenceCountedEntry.java: ########## @@ -24,5 +24,13 @@ * An Entry that is also reference counted. */ public interface ReferenceCountedEntry extends Entry, ReferenceCounted { + EntryReadCountHandler getReadCountHandler(); + default boolean hasExpectedReads() { Review Comment: couldn't this method be encapsulated into the `EntryReadCountHandler` interface? We could still call it like : ```java entry.getReadCountHandler().hasExpectedReads(); ``` ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; + +public class EntryReadCountHandlerImpl implements EntryReadCountHandler { + private static final AtomicIntegerFieldUpdater<EntryReadCountHandlerImpl> expectedReadCountUpdater = Review Comment: nit: should this be `EXPECTED_READ_COUNT_UPDATER`? ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java: ########## @@ -67,6 +67,17 @@ public interface Entry { */ boolean release(); + /** + * Managed Ledger implementations of EntryImpl should implement this method to return the read count handler + * associated with the entry. + * This handler is used to track how many times the entry has been read and to manage + * the eviction of entries from the broker cache based on their expected read count. + * @return + */ + default EntryReadCountHandler getReadCountHandler() { Review Comment: This seems to be defined in the `ReferenceCountedEntry` interface as well. Are these 2 interfaces not part of the same hierarchy? ########## pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java: ########## @@ -446,17 +446,21 @@ private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAdd .thenCompose(ch -> channelInitializerHandler.initializeClientCnx(ch, logicalAddress, unresolvedPhysicalAddress)) - .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); + .thenCompose(channel -> connectToPhysicalAddress(channel, physicalAddress)); Review Comment: Is this change strictly needed for pip-430, or could it be moved to a separate PR? 
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java: ########## @@ -150,12 +154,43 @@ <R> R withWriteLock(Function<RangeCacheEntryWrapper, R> function) { } } + <R> R withOptimisticReadLock(Function<RangeCacheEntryWrapper, R> function) { + long stamp = lock.tryOptimisticRead(); + R result = function.apply(this); + if (!lock.validate(stamp)) { + stamp = lock.readLock(); + try { + result = function.apply(this); + } finally { + lock.unlockRead(stamp); + } + } + return result; + } + + <R> R withReadLock(Function<RangeCacheEntryWrapper, R> function) { Review Comment: This too seems not to be used. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryReadCountHandlerImpl.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.bookkeeper.mledger.EntryReadCountHandler; + +public class EntryReadCountHandlerImpl implements EntryReadCountHandler { + private static final AtomicIntegerFieldUpdater<EntryReadCountHandlerImpl> expectedReadCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(EntryReadCountHandlerImpl.class, "expectedReadCount"); + + private volatile int expectedReadCount; + + private EntryReadCountHandlerImpl(int expectedReadCount) { + this.expectedReadCount = expectedReadCount; + } + + public int getExpectedReadCount() { + return expectedReadCount; + } + + @Override + public void incrementExpectedReadCount() { + expectedReadCountUpdater.incrementAndGet(this); + } + + @Override + public void markRead() { + expectedReadCountUpdater.decrementAndGet(this); + } + + /** + * Creates an instance of EntryReadCountHandlerImpl if the expected read count is greater than 0. + * If the expected read count is 0 or less, it returns null. + * @param expectedReadCount the expected read count for the entry + * @return an instance of EntryReadCountHandlerImpl or null + */ + public static EntryReadCountHandlerImpl maybeCreate(int expectedReadCount) { + return expectedReadCount > 0 ? new EntryReadCountHandlerImpl(expectedReadCount) : null; + } + + @VisibleForTesting + public void setExpectedReadCount(int expectedReadCount) { + this.expectedReadCount = expectedReadCount; + } + + @Override + public String toString() { Review Comment: maybe use `@ToString()` on the class? ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ActiveManagedCursorContainerImpl.java: ########## @@ -0,0 +1,836 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.StampedLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Implementation of {@link ActiveManagedCursorContainer} that tracks active cursors for cache eviction purposes. + * This implementation is optimized for the use with cacheEvictionByExpectedReadCount. It doesn't implement + * the {@link #cursorUpdated(ManagedCursor, Position)} method, as it is not needed for this use case. Cursors + * are updated using the {@link #updateCursor(ManagedCursor, Position)} method instead. 
This allows lazy updates + * to track the ordering of cursors and their positions without the need to immediately reorder the list of cursors + * on every cursor update. The cacheEvictionByExpectedReadCount use case will only need to know the number of cursors + * and to be able to know how many cursors are at the same position or before a given position when a backlogged read + * is performed. When cursors are all performing tailing reads, the ordering of cursors is not important and can be + * updated lazily when needed. + */ +@Slf4j +public class ActiveManagedCursorContainerImpl implements ActiveManagedCursorContainer { + private static class Node { + final ManagedCursor cursor; + Position position; + Position pendingPosition; + boolean pendingRemove = false; + MutableInt numberOfCursorsAtSamePositionOrBefore; Review Comment: Couldn't this be just an `int`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org