reddycharan commented on a change in pull request #1391: Issue #570: 
EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391#discussion_r189063807
 
 

 ##########
 File path: 
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
 ##########
 @@ -0,0 +1,463 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+    static class EntryLogAndLockTuple {
+        private final Lock ledgerLock;
+        private BufferedLogChannel entryLog;
+
+        public EntryLogAndLockTuple() {
+            ledgerLock = new ReentrantLock();
+        }
+
+        public Lock getLedgerLock() {
+            return ledgerLock;
+        }
+
+        public BufferedLogChannel getEntryLog() {
+            return entryLog;
+        }
+
+        public void setEntryLog(BufferedLogChannel entryLog) {
+            this.entryLog = entryLog;
+        }
+    }
+
+    private LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+    /*
+     * every time active logChannel is accessed from ledgerIdEntryLogMap
+     * cache, the accesstime of that entry is updated. But for certain
+     * operations we dont want to impact accessTime of the entries (like
+     * periodic flush of current active logChannels), and those operations
+     * can use this copy of references.
+     */
+    private final ConcurrentHashMap<Long, BufferedLogChannel> 
replicaOfCurrentLogChannels;
+    private final CacheLoader<Long, EntryLogAndLockTuple> 
entryLogAndLockTupleCacheLoader;
+    private EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final int entrylogMapAccessExpiryTimeInSeconds;
+    private final int maximumNumberOfActiveEntryLogs;
+
+    EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, 
LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, 
List<EntryLogger.EntryLogListener> listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) 
throws IOException {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.rotatedLogChannels = new 
CopyOnWriteArrayList<BufferedLogChannel>();
+        this.replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, 
BufferedLogChannel>();
+        this.entrylogMapAccessExpiryTimeInSeconds = 
conf.getEntrylogMapAccessExpiryTimeInSeconds();
+        this.maximumNumberOfActiveEntryLogs = 
conf.getMaximumNumberOfActiveEntryLogs();
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, 
EntryLogAndLockTuple>() {
+            @Override
+            public EntryLogAndLockTuple load(Long key) throws Exception {
+                return new EntryLogAndLockTuple();
+            }
+        };
+        /*
+         * Currently we are relying on access time based eviction policy for
+         * removal of EntryLogAndLockTuple, so if the EntryLogAndLockTuple of
+         * the ledger is not accessed in
+         * entrylogMapAccessExpiryTimeInSeconds period, it will be removed
+         * from the cache.
+         *
+         * We are going to introduce explicit advisory writeClose call, with
+         * that explicit call EntryLogAndLockTuple of the ledger will be
+         * removed from the cache. But still timebased eviciton policy is
+         * needed because it is not guaranteed that Bookie/EntryLogger would
+         * receive successfully write close call in all the cases.
+         */
+        ledgerIdEntryLogMap = CacheBuilder.newBuilder()
+                .expireAfterAccess(entrylogMapAccessExpiryTimeInSeconds, 
TimeUnit.SECONDS)
+                .maximumSize(maximumNumberOfActiveEntryLogs)
+                .removalListener(new RemovalListener<Long, 
EntryLogAndLockTuple>() {
+                    @Override
+                    public void onRemoval(
+                            RemovalNotification<Long, EntryLogAndLockTuple> 
expiredLedgerEntryLogMapEntry) {
+                        removalOnExpiry(expiredLedgerEntryLogMapEntry);
+                    }
+                }).build(entryLogAndLockTupleCacheLoader);
+    }
+
+    /*
+     * This method is called when access time of that ledger has elapsed
+     * entrylogMapAccessExpiryTimeInSeconds period and the entry for that
+     * ledger is removed from cache. Since the entrylog of this ledger is
+     * not active anymore it has to be removed from
+     * replicaOfCurrentLogChannels and added to rotatedLogChannels.
+     *
+     * Because of performance/optimizations concerns the cleanup maintenance
+     * operations wont happen automatically, for more info on eviction
+     * cleanup maintenance tasks -
+     * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+     * common/cache/CacheBuilder.html
+     *
+     */
+    private void removalOnExpiry(RemovalNotification<Long, 
EntryLogAndLockTuple> expiredLedgerEntryLogMapEntry) {
+        Long ledgerId = expiredLedgerEntryLogMapEntry.getKey();
+        log.debug("LedgerId {} is not accessed for 
entrylogMapAccessExpiryTimeInSeconds"
+                + " period so it is being evicted from the cache map", 
ledgerId);
+        EntryLogAndLockTuple entryLogAndLockTuple = 
expiredLedgerEntryLogMapEntry.getValue();
+        Lock lock = entryLogAndLockTuple.ledgerLock;
+        BufferedLogChannel logChannel = entryLogAndLockTuple.entryLog;
+        lock.lock();
+        try {
+            replicaOfCurrentLogChannels.remove(logChannel.getLogId());
 
 Review comment:
   Yes, in the existing implementation an entry is removed only in the 
following cases: EXPIRED and SIZE. In both scenarios we want the eviction 
behavior. COLLECTED is not relevant, since we are not using 
weakKeys/weakValues/softValues. When the explicit writeClose is implemented, we 
will do an EXPLICIT removal by calling the invalidate method, and even in that 
case we will need the same behavior. In our implementation we never get into 
the REPLACED scenario, because we do not call the 'put' or 'putAll' methods 
anywhere; the value of the entry (EntryLogAndLockTuple) is loaded by the 
CacheLoader.
   
   That being said, I should rename this method to remove the word 'Expiry' 
from its name and update the method description. Thanks for pointing that out.
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to