smengcl commented on code in PR #4567:
URL: https://github.com/apache/ozone/pull/4567#discussion_r1220669021


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  /**
+   * Creates an empty snapshot cache.
+   * @param omSnapshotManager manager consulted by get() to check whether a
+   *     snapshot is still in SNAPSHOT_ACTIVE status
+   * @param cacheLoader loader invoked by get() to open an OmSnapshot when
+   *     its table key is not yet in the map
+   * @param cacheSizeLimit soft limit on the total number of snapshot DB
+   *     instances allowed to be opened on the OM
+   */
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  /**
+   * Exposes the internal snapshot map to tests. The live map is returned,
+   * not a copy.
+   * @return the map from snapshot table key to reference-counted instance
+   */
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  /**
+   * Exposes the internal pending eviction set to tests. The live set is
+   * returned, not a copy.
+   * @return the set of instances currently pending eviction
+   */
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * Reports the current number of entries in the internal snapshot map.
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * <p>
+   * If the key is present in the map, the entry is first removed from the
+   * pending eviction list, then its OmSnapshot is closed, and finally the
+   * mapping is dropped. Nothing happens when the key is absent.
+   * <p>
+   * The reference count of the entry is not checked before the snapshot is
+   * closed.
+   * @param key DB snapshot table key
+   * @throws IllegalStateException if closing the snapshot fails; the
+   *     IOException from close() is attached as the cause. The mapping is
+   *     then left in the map, but the entry has already been removed from
+   *     the pending eviction list.
+   * @throws IOException declared in the signature; a close() failure is
+   *     reported as IllegalStateException instead, as described above.
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   * <p>
+   * Walks the map's entry set. Each entry is removed from the pending
+   * eviction list, its OmSnapshot is closed, and the mapping is then removed
+   * through the iterator. Reference counts are not checked.
+   * <p>
+   * The walk stops at the first close() failure: the failing entry is not
+   * removed from the map, and the entries not yet visited are left open and
+   * untouched.
+   * @throws IllegalStateException if closing any snapshot fails; the
+   *     IOException from close() is attached as the cause.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   * <p>
+   * Not yet accepted by {@link #get(String)}, which currently takes only
+   * the snapshot table key.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+  /**
+   * Get or load OmSnapshot. Must be close()d after use.
+   * <p>
+   * When the key is not yet in the map, the instance is created through the
+   * CacheLoader and stored in the map. On success, the reference count of
+   * the returned instance is incremented, the instance is removed from the
+   * pending eviction list, and cleanup() is invoked.
+   * <p>
+   * The active-status check runs after the load step, so when it rejects
+   * the call, an entry that was just loaded stays in the map without its
+   * reference count having been incremented by this method.
+   * NOTE(review): confirm that cleanup() can still evict such an entry.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return the reference-counted wrapper around the OmSnapshot instance;
+   *     never null, failures are reported by exception
+   * @throws OMException with FILE_NOT_FOUND if the loader fails with an
+   *     OMException whose result is FILE_NOT_FOUND, or if the snapshot is
+   *     not in SNAPSHOT_ACTIVE status while
+   *     SnapshotUtils.isCalledFromSnapshotDeletingService() returns false
+   * @throws IllegalStateException if the loader fails with any other
+   *     exception; the original exception is attached as the cause
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k));
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !SnapshotUtils.isCalledFromSnapshotDeletingService()) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()
+    pendingEvictionList.remove(rcOmSnapshot);
+
+    // Check if any entries can be cleaned up.
+    // At this point, cache size might temporarily exceed cacheSizeLimit
+    // even if there are entries that can be evicted, which is fine since it
+    // is a soft limit.
+    cleanup();
+
+    return rcOmSnapshot;
+  }
+
+  /**
+   * Release the reference count on the OmSnapshot instance.
+   * @param key snapshot table key
+   */
+  public void release(String key) {

Review Comment:
   Ah, I see: `ReferenceCounted::close` does not directly trigger `SnapshotCache::release` right now.

   I would need to incorporate the `pendingEvictionList` logic in there somehow, so that `cleanup()` would act correctly upon cache eviction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to