Author: fpj Date: Sat Jul 13 15:52:29 2013 New Revision: 1502806 URL: http://svn.apache.org/r1502806 Log: BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1502806&r1=1502805&r2=1502806&view=diff ============================================================================== --- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original) +++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Sat Jul 13 15:52:29 2013 @@ -44,6 +44,8 @@ Release 4.2.2 - Unreleased BOOKKEEPER-579: TestSubAfterCloseSub was put in a wrong package (sijie via ivank) + BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj) + hedwig-client: BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via ivank) Modified: zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1502806&r1=1502805&r2=1502806&view=diff ============================================================================== --- zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original) +++ zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Sat Jul 13 15:52:29 2013 @@ -17,11 +17,9 @@ */ package org.apache.hedwig.server.persistence; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; -import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.SortedMap; @@ -30,33 +28,28 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; +import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.hedwig.exceptions.PubSubException; import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException; +import org.apache.hedwig.protocol.PubSubProtocol; import org.apache.hedwig.protocol.PubSubProtocol.Message; import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; import org.apache.hedwig.protoextensions.MessageIdUtils; import org.apache.hedwig.server.common.ServerConfiguration; -import org.apache.hedwig.server.common.UnexpectedError; import org.apache.hedwig.server.jmx.HedwigJMXService; import org.apache.hedwig.server.jmx.HedwigMBeanInfo; import org.apache.hedwig.server.jmx.HedwigMBeanRegistry; -import org.apache.hedwig.server.persistence.ReadAheadCacheBean; import org.apache.hedwig.util.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; public class ReadAheadCache implements PersistenceManager, HedwigJMXService { @@ -186,10 +179,12 @@ public class ReadAheadCache implements P * the real persistence manager. */ + @Override public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) { return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount); } + @Override public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException { return realPersistenceManager.getCurrentSeqIdForTopic(topic); } @@ -203,6 +198,7 @@ public class ReadAheadCache implements P * our callback on the return path * */ + @Override public void persistMessage(PersistRequest request) { // make a new PersistRequest object so that we can insert our own // callback in the middle. Assign the original request as the context @@ -225,6 +221,7 @@ public class ReadAheadCache implements P * In case there is a failure in persisting, just pass it to the * original callback */ + @Override public void operationFailed(Object ctx, PubSubException exception) { PersistRequest originalRequest = (PersistRequest) ctx; Callback<PubSubProtocol.MessageSeqId> originalCallback = originalRequest.getCallback(); @@ -237,6 +234,7 @@ public class ReadAheadCache implements P * success, and then opportunistically treat the message as if it just * came in through a scan */ + @Override public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) { PersistRequest originalRequest = (PersistRequest) ctx; @@ -283,6 +281,7 @@ public class ReadAheadCache implements P * 2. Scan - Since the scan needs to touch the cache, we will just enqueue * the scan request and let the cache maintainer thread handle it. */ + @Override public void scanSingleMessage(ScanRequest request) { // Let the scan requests be serialized through the queue enqueueWithoutFailureByTopic(request.getTopic(), @@ -295,6 +294,7 @@ public class ReadAheadCache implements P * 3. Enqueue the request so that the cache maintainer thread can delete all * message-ids older than the one specified */ + @Override public void deliveredUntil(ByteString topic, Long seqId) { enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId)); } @@ -308,18 +308,22 @@ public class ReadAheadCache implements P * get aged out of the cache eventually. For now, there is no need to * proactively remove those entries from the cache. */ + @Override public void consumedUntil(ByteString topic, Long seqId) { realPersistenceManager.consumedUntil(topic, seqId); } + @Override public void setMessageBound(ByteString topic, Integer bound) { realPersistenceManager.setMessageBound(topic, bound); } + @Override public void clearMessageBound(ByteString topic) { realPersistenceManager.clearMessageBound(topic); } + @Override public void consumeToBound(ByteString topic) { realPersistenceManager.consumeToBound(topic); } @@ -327,6 +331,7 @@ public class ReadAheadCache implements P /** * Stop the readahead cache. */ + @Override public void stop() { try { keepRunning = false; @@ -439,6 +444,7 @@ public class ReadAheadCache implements P this.topic = topic; } + @Override public void messageScanned(Object ctx, Message message) { // Any message we read is potentially useful for us, so lets first @@ -473,10 +479,12 @@ public class ReadAheadCache implements P } + @Override public void scanFailed(Object ctx, Exception exception) { enqueueDeleteOfRemainingStubs(exception); } + @Override public void scanFinished(Object ctx, ReasonForFinish reason) { // If the scan finished because no more messages are present, its ok // to leave the stubs in place because they will get filled in as @@ -501,6 +509,7 @@ public class ReadAheadCache implements P protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> { protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory(); + @Override public Set<CacheKey> newInstance() { return new HashSet<CacheKey>(); } @@ -509,6 +518,7 @@ public class ReadAheadCache implements P protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> { protected final static TreeSetLongFactory instance = new TreeSetLongFactory(); + @Override public SortedSet<Long> newInstance() { return new TreeSet<Long>(); } @@ -527,7 +537,6 @@ public class ReadAheadCache implements P logger.debug("Adding msg {} to readahead cache", cacheKey); CacheValue cacheValue; - if ((cacheValue = cache.get(cacheKey)) == null) { cacheValue = new CacheValue(); CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue); @@ -539,11 +548,13 @@ public class ReadAheadCache implements P } CacheSegment segment = cacheSegment.get(); - int size = message.getBody().size(); + if (cacheValue.isStub()) { // update cache size only when cache value is a stub + int size = message.getBody().size(); - // update the cache size - segment.presentSegmentSize.addAndGet(size); - presentCacheSize.addAndGet(size); + // update the cache size + segment.presentSegmentSize.addAndGet(size); + presentCacheSize.addAndGet(size); + } synchronized (cacheValue) { // finally add the message to the cache @@ -663,6 +674,7 @@ public class ReadAheadCache implements P * on the callbacks registered for that stub, and delete the entry from * the cache */ + @Override public void performRequest() { removeMessageFromCache(cacheKey, exception, // maintainTimeIndex= @@ -696,6 +708,7 @@ public class ReadAheadCache implements P this.request = request; } + @Override public void performRequest() { // cancel scan request cancelScanRequest(request.getScanRequest()); @@ -732,6 +745,7 @@ public class ReadAheadCache implements P this.message = message; } + @Override public void performRequest() { addMessageToCache(cacheKey, message, MathUtils.now()); } @@ -747,6 +761,7 @@ public class ReadAheadCache implements P this.seqId = seqId; } + @Override public void performRequest() { SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic); if (orderedSeqIds == null) { @@ -795,6 +810,7 @@ public class ReadAheadCache implements P * underlying persistence manager. */ + @Override public void performRequest() { RangeScanRequest readAheadRequest = doReadAhead(request); @@ -805,7 +821,7 @@ public class ReadAheadCache implements P CacheValue cacheValue = cache.get(cacheKey); if (null == cacheValue) { logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey); - // reissue the request + // reissue the request scanSingleMessage(request); return; }