Jason918 commented on code in PR #15955:
URL: https://github.com/apache/pulsar/pull/15955#discussion_r937552499
##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java:
##########
@@ -204,6 +204,12 @@ public LongPair get(long key1, long key2) {
return getSection(h).get(key1, key2, (int) h);
}
+ public long getFirstValue(long key1, long key2) {
Review Comment:
Better to add some doc here. The method name is a bit confusing.
```suggestion
/**
* @return get(key1, key2).first;
*/
public long getFirstValue(long key1, long key2) {
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import
org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+ private final AtomicInteger currentSize = new AtomicInteger();
+ private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+ private final int segmentSize;
+
+ SharedCacheSegmentBufferRefCount(int segmentSize) {
+ this.segmentSize = segmentSize;
+ this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+ // We are going to often clear() the map, with the expectation
that it's going to get filled again
+ // immediately after. In these conditions it does not make
sense to shrink it each time.
+ .autoShrink(false)
+ .concurrencyLevel(Runtime.getRuntime().availableProcessors() *
2)
+ .build();
+ }
+
+ @Override
+ public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+ int newSize = currentSize.addAndGet(entry.readableBytes());
+
+ if (newSize > segmentSize) {
+ // The segment is full
Review Comment:
Should reduce `currentSize` by `entry.readableBytes()`, since this insertion
has failed.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+ private final ManagedLedgerFactoryConfig config;
+ private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+ private final List<SharedCacheSegment> segments = new ArrayList<>();
+ private int currentSegmentIdx = 0;
+ private final int segmentSize;
+ private final int segmentsCount;
+
+ private final StampedLock lock = new StampedLock();
+
+ private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+ public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+ this.config = factory.getConfig();
+ this.factoryMBean = factory.getMbean();
+ long maxCacheSize = config.getMaxCacheSize();
+ if (maxCacheSize > 0) {
+ this.segmentsCount = Math.max(2, (int) (maxCacheSize /
DEFAULT_MAX_SEGMENT_SIZE));
+ this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+ for (int i = 0; i < segmentsCount; i++) {
+ if (config.isCopyEntriesInCache()) {
+ segments.add(new
SharedCacheSegmentBufferCopy(segmentSize));
+ } else {
+ segments.add(new
SharedCacheSegmentBufferRefCount(segmentSize));
+ }
+ }
+ } else {
+ this.segmentsCount = 0;
+ this.segmentSize = 0;
+ }
+ }
+
+ ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+ return factoryMBean;
+ }
+
+ @Override
+ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+ if (getMaxSize() > 0) {
+ return new SharedEntryCacheImpl(ml, this);
+ } else {
+ return new EntryCacheDisabled(ml);
+ }
+ }
+
+ @Override
+ public void removeEntryCache(String name) {
+ // no-op
+ }
+
+ @Override
+ public long getSize() {
+ long totalSize = 0;
+ for (int i = 0; i < segmentsCount; i++) {
+ totalSize += segments.get(i).getSize();
+ }
+ return totalSize;
+ }
+
+ @Override
+ public long getMaxSize() {
+ return config.getMaxCacheSize();
+ }
+
+ @Override
+ public void clear() {
+ segments.forEach(SharedCacheSegment::clear);
+ }
+
+ @Override
+ public void close() {
+ segments.forEach(SharedCacheSegment::close);
+ }
+
+ @Override
+ public void updateCacheSizeAndThreshold(long maxSize) {
+
Review Comment:
This method should be supported when the user updates `managedLedgerCacheSizeMB`.
We can add an error log here to let the user know it is currently unsupported.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferCopy.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+
+class SharedCacheSegmentBufferCopy implements AutoCloseable,
SharedCacheSegment {
+
+ private final ByteBuf cacheBuffer;
+ private final AtomicInteger currentOffset = new AtomicInteger();
+ private final ConcurrentLongLongPairHashMap index;
+ private final int segmentSize;
+
+ private static final int ALIGN_64_MASK = ~(64 - 1);
+
+ SharedCacheSegmentBufferCopy(int segmentSize) {
+ this.segmentSize = segmentSize;
+ this.cacheBuffer = PulsarByteBufAllocator.DEFAULT.buffer(segmentSize,
segmentSize);
+ this.cacheBuffer.writerIndex(segmentSize - 1);
+ this.index = ConcurrentLongLongPairHashMap.newBuilder()
+ // We are going to often clear() the map, with the expectation
that it's going to get filled again
+ // immediately after. In these conditions it does not make
sense to shrink it each time.
+ .autoShrink(false)
+ .concurrencyLevel(Runtime.getRuntime().availableProcessors() *
8)
+ .build();
+ }
+
+ @Override
+ public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+ int entrySize = entry.readableBytes();
+ int alignedSize = align64(entrySize);
+ int offset = currentOffset.getAndAdd(alignedSize);
+
+ if (offset + entrySize > segmentSize) {
+ // The segment is full
Review Comment:
Reset `currentOffset` back to `offset` ?
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedCacheSegmentBufferRefCount.java:
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.concurrent.atomic.AtomicInteger;
+import
org.apache.pulsar.common.util.collections.ConcurrentLongPairObjectHashMap;
+
+class SharedCacheSegmentBufferRefCount implements SharedCacheSegment {
+
+ private final AtomicInteger currentSize = new AtomicInteger();
+ private final ConcurrentLongPairObjectHashMap<ByteBuf> index;
+ private final int segmentSize;
+
+ SharedCacheSegmentBufferRefCount(int segmentSize) {
+ this.segmentSize = segmentSize;
+ this.index = ConcurrentLongPairObjectHashMap.<ByteBuf>newBuilder()
+ // We are going to often clear() the map, with the expectation
that it's going to get filled again
+ // immediately after. In these conditions it does not make
sense to shrink it each time.
+ .autoShrink(false)
+ .concurrencyLevel(Runtime.getRuntime().availableProcessors() *
2)
+ .build();
+ }
+
+ @Override
+ public boolean insert(long ledgerId, long entryId, ByteBuf entry) {
+ int newSize = currentSize.addAndGet(entry.readableBytes());
+
+ if (newSize > segmentSize) {
+ // The segment is full
+ return false;
+ } else {
+ // Insert entry into read cache segment
+ ByteBuf oldValue = index.putIfAbsent(ledgerId, entryId,
entry.retain());
+ if (oldValue != null) {
+ entry.release();
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public ByteBuf get(long ledgerId, long entryId) {
+ ByteBuf entry = index.get(ledgerId, entryId);
+ if (entry != null) {
+ try {
+ return entry.retain();
+ } catch (IllegalReferenceCountException e) {
+ // Entry was removed between the get() and the retain() calls
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getSize() {
+ return currentSize.get();
+ }
+
+ @Override
+ public void close() {
+ clear();
+ }
+
+ @Override
+ public void clear() {
+ index.forEach((ledgerId, entryId, e) -> e.release());
+ index.clear();
Review Comment:
Reset `currentSize` to 0.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/SharedEntryCacheManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+@Slf4j
+public class SharedEntryCacheManagerImpl implements EntryCacheManager {
+
+ private final ManagedLedgerFactoryConfig config;
+ private final ManagedLedgerFactoryMBeanImpl factoryMBean;
+ private final List<SharedCacheSegment> segments = new ArrayList<>();
+ private int currentSegmentIdx = 0;
+ private final int segmentSize;
+ private final int segmentsCount;
+
+ private final StampedLock lock = new StampedLock();
+
+ private static final int DEFAULT_MAX_SEGMENT_SIZE = 1 * 1024 * 1024 * 1024;
+
+ public SharedEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+ this.config = factory.getConfig();
+ this.factoryMBean = factory.getMbean();
+ long maxCacheSize = config.getMaxCacheSize();
+ if (maxCacheSize > 0) {
+ this.segmentsCount = Math.max(2, (int) (maxCacheSize /
DEFAULT_MAX_SEGMENT_SIZE));
+ this.segmentSize = (int) (maxCacheSize / segmentsCount);
+
+ for (int i = 0; i < segmentsCount; i++) {
+ if (config.isCopyEntriesInCache()) {
+ segments.add(new
SharedCacheSegmentBufferCopy(segmentSize));
+ } else {
+ segments.add(new
SharedCacheSegmentBufferRefCount(segmentSize));
+ }
+ }
+ } else {
+ this.segmentsCount = 0;
+ this.segmentSize = 0;
+ }
+ }
+
+ ManagedLedgerFactoryMBeanImpl getFactoryMBean() {
+ return factoryMBean;
+ }
+
+ @Override
+ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
+ if (getMaxSize() > 0) {
+ return new SharedEntryCacheImpl(ml, this);
+ } else {
+ return new EntryCacheDisabled(ml);
+ }
+ }
+
+ @Override
+ public void removeEntryCache(String name) {
+ // no-op
+ }
+
+ @Override
+ public long getSize() {
+ long totalSize = 0;
+ for (int i = 0; i < segmentsCount; i++) {
+ totalSize += segments.get(i).getSize();
+ }
+ return totalSize;
+ }
+
+ @Override
+ public long getMaxSize() {
+ return config.getMaxCacheSize();
+ }
+
+ @Override
+ public void clear() {
+ segments.forEach(SharedCacheSegment::clear);
+ }
+
+ @Override
+ public void close() {
+ segments.forEach(SharedCacheSegment::close);
+ }
+
+ @Override
+ public void updateCacheSizeAndThreshold(long maxSize) {
+
+ }
+
+ @Override
+ public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
+ // No-Op. We don't use the cache eviction watermark in this
implementation
+ }
+
+ @Override
+ public double getCacheEvictionWatermark() {
+ return config.getCacheEvictionWatermark();
+ }
+
+ boolean insert(EntryImpl entry) {
+ int entrySize = entry.getLength();
+
+ if (entrySize > segmentSize) {
+ log.debug("entrySize {} > segmentSize {}, skip update read
cache!", entrySize, segmentSize);
+ return false;
+ }
+
+ long stamp = lock.readLock();
+ try {
+ SharedCacheSegment s = segments.get(currentSegmentIdx);
+
+ if (s.insert(entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer())) {
+ return true;
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+
+ // We could not insert in segment, we to get the write lock and
roll-over to
+ // next segment
+ stamp = lock.writeLock();
+
+ try {
+ SharedCacheSegment segment = segments.get(currentSegmentIdx);
+
+ if (segment.insert(entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer())) {
+ return true;
+ }
+
+ // Roll to next segment
+ currentSegmentIdx = (currentSegmentIdx + 1) % segmentsCount;
+ segment = segments.get(currentSegmentIdx);
+ segment.clear();
+ return segment.insert(entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer());
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ EntryImpl get(long ledgerId, long entryId) {
+ long stamp = lock.readLock();
+
+ try {
+ // We need to check all the segments, starting from the current
one and looking
+ // backward to minimize the checks for recently inserted entries
+ for (int i = 0; i < segmentsCount; i++) {
+ int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) %
segmentsCount;
+
+ ByteBuf res = segments.get(segmentIdx).get(ledgerId, entryId);
+ if (res != null) {
+ return EntryImpl.create(ledgerId, entryId, res);
+ }
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+
+ return null;
+ }
+
+ long getRange(long ledgerId, long firstEntryId, long lastEntryId,
List<Entry> results) {
+ long totalSize = 0;
+ long stamp = lock.readLock();
+
+ try {
+ // We need to check all the segments, starting from the current
one and looking
+ // backward to minimize the checks for recently inserted entries
+ long entryId = firstEntryId;
+ for (int i = 0; i < segmentsCount; i++) {
+ int segmentIdx = (currentSegmentIdx + (segmentsCount - i)) %
segmentsCount;
+ SharedCacheSegment s = segments.get(segmentIdx);
+
+ for (; entryId <= lastEntryId; entryId++) {
+ ByteBuf res = s.get(ledgerId, entryId);
+ if (res != null) {
+ results.add(EntryImpl.create(ledgerId, entryId, res));
+ totalSize += res.readableBytes();
+ } else {
+ break;
+ }
Review Comment:
> Is it possible that the cache of subsequent entries is still in this segment?
After we get null from this segment, we will move to the next segment.
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]