xiangfu0 commented on code in PR #17159:
URL: https://github.com/apache/pinot/pull/17159#discussion_r2512454077


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -92,4 +102,76 @@ public ReloadJobStatus getOrCreate(String jobId) {
     }
     return status;
   }
+
+  /**
+   * Rebuilds the cache with new configuration and migrates existing entries.
+   * This method is synchronized to prevent concurrent rebuilds.
+   *
+   * @param newConfig new cache configuration to apply
+   */
+  private synchronized void rebuildCache(ServerReloadJobStatusCacheConfig 
newConfig) {
+    LOG.info("Rebuilding reload status cache with new config: {}", newConfig);
+
+    // Create new cache with new configuration
+    Cache<String, ReloadJobStatus> newCache = CacheBuilder.newBuilder()
+        .maximumSize(newConfig.getMaxSize())
+        .expireAfterWrite(newConfig.getTtlDays(), TimeUnit.DAYS)
+        .recordStats()
+        .build();
+
+    // Migrate existing entries from old cache to new cache
+    Cache<String, ReloadJobStatus> oldCache = _cache;
+    if (oldCache != null) {
+      for (Map.Entry<String, ReloadJobStatus> entry : 
oldCache.asMap().entrySet()) {

Review Comment:
   newCache.putAll(oldCache.asMap())



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -92,4 +102,76 @@ public ReloadJobStatus getOrCreate(String jobId) {
     }
     return status;
   }
+
+  /**
+   * Rebuilds the cache with new configuration and migrates existing entries.
+   * This method is synchronized to prevent concurrent rebuilds.
+   *
+   * @param newConfig new cache configuration to apply
+   */
+  private synchronized void rebuildCache(ServerReloadJobStatusCacheConfig 
newConfig) {
+    LOG.info("Rebuilding reload status cache with new config: {}", newConfig);
+
+    // Create new cache with new configuration
+    Cache<String, ReloadJobStatus> newCache = CacheBuilder.newBuilder()
+        .maximumSize(newConfig.getMaxSize())
+        .expireAfterWrite(newConfig.getTtlDays(), TimeUnit.DAYS)
+        .recordStats()
+        .build();
+
+    // Migrate existing entries from old cache to new cache
+    Cache<String, ReloadJobStatus> oldCache = _cache;
+    if (oldCache != null) {
+      for (Map.Entry<String, ReloadJobStatus> entry : 
oldCache.asMap().entrySet()) {
+        newCache.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Atomically swap caches (volatile field ensures safe publication)
+    _cache = newCache;
+    _currentConfig = newConfig;
+
+    LOG.info("Successfully rebuilt reload status cache (size: {})", 
newCache.size());
+  }
+
+  /**
+   * Maps cluster configuration properties with a common prefix to a config 
POJO using Jackson.
+   * Uses PinotConfiguration.subset() to extract properties with the given 
prefix and
+   * Jackson's convertValue() for automatic object mapping.
+   *
+   * @param clusterConfigs map of all cluster configs from ZooKeeper
+   * @param configPrefix prefix to filter configs (e.g., 
"pinot.server.table.reload.status.cache")
+   * @return ServerReloadJobStatusCacheConfig with values from cluster config, 
defaults for missing values
+   */
+  @VisibleForTesting
+  static ServerReloadJobStatusCacheConfig buildFromClusterConfig(Map<String, 
String> clusterConfigs,
+      String configPrefix) {
+    final MapConfiguration mapConfig = new MapConfiguration(clusterConfigs);
+    final PinotConfiguration subsetConfig = new 
PinotConfiguration(mapConfig).subset(configPrefix);
+    return OBJECT_MAPPER.convertValue(subsetConfig.toMap(), 
ServerReloadJobStatusCacheConfig.class);
+  }
+
+  @VisibleForTesting
+  public ServerReloadJobStatusCacheConfig getCurrentConfig() {
+    return _currentConfig;
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    boolean hasRelevantChanges = changedConfigs.stream()
+        .anyMatch(key -> key.startsWith(CONFIG_PREFIX));
+
+    if (!hasRelevantChanges) {
+      LOG.info("No reload cache config changes detected, skipping rebuild");
+      return;
+    }
+
+    try {
+      ServerReloadJobStatusCacheConfig newConfig = 
buildFromClusterConfig(clusterConfigs, CONFIG_PREFIX);

Review Comment:
   I would suggest to compare the new and old config anyway



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -35,26 +42,29 @@
  *
  * <p>Thread-safe for concurrent access. Uses Guava Cache with LRU eviction
  * and time-based expiration.
+ *
+ * <p>Implements PinotClusterConfigChangeListener to support dynamic 
configuration
+ * updates from ZooKeeper cluster config. When config changes, cache is rebuilt
+ * with new settings and existing entries are migrated.
  */
 @ThreadSafe
-public class ServerReloadJobStatusCache {
+public class ServerReloadJobStatusCache implements 
PinotClusterConfigChangeListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(ServerReloadJobStatusCache.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  static final String CONFIG_PREFIX = "pinot.server.table.reload.status.cache";

Review Comment:
   shall we put this into CommonConstants?



##########
pinot-segment-local/pom.xml:
##########
@@ -99,6 +99,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>

Review Comment:
   let's not add this dependency, use existing testng



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to