This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d111363e68 Add support for cluster config change listeners for 
controller periodic tasks. (#16908)
5d111363e68 is described below

commit 5d111363e687660e77ecb01cb847305c39f34310
Author: 9aman <[email protected]>
AuthorDate: Tue Sep 30 18:01:32 2025 +0530

    Add support for cluster config change listeners for controller periodic 
tasks. (#16908)
    
    * Add support for cluster config change listeners for controller periodic 
tasks.
    This will allow dynamic configuration changes for the periodic tasks that 
relied on controller restarts to pick up changes until now
    
    * Fixing mvn spotless issues
    
    * Add integration test WIP
    
    * Add integration tests for change listener and orphan segments cleanup
    
    * Minor fix for the issue of boolean parsing returning false even in case 
of invalid values
---
 .../pinot/controller/BaseControllerStarter.java    |  15 ++
 .../core/periodictask/ControllerPeriodicTask.java  |   8 +-
 .../helix/core/retention/RetentionManager.java     |  84 ++++++++++-
 .../tests/RetentionManagerIntegrationTest.java     | 153 +++++++++++++++++++++
 4 files changed, 256 insertions(+), 4 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f1de83f9181..d23404561a9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -100,6 +100,7 @@ import 
org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
@@ -394,6 +395,10 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     return _realtimeSegmentValidationManager;
   }
 
+  public RetentionManager getRetentionManager() {
+    return _retentionManager;
+  }
+
   public BrokerResourceValidationManager getBrokerResourceValidationManager() {
     return _brokerResourceValidationManager;
   }
@@ -594,6 +599,16 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
+
+    // Register ControllerPeriodicTasks as cluster config change listeners
+    LOGGER.info("Registering ControllerPeriodicTasks as cluster config change 
listeners");
+    for (PeriodicTask periodicTask : controllerPeriodicTasks) {
+      if (periodicTask instanceof ControllerPeriodicTask) {
+        ControllerPeriodicTask<?> controllerPeriodicTask = 
(ControllerPeriodicTask<?>) periodicTask;
+        
_clusterConfigChangeHandler.registerClusterConfigChangeListener(controllerPeriodicTask);
+        LOGGER.info("Registered {} as config change listener", 
controllerPeriodicTask.getTaskName());
+      }
+    }
     LOGGER.info("Init controller periodic tasks scheduler");
     _periodicTaskScheduler = new PeriodicTaskScheduler();
     _periodicTaskScheduler.init(controllerPeriodicTasks);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 48d177f9544..e49266f429b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -33,6 +34,7 @@ import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * @param <C> the context type
  */
 @ThreadSafe
-public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
+public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask 
implements PinotClusterConfigChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
   public static final String RUN_SEGMENT_LEVEL_VALIDATION = 
"runSegmentLevelValidation";
 
@@ -175,4 +177,8 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
    */
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
   }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 3d1bd43e418..f46978b5cd1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -79,12 +79,12 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(5L);
   public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100;
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
-  private final boolean _untrackedSegmentDeletionEnabled;
-  private final int _untrackedSegmentsRetentionTimeInDays;
+  private volatile boolean _untrackedSegmentDeletionEnabled;
+  private volatile int _untrackedSegmentsRetentionTimeInDays;
   private final int _agedSegmentsDeletionBatchSize;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
-  private final boolean _isHybridTableRetentionStrategyEnabled;
+  private volatile boolean _isHybridTableRetentionStrategyEnabled;
   private final BrokerServiceHelper _brokerServiceHelper;
 
   public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
@@ -517,4 +517,82 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
     return new TimeRetentionStrategy(TimeUnit.DAYS, 
_untrackedSegmentsRetentionTimeInDays);
   }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    if 
(changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION))
 {
+      updateUntrackedSegmentDeletionEnabled(
+          
clusterConfigs.get(ControllerConf.ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION));
+    }
+
+    if 
(changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS))
 {
+      updateUntrackedSegmentsRetentionTimeInDays(
+          
clusterConfigs.get(ControllerConf.ControllerPeriodicTasksConf.UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS));
+    }
+
+    if 
(changedConfigs.contains(ControllerConf.ENABLE_HYBRID_TABLE_RETENTION_STRATEGY))
 {
+      updateHybridTableRetentionStrategyEnabled(
+          
clusterConfigs.get(ControllerConf.ENABLE_HYBRID_TABLE_RETENTION_STRATEGY));
+    }
+  }
+
+  private void updateUntrackedSegmentDeletionEnabled(String newValue) {
+    boolean oldValue = _untrackedSegmentDeletionEnabled;
+
+    // Validate that the value is a proper boolean string
+    if (!"true".equalsIgnoreCase(newValue) && 
!"false".equalsIgnoreCase(newValue)) {
+      LOGGER.warn("Invalid value for untrackedSegmentDeletionEnabled: {}, 
keeping current value: {}", newValue,
+          oldValue);
+      return;
+    }
+
+    boolean parsedValue = Boolean.parseBoolean(newValue);
+    if (oldValue == parsedValue) {
+      LOGGER.info("No change in untrackedSegmentDeletionEnabled, current 
value: {}", oldValue);
+    } else {
+      _untrackedSegmentDeletionEnabled = parsedValue;
+      LOGGER.info("Updated untrackedSegmentDeletionEnabled from {} to {}", 
oldValue, parsedValue);
+    }
+  }
+
+  private void updateUntrackedSegmentsRetentionTimeInDays(String newValue) {
+    int oldValue = _untrackedSegmentsRetentionTimeInDays;
+    try {
+      int parsedValue = Integer.parseInt(newValue);
+      if (parsedValue <= 0) {
+        LOGGER.warn(
+            "Invalid value for untrackedSegmentsRetentionTimeInDays: {}, must 
be positive, keeping current value: {}",
+            parsedValue, oldValue);
+      } else if (oldValue == parsedValue) {
+        LOGGER.info("No change in untrackedSegmentsRetentionTimeInDays, 
current value: {}", oldValue);
+      } else {
+        _untrackedSegmentsRetentionTimeInDays = parsedValue;
+        LOGGER.info("Updated untrackedSegmentsRetentionTimeInDays from {} to 
{}", oldValue, parsedValue);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid value for untrackedSegmentsRetentionTimeInDays: {}, 
keeping current value: {}", newValue,
+          oldValue);
+    }
+  }
+
+  private void updateHybridTableRetentionStrategyEnabled(String newValue) {
+    boolean oldValue = _isHybridTableRetentionStrategyEnabled;
+    try {
+      boolean parsedValue = Boolean.parseBoolean(newValue);
+      if (oldValue == parsedValue) {
+        LOGGER.info("No change in isHybridTableRetentionStrategyEnabled, 
current value: {}", oldValue);
+      } else {
+        _isHybridTableRetentionStrategyEnabled = parsedValue;
+        LOGGER.info("Updated isHybridTableRetentionStrategyEnabled from {} to 
{}", oldValue, parsedValue);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Invalid value for isHybridTableRetentionStrategyEnabled: 
{}, keeping current value: {}", newValue,
+          oldValue);
+    }
+  }
+
+  @VisibleForTesting
+  public boolean isUntrackedSegmentDeletionEnabled() {
+    return _untrackedSegmentDeletionEnabled;
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
new file mode 100644
index 00000000000..d1b229dbb15
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.util.TestUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class RetentionManagerIntegrationTest extends 
BaseClusterIntegrationTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManagerIntegrationTest.class);
+
+  protected List<File> _avroFiles;
+
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED,
 true);
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
 true);
+    
properties.put(ControllerConf.ControllerPeriodicTasksConf.RETENTION_MANAGER_INITIAL_DELAY_IN_SECONDS,
 5000);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    try {
+      LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", 
_controllerConfig.getDataDir(),
+          new URI(_controllerConfig.getDataDir()).getScheme());
+      serverConf.setProperty("pinot.server.instance.segment.store.uri", 
"file:" + _controllerConfig.getDataDir());
+      serverConf.setProperty("pinot.server.instance." + 
HelixInstanceDataManagerConfig.UPLOAD_SEGMENT_TO_DEEP_STORE,
+          "true");
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    setupTable();
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  protected void setupTable()
+      throws Exception {
+    _avroFiles = unpackAvroData(_tempDir);
+    startKafka();
+    pushAvroIntoKafka(_avroFiles);
+
+    Schema schema = createSchema();
+    addSchema(schema);
+
+    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
+    tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
+    tableConfig.getValidationConfig().setRetentionTimeValue("2");
+    addTableConfig(tableConfig);
+
+    waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
+  }
+
+  @Test
+  public void testClusterConfigChangeListener()
+      throws IOException, URISyntaxException {
+    // Disable orphan segment deletion to ensure orphan segments are not 
cleaned up
+    
updateClusterConfig(Map.of(ControllerConf.ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
 "false"));
+
+    createFakeSegments(_controllerConfig.getDataDir(), getTableName(), 
"orphan_segment", 4);
+
+    _controllerStarter.getRetentionManager().run();
+
+    PinotFS pinotFS = 
PinotFSFactory.create(URIUtils.getUri(_controllerConfig.getDataDir()).getScheme());
+
+    // Verify that 6 segments remain: 2 CONSUMING segments + 4 orphan segments
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        String[] fileList = pinotFS.listFiles(new 
URI(_controllerConfig.getDataDir() + "/" + getTableName()), false);
+        return fileList.length == 6;
+      } catch (IOException | URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }, 5000, 10000, "Expected 6 segments (2 CONSUMING + 4 orphan) but found 
different count");
+
+    // Enable orphan segment deletion
+    
updateClusterConfig(Map.of(ControllerConf.ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
 "true"));
+
+    // Wait for the config change to take effect
+    TestUtils.waitForCondition((aVoid) -> 
_controllerStarter.getRetentionManager().isUntrackedSegmentDeletionEnabled(),
+        1000, 10000, "UntrackedSegmentDeletionEnabled is still false. Should 
have been set to true");
+
+    _controllerStarter.getRetentionManager().run();
+
+    // Verify that only 2 CONSUMING segments remain after orphan segment 
cleanup
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        String[] fileList = pinotFS.listFiles(new 
URI(_controllerConfig.getDataDir() + "/" + getTableName()), false);
+        return fileList.length == 2;
+      } catch (IOException | URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }, 5000, 100000, "Expected 2 CONSUMING segments after orphan cleanup but 
found different count");
+  }
+
+  private void createFakeSegments(String dataDir, String tableName, String 
orphanSegmentPrefix,
+      int numberOfOrphanSegment)
+      throws URISyntaxException, IOException {
+    PinotFS pinotFS = 
PinotFSFactory.create(URIUtils.getUri(dataDir).getScheme());
+    for (int i = 0; i < numberOfOrphanSegment; i++) {
+      String segmentPath = dataDir + "/" + tableName + "/" + 
orphanSegmentPrefix + "_" + i;
+      pinotFS.touch(new URI(segmentPath));
+      File file = new File(segmentPath);
+      if (!file.setLastModified(DateTime.now().minusDays(100).getMillis())) {
+        LOGGER.warn("Failed to set last modified time for file: {}", 
segmentPath);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to