codelipenghui commented on code in PR #15955: URL: https://github.com/apache/pulsar/pull/15955#discussion_r931762130
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheImpl.java: ########## @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.commons.lang3.tuple.Pair; + +@Slf4j +class SharedEntryCacheImpl implements 
EntryCache { + + private final SharedEntryCacheManagerImpl entryCacheManager; + private final ManagedLedgerImpl ml; + private final ManagedLedgerInterceptor interceptor; + + SharedEntryCacheImpl(ManagedLedgerImpl ml, SharedEntryCacheManagerImpl entryCacheManager) { + this.ml = ml; + this.entryCacheManager = entryCacheManager; + this.interceptor = ml.getManagedLedgerInterceptor(); + } + + @Override + public String getName() { + return ml.getName(); + } + + @Override + public boolean insert(EntryImpl entry) { + return entryCacheManager.insert(entry); + } + + @Override + public void invalidateEntries(PositionImpl lastPosition) { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void invalidateEntriesBeforeTimestamp(long timestamp) { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void invalidateAllEntries(long ledgerId) { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void clear() { + // No-Op. 
The cache invalidation is based only on rotating the segment buffers + } + + private static final Pair<Integer, Long> NO_EVICTION = Pair.of(0, 0L); + + @Override + public Pair<Integer, Long> evictEntries(long sizeToFree) { + return NO_EVICTION; + } + + @Override + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + final long ledgerId = lh.getId(); + final int entriesToRead = (int) (lastEntry - firstEntry) + 1; + + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry); + } + + List<Entry> cachedEntries = new ArrayList<>(entriesToRead); + long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries); + + if (cachedEntries.size() == entriesToRead) { + // All entries found in cache + entryCacheManager.getFactoryMBean().recordCacheHits(entriesToRead, totalCachedSize); + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry, + lastEntry); + } + + callback.readEntriesComplete(cachedEntries, ctx); + + } else { + if (!cachedEntries.isEmpty()) { + cachedEntries.forEach(entry -> entry.release()); + } Review Comment: It looks like it would be safe to return the portion of the data that is already in the cache? I'm not sure if I missed something, but discarding a partial cache hit is a bit of a waste of resources. The old implementation also works this way, so we could improve this part in a separate PR if possible. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java: ########## @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.StampedLock; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; + +@Slf4j +public class SharedEntryCacheManagerImpl implements EntryCacheManager { + + private final ManagedLedgerFactoryConfig config; + private final ManagedLedgerFactoryMBeanImpl factoryMBean; + private final List<SharedCacheSegment> segments = new ArrayList<>(); + private int currentSegmentIdx = 0; + private final int segmentSize; + private final int segmentsCount; + + private final StampedLock lock = new StampedLock(); + + private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024; + + public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) { + this.config = factory.getConfig(); + this.factoryMBean = factory.getMbean(); + long maxCacheSize = config.getMaxCacheSize(); + if (maxCacheSize > 0) { + this.segmentsCount = Math.max(2, (int) (maxCacheSize / 
DEFAULT_MAX_SEGMENT_SIZE)); + this.segmentSize = (int) (maxCacheSize / segmentsCount); + + for (int i = 0; i < segmentsCount; i++) { + if (config.isCopyEntriesInCache()) { + segments.add(new SharedCacheSegmentBufferCopy(segmentSize)); + } else { + segments.add(new SharedCacheSegmentBufferRefCount(segmentSize)); + } + } + } else { + this.segmentsCount = 0; + this.segmentSize = 0; + } + } + + ManagedLedgerFactoryMBeanImpl getFactoryMBean() { + return factoryMBean; + } + + @Override + public EntryCache getEntryCache(ManagedLedgerImpl ml) { + if (getMaxSize() > 0) { + return new SharedEntryCacheImpl(ml, this); + } else { + return new EntryCacheDisabled(ml); + } + } + + @Override + public void removeEntryCache(String name) { + // no-op + } + + @Override + public long getSize() { + long totalSize = 0; + for (int i = 0; i < segmentsCount; i++) { + totalSize += segments.get(i).getSize(); + } + return totalSize; + } + + @Override + public long getMaxSize() { + return config.getMaxCacheSize(); + } + + @Override + public void clear() { + segments.forEach(SharedCacheSegment::clear); + } + + @Override + public void close() { + segments.forEach(SharedCacheSegment::close); + } + + @Override + public void updateCacheSizeAndThreshold(long maxSize) { + + } + + @Override + public void updateCacheEvictionWatermark(double cacheEvictionWatermark) { + // No-Op. 
We don't use the cache eviction watermark in this implementation + } + + @Override + public double getCacheEvictionWatermark() { + return config.getCacheEvictionWatermark(); + } + + boolean insert(EntryImpl entry) { + int entrySize = entry.getLength(); + + if (entrySize > segmentSize) { + log.debug("entrySize {} > segmentSize {}, skip update read cache!", entrySize, segmentSize); + return false; + } + + long stamp = lock.readLock(); + try { + SharedCacheSegment s = segments.get(currentSegmentIdx); + + if (s.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) { + return true; + } + } finally { + lock.unlockRead(stamp); + } + + // We could not insert in segment, we to get the write lock and roll-over to + // next segment + stamp = lock.writeLock(); + + try { + SharedCacheSegment segment = segments.get(currentSegmentIdx); + + if (segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer())) { + return true; + } + + // Roll to next segment + currentSegmentIdx = (currentSegmentIdx + 1) % segmentsCount; + segment = segments.get(currentSegmentIdx); + segment.clear(); + return segment.insert(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer()); + } finally { + lock.unlockWrite(stamp); + } + } + + EntryImpl get(long ledgerId, long entryId) { + long stamp = lock.readLock(); + + try { + // We need to check all the segments, starting from the current one and looking + // backward to minimize the checks for recently inserted entries + for (int i = 0; i < segmentsCount; i++) { + int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount; + + ByteBuf res = segments.get(segmentIdx).get(ledgerId, entryId); + if (res != null) { + return EntryImpl.create(ledgerId, entryId, res); + } + } + } finally { + lock.unlockRead(stamp); + } + + return null; + } + + long getRange(long ledgerId, long firstEntryId, long lastEntryId, List<Entry> results) { + long totalSize = 0; + long stamp = lock.readLock(); + + try { + // We need to 
check all the segments, starting from the current one and looking + // backward to minimize the checks for recently inserted entries + long entryId = firstEntryId; + for (int i = 0; i < segmentsCount; i++) { + int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) % segmentsCount; + SharedCacheSegment s = segments.get(segmentIdx); + + for (; entryId <= lastEntryId; entryId++) { + ByteBuf res = s.get(ledgerId, entryId); + if (res != null) { + results.add(EntryImpl.create(ledgerId, entryId, res)); + totalSize += res.readableBytes(); + } else { + break; + } Review Comment: Is it possible that subsequent entries are still cached in this segment? After we get null from this segment, we move on to the next segment and never come back to this one for the remaining entries. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheImpl.java: ########## @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.commons.lang3.tuple.Pair; + +@Slf4j +class SharedEntryCacheImpl implements EntryCache { + + private final SharedEntryCacheManagerImpl entryCacheManager; + private final ManagedLedgerImpl ml; + private final ManagedLedgerInterceptor interceptor; + + SharedEntryCacheImpl(ManagedLedgerImpl ml, SharedEntryCacheManagerImpl entryCacheManager) { + this.ml = ml; + this.entryCacheManager = entryCacheManager; + this.interceptor = ml.getManagedLedgerInterceptor(); + } + + @Override + public String getName() { + return ml.getName(); + } + + @Override + public boolean insert(EntryImpl entry) { + return entryCacheManager.insert(entry); + } + + @Override + public void invalidateEntries(PositionImpl lastPosition) { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void invalidateEntriesBeforeTimestamp(long timestamp) { + // No-Op. 
The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void invalidateAllEntries(long ledgerId) { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + @Override + public void clear() { + // No-Op. The cache invalidation is based only on rotating the segment buffers + } + + private static final Pair<Integer, Long> NO_EVICTION = Pair.of(0, 0L); + + @Override + public Pair<Integer, Long> evictEntries(long sizeToFree) { + return NO_EVICTION; + } + + @Override + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + final long ledgerId = lh.getId(); + final int entriesToRead = (int) (lastEntry - firstEntry) + 1; + + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entries range ledger {}: {} to {}", ml.getName(), ledgerId, firstEntry, lastEntry); + } + + List<Entry> cachedEntries = new ArrayList<>(entriesToRead); + long totalCachedSize = entryCacheManager.getRange(ledgerId, firstEntry, lastEntry, cachedEntries); + + if (cachedEntries.size() == entriesToRead) { + // All entries found in cache + entryCacheManager.getFactoryMBean().recordCacheHits(entriesToRead, totalCachedSize); + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", ml.getName(), ledgerId, firstEntry, + lastEntry); + } + + callback.readEntriesComplete(cachedEntries, ctx); + + } else { + if (!cachedEntries.isEmpty()) { + cachedEntries.forEach(entry -> entry.release()); + } + + // Read all the entries from bookkeeper + lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( + ledgerEntries -> { + checkNotNull(ml.getName()); + checkNotNull(ml.getExecutor()); + + try { + // We got the entries, we need to transform them to a List<> type + long totalSize = 0; + final List<Entry> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); + for (LedgerEntry e : ledgerEntries) { + 
EntryImpl entry = EntryCacheManager.create(e, interceptor); + + entriesToReturn.add(entry); + totalSize += entry.getLength(); + } + + entryCacheManager.getFactoryMBean().recordCacheMiss(entriesToReturn.size(), totalSize); + ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); + + callback.readEntriesComplete(entriesToReturn, ctx); + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { + if (exception instanceof BKException + && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + } else { + ml.invalidateLedgerHandle(lh); + ManagedLedgerException mlException = createManagedLedgerException(exception); + callback.readEntriesFailed(mlException, ctx); + } + return null; + }); + } + } + + @Override + public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, + Object ctx) { + try { + asyncReadEntry0(lh, position, callback, ctx); + } catch (Throwable t) { + log.warn("[{}] Failed to read entries for {}-{}", getName(), lh.getId(), position, t); + callback.readEntryFailed(createManagedLedgerException(t), ctx); + } + } + + private void asyncReadEntry0(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, + Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId()); + } + + EntryImpl cachedEntry = entryCacheManager.get(position.getLedgerId(), position.getEntryId()); + + if (cachedEntry != null) { + entryCacheManager.getFactoryMBean().recordCacheHit(cachedEntry.getLength()); + callback.readEntryComplete(cachedEntry, ctx); + } else { + lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync( + ledgerEntries -> { + try { + Iterator<LedgerEntry> iterator = ledgerEntries.iterator(); + if (iterator.hasNext()) { + 
LedgerEntry ledgerEntry = iterator.next(); + EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor); + + entryCacheManager.getFactoryMBean().recordCacheMiss(1, returnEntry.getLength()); + ml.getMbean().addReadEntriesSample(1, returnEntry.getLength()); + callback.readEntryComplete(returnEntry, ctx); + } else { + // got an empty sequence + callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), + ctx); + } + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { + ml.invalidateLedgerHandle(lh); + callback.readEntryFailed(createManagedLedgerException(exception), ctx); + return null; + }); + } + } + + @Override + public long getSize() { + return 0; Review Comment: It would be better to add a comment here explaining that we return 0 to avoid triggering cache eviction, and that topic-level cache size metrics are not exposed because this implementation shares the cache across all topics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
