Copilot commented on code in PR #17159:
URL: https://github.com/apache/pinot/pull/17159#discussion_r2507807208
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -92,4 +102,76 @@ public ReloadJobStatus getOrCreate(String jobId) {
}
return status;
}
+
+ /**
+ * Rebuilds the cache with new configuration and migrates existing entries.
+ * This method is synchronized to prevent concurrent rebuilds.
+ *
+ * @param newConfig new cache configuration to apply
+ */
+ private synchronized void rebuildCache(ServerReloadJobStatusCacheConfig
newConfig) {
+ LOG.info("Rebuilding reload status cache with new config: {}", newConfig);
+
+ // Create new cache with new configuration
+ Cache<String, ReloadJobStatus> newCache = CacheBuilder.newBuilder()
+ .maximumSize(newConfig.getMaxSize())
+ .expireAfterWrite(newConfig.getTtlDays(), TimeUnit.DAYS)
+ .recordStats()
+ .build();
+
+ // Migrate existing entries from old cache to new cache
+ Cache<String, ReloadJobStatus> oldCache = _cache;
+ if (oldCache != null) {
+ for (Map.Entry<String, ReloadJobStatus> entry :
oldCache.asMap().entrySet()) {
+ newCache.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // Atomically swap caches (volatile field ensures safe publication)
+ _cache = newCache;
+ _currentConfig = newConfig;
+
+ LOG.info("Successfully rebuilt reload status cache (size: {})",
newCache.size());
+ }
+
+ /**
+ * Maps cluster configuration properties with a common prefix to a config
POJO using Jackson.
+ * Uses PinotConfiguration.subset() to extract properties with the given
prefix and
+ * Jackson's convertValue() for automatic object mapping.
+ *
+ * @param clusterConfigs map of all cluster configs from ZooKeeper
+ * @param configPrefix prefix to filter configs (e.g.,
"pinot.server.table.reload.status.cache")
+ * @return ServerReloadJobStatusCacheConfig with values from cluster config,
defaults for missing values
+ */
+ @VisibleForTesting
+ static ServerReloadJobStatusCacheConfig buildFromClusterConfig(Map<String,
String> clusterConfigs,
+ String configPrefix) {
+ final MapConfiguration mapConfig = new MapConfiguration(clusterConfigs);
+ final PinotConfiguration subsetConfig = new
PinotConfiguration(mapConfig).subset(configPrefix);
+ return OBJECT_MAPPER.convertValue(subsetConfig.toMap(),
ServerReloadJobStatusCacheConfig.class);
+ }
+
+ @VisibleForTesting
+ public synchronized ServerReloadJobStatusCacheConfig getCurrentConfig() {
Review Comment:
The synchronized modifier is unnecessary for this getter method since
`_currentConfig` is already declared as volatile. Volatile provides sufficient
visibility guarantees for reading the reference. The synchronization only adds
overhead without providing additional safety.
```suggestion
public ServerReloadJobStatusCacheConfig getCurrentConfig() {
```
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCacheTest.java:
##########
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Unit tests for ServerReloadJobStatusCache to verify correct config injection
+ * when onChange is called, cache rebuild logic, and entry migration.
+ */
+public class ServerReloadJobStatusCacheTest {
+
+ @Test
+ public void testDefaultConfigInitialization() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // Then
+ ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
+ assertThat(config).isNotNull();
+ assertThat(config.getMaxSize()).isEqualTo(10000);
+ assertThat(config.getTtlDays()).isEqualTo(30);
+ }
+
+ @Test
+ public void testOnChangeWithFullConfig() {
+ // Given
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.size.max", "5000");
+ properties.put("pinot.server.table.reload.status.cache.ttl.days", "15");
+ properties.put("some.other.config", "value");
+
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // When
+ cache.onChange(properties.keySet(), properties);
+
+ // Then
+ ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
+ assertThat(config.getMaxSize()).isEqualTo(5000);
+ assertThat(config.getTtlDays()).isEqualTo(15);
+ }
+
+ @Test
+ public void testOnChangeWithPartialConfig() {
+ // Given
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.size.max", "7500");
+ properties.put("some.other.config", "value");
+
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // When
+ cache.onChange(properties.keySet(), properties);
+
+ // Then
+ ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
+ assertThat(config.getMaxSize()).isEqualTo(7500);
+ // Verify default for unspecified config
+ assertThat(config.getTtlDays()).isEqualTo(30);
+ }
+
+ @Test
+ public void testOnChangeWithNoRelevantConfigs() {
+ // Given
+ Map<String, String> properties = new HashMap<>();
+ properties.put("some.other.config", "value");
+ properties.put("another.config", "123");
+
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // When
+ cache.onChange(properties.keySet(), properties);
+
+ // Then - Should keep defaults
+ ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
+ assertThat(config.getMaxSize()).isEqualTo(10000);
+ assertThat(config.getTtlDays()).isEqualTo(30);
+ }
+
+ @Test
+ public void testOnChangeWithInvalidValues() {
+ // Given
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.size.max",
"invalid");
+
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+ ServerReloadJobStatusCacheConfig oldConfig = cache.getCurrentConfig();
+
+ // When - Invalid config should keep old cache
+ cache.onChange(properties.keySet(), properties);
+
+ // Then - Should keep old config due to error handling
+ ServerReloadJobStatusCacheConfig config = cache.getCurrentConfig();
+ assertThat(config).isSameAs(oldConfig);
+ assertThat(config.getMaxSize()).isEqualTo(10000);
+ }
+
+ @Test
+ public void testConfigUpdateOverwritesPrevious() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // Set initial config
+ Map<String, String> initialProperties = new HashMap<>();
+ initialProperties.put("pinot.server.table.reload.status.cache.size.max",
"8000");
+ initialProperties.put("pinot.server.table.reload.status.cache.ttl.days",
"20");
+ cache.onChange(initialProperties.keySet(), initialProperties);
+ assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(8000);
+ assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(20);
+
+ // When - Update with new config
+ Map<String, String> updatedProperties = new HashMap<>();
+ updatedProperties.put("pinot.server.table.reload.status.cache.size.max",
"12000");
+ updatedProperties.put("pinot.server.table.reload.status.cache.ttl.days",
"45");
+ cache.onChange(updatedProperties.keySet(), updatedProperties);
+
+ // Then
+ assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(12000);
+ assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(45);
+ }
+
+ @Test
+ public void testZookeeperConfigDeletionRevertsToDefaults() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // Set initial custom configs
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("pinot.server.table.reload.status.cache.size.max",
"15000");
+ customProperties.put("pinot.server.table.reload.status.cache.ttl.days",
"60");
+ cache.onChange(customProperties.keySet(), customProperties);
+
+ // Verify custom configs are applied
+ assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(15000);
+ assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(60);
+
+ // When - Simulate ZooKeeper config deletion with empty map
+ Map<String, String> emptyProperties = new HashMap<>();
+ cache.onChange(customProperties.keySet(), emptyProperties);
+
+ // Then - Verify all configs revert to defaults
+ assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(10000);
+ assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(30);
+ }
+
+ @Test
+ public void testBuildFromClusterConfigDirectly() {
+ // Given
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.size.max", "6000");
+ properties.put("pinot.server.table.reload.status.cache.ttl.days", "25");
+ properties.put("some.other.config", "value");
+
+ // When
+ ServerReloadJobStatusCacheConfig config =
+ ServerReloadJobStatusCache.buildFromClusterConfig(properties,
ServerReloadJobStatusCache.CONFIG_PREFIX);
+
+ // Then
+ assertThat(config.getMaxSize()).isEqualTo(6000);
+ assertThat(config.getTtlDays()).isEqualTo(25);
+ }
+
+ @Test
+ public void testCacheEntryMigrationOnRebuild() {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // Add some entries to cache
+ ReloadJobStatus status1 = cache.getOrCreate("job-1");
+ status1.incrementAndGetFailureCount();
+ ReloadJobStatus status2 = cache.getOrCreate("job-2");
+ status2.incrementAndGetFailureCount();
+ status2.incrementAndGetFailureCount();
+
+ // Verify initial state
+ assertThat(cache.getJobStatus("job-1").getFailureCount()).isEqualTo(1);
+ assertThat(cache.getJobStatus("job-2").getFailureCount()).isEqualTo(2);
+
+ // When - Trigger cache rebuild with config change
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pinot.server.table.reload.status.cache.size.max", "5000");
+ properties.put("pinot.server.table.reload.status.cache.ttl.days", "15");
+ cache.onChange(properties.keySet(), properties);
+
+ // Then - Entries should be migrated to new cache
+ assertThat(cache.getJobStatus("job-1")).isNotNull();
+ assertThat(cache.getJobStatus("job-1").getFailureCount()).isEqualTo(1);
+ assertThat(cache.getJobStatus("job-2")).isNotNull();
+ assertThat(cache.getJobStatus("job-2").getFailureCount()).isEqualTo(2);
+
+ // Verify new config is applied
+ assertThat(cache.getCurrentConfig().getMaxSize()).isEqualTo(5000);
+ assertThat(cache.getCurrentConfig().getTtlDays()).isEqualTo(15);
+ }
+
+ @Test
+ public void testConcurrentAccessDuringRebuild() throws InterruptedException {
+ // Given
+ ServerReloadJobStatusCache cache = new ServerReloadJobStatusCache();
+
+ // Add entries to cache
+ for (int i = 0; i < 10; i++) {
+ ReloadJobStatus status = cache.getOrCreate("job-" + i);
+ status.incrementAndGetFailureCount();
+ }
+
+ // When - Concurrent rebuild and access
+ int numThreads = 5;
+ CountDownLatch latch = new CountDownLatch(numThreads);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ for (int i = 0; i < numThreads; i++) {
+ int threadId = i;
+ new Thread(() -> {
Review Comment:
Unnamed threads created with `new Thread()` should be avoided in tests. Use
`Executors.newFixedThreadPool()` or assign meaningful thread names for better
debugging and thread dumps. This is especially important for diagnosing test
failures in concurrent scenarios.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ServerReloadJobStatusCache.java:
##########
@@ -92,4 +102,76 @@ public ReloadJobStatus getOrCreate(String jobId) {
}
return status;
}
+
+ /**
+ * Rebuilds the cache with new configuration and migrates existing entries.
+ * This method is synchronized to prevent concurrent rebuilds.
+ *
+ * @param newConfig new cache configuration to apply
+ */
+ private synchronized void rebuildCache(ServerReloadJobStatusCacheConfig
newConfig) {
Review Comment:
The method should document thread-safety guarantees and potential blocking
behavior. Consider adding a Javadoc note that this method blocks other rebuild
attempts during execution and that cache read operations (getJobStatus,
getOrCreate) remain available during rebuild.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]