VladRodionov commented on code in PR #8231:
URL: https://github.com/apache/hbase/pull/8231#discussion_r3236370236


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/TopologyBackedCacheAccessService.java:
##########
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link CacheAccessService} implementation backed by {@link CacheTopology} and {@link CacheEngine}
+ * instances.
+ * <p>
+ * This implementation connects the cache access layer to the topology, policy, and engine
+ * abstractions. {@link CachePlacementAdmissionPolicy} decides whether a block should be admitted,
+ * where it should be placed, which representation should be used, and whether a hit should trigger
+ * promotion. {@link CacheTopology} provides tier structure, engine lookup, aggregate statistics,
+ * and topology-specific promotion mechanics. {@link CacheEngine} performs the actual storage
+ * operations.
+ * </p>
+ * <p>
+ * This class is the topology-backed counterpart to {@link BlockCacheBackedCacheAccessService}. The
+ * block-cache-backed implementation is useful for incremental migration with no behavior change.
+ * This implementation is useful once callers are ready to exercise the new topology and engine
+ * abstractions directly through {@link CacheAccessService}.
+ * </p>
+ * <p>
+ * Representation conversion is not performed by this initial implementation. This preserves current
+ * behavior while leaving room for future packed/unpacked or engine-default representation handling.
+ * </p>
+ */
+@InterfaceAudience.Private
+public class TopologyBackedCacheAccessService implements CacheAccessService {
+
+  private final CacheTopology topology;
+  private final CachePlacementAdmissionPolicy policy;
+  private final CacheTopologyView topologyView;
+
+  /**
+   * Creates a topology-backed cache access service.
+   * @param topology cache topology used for tier structure, engine lookup, and promotion mechanics
+   * @param policy   placement and admission policy used for cache access decisions
+   */
+  public TopologyBackedCacheAccessService(CacheTopology topology,
+    CachePlacementAdmissionPolicy policy) {
+    this.topology = Objects.requireNonNull(topology, "topology must not be null");
+    this.policy = Objects.requireNonNull(policy, "policy must not be null");
+    this.topologyView =
+      Objects.requireNonNull(topology.getView(), "topology view must not be null");
+  }
+
+  /**
+   * Returns the topology used by this service.
+   * <p>
+   * This accessor is intended for tests, diagnostics, and transitional wiring. HBase read/write
+   * path callers should use {@link CacheAccessService} methods instead of accessing topology
+   * directly.
+   * </p>
+   * @return cache topology
+   */
+  public CacheTopology getTopology() {
+    return topology;
+  }
+
+  /**
+   * Returns the placement/admission policy used by this service.
+   * @return placement/admission policy
+   */
+  public CachePlacementAdmissionPolicy getPolicy() {
+    return policy;
+  }
+
+  /**
+   * Returns the read-only topology view used by policy calls.
+   * @return read-only topology view
+   */
+  public CacheTopologyView getTopologyView() {
+    return topologyView;
+  }
+
+  /**
+   * Returns a human-readable service name.
+   * @return service name
+   */
+  @Override
+  public String getName() {
+    return topology.getName();
+  }
+
+  /**
+   * Fetches a block by checking topology tiers in lookup order.
+   * <p>
+   * The lookup order is defined by {@link CacheTopology#getTiers()}. On a cache hit, this method
+   * asks the configured {@link CachePlacementAdmissionPolicy} whether the block should be promoted.
+   * If promotion is requested and the target tier exists, promotion mechanics are delegated to
+   * {@link CacheTopology#promote(BlockCacheKey, Cacheable, CacheEngine, CacheEngine)}.
+   * </p>
+   * @param cacheKey block to fetch
+   * @param context  cache request context
+   * @return cached block, or {@code null} if not present in any tier
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, CacheRequestContext context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    for (CacheTier tier : topology.getTiers()) {
+      Optional<CacheEngine> engine = topology.getEngine(tier);
+      if (engine.isEmpty()) {
+        continue;
+      }
+
+      Cacheable block = getBlockFromEngine(engine.get(), cacheKey, context);
+      if (block != null) {
+        maybePromote(cacheKey, block, tier, engine.get(), context);
+        return block;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Adds a block to the cache using policy-selected target tiers.
+   * <p>
+   * This method first asks the configured policy whether the block should be admitted. If admitted,
+   * the policy selects the target tier or tiers. The block is then inserted into each selected
+   * engine using {@link CacheEngine#cacheBlock(BlockCacheKey, Cacheable, boolean, boolean)}.
+   * </p>
+   * <p>
+   * The policy's representation decision is intentionally not applied in this initial
+   * implementation. The current block object is passed through unchanged.
+   * </p>
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param context  cache write context
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable block, CacheWriteContext context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(block, "block must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    AdmissionDecision admission =
+      policy.shouldAdmit(cacheKey, block, context, AdmissionPriority.NORMAL, topologyView);
+    if (!admission.isAdmitted()) {
+      return;
+    }
+
+    policy.selectRepresentation(cacheKey, block, context, topologyView);
+    TierDecision tierDecision = policy.selectTier(cacheKey, block, context, topologyView);
+    for (CacheTier tier : tierDecision.getTiers()) {
+      Optional<CacheEngine> engine = topology.getEngine(tier);
+      if (engine.isPresent()) {
+        engine.get().cacheBlock(cacheKey, block, context.isInMemory(), context.isWaitWhenCache());
+      }
+    }
+  }
+
+  /**
+   * Evicts a single block from all engines participating in the topology.
+   * @param cacheKey block to remove
+   * @return {@code true} if at least one engine removed the block, {@code false} otherwise
+   */
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+
+    boolean evicted = false;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted |= engine.evictBlock(cacheKey);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts all cached blocks for the given HFile from all engines participating in the topology.
+   * @param hfileName HFile name
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksByHfileName(hfileName);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts cached blocks for the given HFile range from all engines participating in the topology.
+   * @param hfileName  HFile name
+   * @param initOffset inclusive start offset
+   * @param endOffset  inclusive end offset
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts cached blocks for the given region from all engines participating in the topology.
+   * @param regionName region name
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksByRegionName(String regionName) {
+    Objects.requireNonNull(regionName, "regionName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksByRegionName(regionName);
+    }
+    return evicted;
+  }
+
+  /**
+   * Returns aggregate topology statistics.
+   * @return aggregate cache statistics
+   */
+  @Override
+  public CacheStats getStats() {
+    return topology.getStats();
+  }
+
+  /**
+   * Shuts down the topology.
+   */
+  @Override
+  public void shutdown() {
+    topology.shutdown();
+  }
+
+  /**
+   * Returns aggregate maximum configured cache size across participating engines.
+   * @return aggregate maximum cache size
+   */
+  @Override
+  public long getMaxSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getMaxSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate free cache size across participating engines.
+   * @return aggregate free size
+   */
+  @Override
+  public long getFreeSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getFreeSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate occupied cache size across participating engines.
+   * @return aggregate occupied cache size
+   */
+  @Override
+  public long size() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.size();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate occupied data-block size across participating engines.
+   * @return aggregate occupied data-block size
+   */
+  @Override
+  public long getCurrentDataSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getCurrentDataSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate cached block count across participating engines.
+   * @return aggregate cached block count
+   */
+  @Override
+  public long getBlockCount() {
+    long count = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      count += engine.getBlockCount();
+    }
+    return count;
+  }
+
+  /**
+   * Returns aggregate cached data block count across participating engines.
+   * @return aggregate cached data block count
+   */
+  @Override
+  public long getDataBlockCount() {
+    long count = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      count += engine.getDataBlockCount();
+    }
+    return count;
+  }
+
+  /**
+   * Checks whether the given block fits into at least one participating engine that supports this
+   * check.
+   * @param block block to check
+   * @return empty if no engine supports this check; otherwise whether at least one engine can fit
+   *         the block
+   */
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    Objects.requireNonNull(block, "block must not be null");
+
+    boolean unsupported = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Boolean> result = engine.blockFitsIntoTheCache(block);
+      if (result.isPresent()) {
+        unsupported = false;
+        if (result.get()) {
+          return Optional.of(true);
+        }
+      }
+    }
+    return unsupported ? Optional.empty() : Optional.of(false);
+  }
+
+  /**
+   * Checks whether the block represented by the given key is present in any participating engine
+   * that supports this check.
+   * @param key block cache key
+   * @return empty if no engine supports this check; otherwise whether any engine has the block
+   */
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+
+    boolean unsupported = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Boolean> result = engine.isAlreadyCached(key);
+      if (result.isPresent()) {
+        unsupported = false;
+        if (result.get()) {
+          return Optional.of(true);
+        }
+      }
+    }
+    return unsupported ? Optional.empty() : Optional.of(false);
+  }
+
+  /**
+   * Returns the first available cached block size reported by participating engines.
+   * <p>
+   * If the same block exists in multiple engines, this method returns the first present size in
+   * topology engine order. It does not sum duplicate copies.
+   * </p>
+   * @param key block cache key
+   * @return empty if unsupported or not present; otherwise cached block size
+   */
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Integer> size = engine.getBlockSize(key);
+      if (size.isPresent()) {
+        return size;
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Returns whether at least one participating engine is enabled.
+   * @return {@code true} if at least one engine is enabled, {@code false} otherwise
+   */
+  @Override
+  public boolean isCacheEnabled() {
+    for (CacheEngine engine : topology.getEngines()) {
+      if (engine.isCacheEnabled()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Waits for all participating engines to complete initialization.
+   * @param timeout maximum time to wait per engine
+   * @return {@code true} if all engines report ready/enabled, {@code false} otherwise
+   */
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    boolean enabled = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      enabled &= engine.waitForCacheInitialization(timeout);
+    }
+    return enabled;
+  }
+
+  /**
+   * Propagates a configuration change to all participating engines.
+   * @param config new configuration
+   */
+  @Override
+  public void onConfigurationChange(Configuration config) {
+    Objects.requireNonNull(config, "config must not be null");
+    for (CacheEngine engine : topology.getEngines()) {
+      engine.onConfigurationChange(config);
+    }
+  }
+
+  private Cacheable getBlockFromEngine(CacheEngine engine, BlockCacheKey cacheKey,
+    CacheRequestContext context) {
+    Optional<BlockType> blockType = context.getBlockType();
+    if (blockType.isPresent()) {
+      return engine.getBlock(cacheKey, context.isCaching(), context.isRepeat(),
+        context.isUpdateCacheMetrics(), blockType.get());
+    }
+    return engine.getBlock(cacheKey, context.isCaching(), context.isRepeat(),
+      context.isUpdateCacheMetrics());
+  }
+
+  private void maybePromote(BlockCacheKey cacheKey, Cacheable block, CacheTier sourceTier,
+    CacheEngine sourceEngine, CacheRequestContext context) {
+    PromotionDecision decision =
+      policy.shouldPromote(cacheKey, block, sourceTier, context, topologyView);
+    if (decision == null || !decision.shouldPromote()) {

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to