ivankelly commented on a change in pull request #832: Issue 620: Close the 
fileChannels for read when they are idle
URL: https://github.com/apache/bookkeeper/pull/832#discussion_r166325050
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
 ##########
 @@ -330,54 +333,216 @@ private int readFromLogChannel(long entryLogId, 
BufferedReadChannel channel, Byt
      * A thread-local variable that wraps a mapping of log ids to 
bufferedchannels
      * These channels should be used only for reading. logChannel is the one
      * that is used for writes.
+     * We use this Guava cache to store the BufferedReadChannel.
+     * When the BufferedReadChannel is removed, the underlying fileChannel's 
refCnt decrease 1,
+     * temporally use 1h to relax replace after reading.
      */
-    private final ThreadLocal<Map<Long, BufferedReadChannel>> logid2Channel =
-            new ThreadLocal<Map<Long, BufferedReadChannel>>() {
+    private final ThreadLocal<Cache<Long, BufferedReadChannel>> logid2Channel =
+            new ThreadLocal<Cache<Long, BufferedReadChannel>>() {
         @Override
-        public Map<Long, BufferedReadChannel> initialValue() {
+        public Cache<Long, BufferedReadChannel> initialValue() {
             // Since this is thread local there only one modifier
             // We dont really need the concurrency, but we need to use
             // the weak values. Therefore using the concurrency level of 1
-            return new MapMaker().concurrencyLevel(1)
-                .weakValues()
-                .makeMap();
+            return CacheBuilder.newBuilder().concurrencyLevel(1)
+                    .expireAfterAccess(readChannelCacheExpireTimeMs, 
TimeUnit.MILLISECONDS)
+                    //decrease the refCnt
+                    .removalListener(removal -> logid2FileChannel.get((Long) 
removal.getKey()).release())
+                    .build(readChannelLoader);
+        }
+    };
+
+    @VisibleForTesting
+    long getReadChannelCacheExpireTimeMs() {
+        return readChannelCacheExpireTimeMs;
+    }
+
+    @VisibleForTesting
+    CacheLoader<Long, BufferedReadChannel> getReadChannelLoader() {
+        return readChannelLoader;
+    }
+
+    private final  CacheLoader<Long, BufferedReadChannel> readChannelLoader =
+            new CacheLoader<Long, BufferedReadChannel> () {
+        public BufferedReadChannel load(Long entryLogId) throws Exception {
+
+            return getChannelForLogId(entryLogId);
+
         }
     };
 
     /**
-     * Each thread local buffered read channel can share the same file handle 
because reads are not relative
-     * and don't cause a change in the channel's position. We use this map to 
store the file channels. Each
-     * file channel is mapped to a log id which represents an open log file.
+     * FileChannelBackingCache used to cache RefCntFileChannels for read.
+     * In order to avoid get released file, adopt design of 
FileInfoBackingCache.
+     * @see FileInfoBackingCache
      */
-    private final ConcurrentMap<Long, FileChannel> logid2FileChannel = new 
ConcurrentHashMap<Long, FileChannel>();
+    class FileChannelBackingCache {
+        static final int DEAD_REF = -0xdead;
+
+        final ConcurrentHashMap<Long, CachedFileChannel> fileChannels = new 
ConcurrentHashMap<>();
+
+        CachedFileChannel loadFileChannel(long logId) throws IOException {
+            CachedFileChannel cachedFileChannel = fileChannels.get(logId);
+            if (cachedFileChannel != null) {
+                boolean retained = cachedFileChannel.tryRetain();
+                assert(retained);
+                return cachedFileChannel;
+            }
+            File file = findFile(logId);
+            // get channel is used to open an existing entry log file
+            // it would be better to open using read mode
+            FileChannel newFc = new RandomAccessFile(file, "r").getChannel();
+            cachedFileChannel = new CachedFileChannel(logId, newFc);
+            fileChannels.put(logId, cachedFileChannel);
+            boolean retained = cachedFileChannel.tryRetain();
+            assert(retained);
+            return cachedFileChannel;
+        }
+
+        /**
+         * close FileChannel and remove from cache when possible.
+         * @param logId
+         * @param fc
+         */
+        private void releaseFileChannel(long logId, CachedFileChannel fc) {
+            if (fc.markDead()) {
+                try {
+                    fc.fc.close();
+                } catch (IOException e) {
+                    LOG.warn("Exception occurred in 
ReferenceCountedFileChannel"
+                            + " while closing channel for log file: {}", fc);
+                } finally {
+                    IOUtils.close(LOG, fc.fc);
+                }
+                fileChannels.remove(logId);
+            }
+        }
+
+        /**
+         * Remove all entries for this log file in each thread's cache.
+         * @param logId
+         */
+        public void removeFromChannelsAndClose(long logId) {
+            //remove the fileChannel from FileChannelBackingCache and close it
+            CachedFileChannel fileChannel = fileChannels.remove(logId);
+            fileChannel.release();
+            try {
+                fileChannel.fc.close();
+            } catch (IOException e) {
+                LOG.warn("Exception occurred in CachedFileChannel"
+                        + " while closing channel for log file: {}", logId);
+            } finally {
+                IOUtils.close(LOG, fileChannel.fc);
+            }
+        }
+
+        void closeAllFileChannels() throws IOException {
+            for (Map.Entry<Long, CachedFileChannel> entry : 
fileChannels.entrySet()) {
+                entry.getValue().fc.close();
+            }
+        }
+
+        public CachedFileChannel get(Long logId) {
+            return fileChannels.get(logId);
+        }
+
+        class CachedFileChannel {
+            private final FileChannel fc;
+            private final long entryLogId;
+            final AtomicInteger refCount;
+
+            CachedFileChannel(long entryLogId, FileChannel fileChannel) {
+                this.entryLogId = entryLogId;
+                this.fc = fileChannel;
+                this.refCount = new AtomicInteger(0);
+            }
+
+            /**
+             * Mark this fileinfo as dead. We can only mark a fileinfo as
+             * dead if noone currently holds a reference to it.
+             *
+             * @return true if we marked as dead, false otherwise
+             */
+            private boolean markDead() {
+                return refCount.compareAndSet(0, DEAD_REF);
+            }
+
+            /**
+             * Attempt to retain the file info.
+             * When a client obtains a fileinfo from a container object,
 
 Review comment:
   this is the fileinfo comment. needs to be changed for this usage. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to