This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d2fa9dd097 Implement builtin-purge task (#8589)
d2fa9dd097 is described below

commit d2fa9dd097f5c26c20da46b5ac66da49f302d0ff
Author: Airliquide76 <[email protected]>
AuthorDate: Tue May 31 05:05:22 2022 +0200

    Implement builtin-purge task (#8589)
    
    * Implement builtin-purge task
    
    * Add todo to purge lastly purged segment first
    
    * Add segment purging order
    
    * add timeIntervalCheck for purge task
    
    * add integration test
    
    Co-authored-by: francois autaa <[email protected]>
---
 .../apache/pinot/core/common/MinionConstants.java  |   2 +
 .../tests/PurgeMinionClusterIntegrationTest.java   | 357 +++++++++++++++++++++
 .../minion/tasks/purge/PurgeTaskGenerator.java     | 153 +++++++++
 3 files changed, 512 insertions(+)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index e0560248ad..c4007ee561 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -67,6 +67,8 @@ public class MinionConstants {
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
+    public static final String LAST_PURGE_TIME_THREESOLD_PERIOD = 
"lastPurgeTimeThresholdPeriod";
+    public static final String DEFAULT_LAST_PURGE_TIME_THRESHOLD_PERIOD = 
"14d";
   }
 
   // Common config keys for segment merge tasks.
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..d44b5711db
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * Integration test for minion task of type "PurgeTask"
+ */
+public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTest {
+  private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
+  private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
+  private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+
+  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+  protected PinotTaskManager _taskManager;
+  protected PinotHelixResourceManager _pinotHelixResourceManager;
+  protected String _tableName;
+
+  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
+  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
+  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+
+  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
+  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
+  protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1,
+        _segmentDir2, _tarDir2, _segmentDir3, _tarDir3);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBrokers(1);
+    startServers(1);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
+    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
+    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
+    setTableName(PURGE_FIRST_RUN_TABLE);
+    TableConfig purgeTableConfig = createOfflineTableConfig();
+    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
+
+    setTableName(PURGE_DELTA_PASSED_TABLE);
+    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
+    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
+
+    addTableConfig(purgeTableConfig);
+    addTableConfig(purgeDeltaPassedTableConfig);
+    addTableConfig(purgeDeltaNotPassedTableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, 
_segmentDir1, _tarDir1);
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig,
+            schema, 0, _segmentDir2, _tarDir2);
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig,
+            schema, 0, _segmentDir3, _tarDir3);
+
+    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
+    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
+    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+    startMinion();
+    setRecordPurger();
+    _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    // set up metadata on segments to check how the code handles passed and 
not-passed delay
+    String tablenameOfflineNotPassed = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
+    String tablenameOfflinePassed = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
+
+    //set up passed delay
+    List<SegmentZKMetadata> segmentsZKMetadataDeltaPassed = 
_pinotHelixResourceManager
+        .getSegmentsZKMetadata(tablenameOfflinePassed);
+
+    Map<String, String> customSegmentMetadataPassed = new HashMap<>();
+    customSegmentMetadataPassed.put(MinionConstants.PurgeTask.TASK_TYPE
+        + MinionConstants.TASK_TIME_SUFFIX, 
String.valueOf(System.currentTimeMillis() - 88400000));
+
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadataDeltaPassed) {
+      segmentZKMetadata.setCustomMap(customSegmentMetadataPassed);
+      _pinotHelixResourceManager.updateZkMetadata(tablenameOfflinePassed, 
segmentZKMetadata);
+    }
+    //set up not passed delay
+    List<SegmentZKMetadata> segmentsZKMetadataDeltaNotPassed = 
_pinotHelixResourceManager
+        .getSegmentsZKMetadata(tablenameOfflineNotPassed);
+    Map<String, String> customSegmentMetadataNotPassed = new HashMap<>();
+    customSegmentMetadataNotPassed.put(MinionConstants.PurgeTask.TASK_TYPE
+        + MinionConstants.TASK_TIME_SUFFIX, 
String.valueOf(System.currentTimeMillis() - 4000));
+    for (SegmentZKMetadata segmentZKMetadata : 
segmentsZKMetadataDeltaNotPassed) {
+      segmentZKMetadata.setCustomMap(customSegmentMetadataNotPassed);
+      _pinotHelixResourceManager.updateZkMetadata(tablenameOfflineNotPassed, 
segmentZKMetadata);
+    }
+  }
+
+  private void setRecordPurger() {
+    MinionContext minionContext = MinionContext.getInstance();
+    minionContext.setRecordPurgerFactory(rawTableName -> {
+      List<String> tableNames = Arrays.asList(PURGE_FIRST_RUN_TABLE,
+          PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+      if (tableNames.contains(rawTableName)) {
+        return row -> row.getValue("Quarter").equals(1);
+      } else {
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  private TableTaskConfig getPurgeTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    
tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
 "1d");
+    return new 
TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE, 
tableTaskConfigs));
+  }
+
+  /**
+   * Test purge with no metadata on the segments (checking null safe 
implementation)
+   */
+  @Test
+  public void testFirstRunPurge()
+      throws Exception {
+    // Expected purge task generation :
+    // 1. No previous purge run, so all segments should be processed and purge 
metadata should be added to the segments
+    // 2. Check that we cannot run two purge generations at the same time, 
ensuring running segments will be skipped
+    // 3. Check segment metadata to ensure purgeTime is updated into the 
metadata
+    // 4. Check that after the first purge run, rerunning the purge task 
generation schedules no new task
+    // 5. Check the purge process itself by checking the expected number of rows
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
+
+    
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE);
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+      // Will not schedule task if there's incomplete task
+    assertNull(
+        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // check that metadata contains the expected purge time entries
+    for (SegmentZKMetadata metadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Check purgeTimeIn
+      
assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
+          + MinionConstants.TASK_TIME_SUFFIX));
+    }
+    // should not schedule a new purge as the time since the last purge is not 
greater than 1 day (default purge delay)
+    assertNull(
+        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // 28057 Rows with quarter = 1
+    // 115545 Totals Rows
+    // Expecting 87488 rows after the purge
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_FIRST_RUN_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    assertTrue(expectedJson.toString().contains("\"rows\":[[87488]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_FIRST_RUN_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Test purge with passed delay
+   */
+  @Test
+  public void testPassedDelayTimePurge()
+      throws Exception {
+    // Expected purge task generation :
+    // 1. The purge time delta in this test is greater than the expected 
threshold (88400000 > 1d (86400000))
+    // 2. Check that we cannot run two purge generations at the same time, 
ensuring running segments will be skipped
+    // 3. Check segment metadata to ensure purgeTime is updated into the 
metadata
+    // 4. Check that after the first purge run, rerunning the purge task 
generation schedules no new task
+    // 5. Check the purge process itself by checking the expected number of rows
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
+    
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE);
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+          
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    // Will not schedule task if there's incomplete task
+    assertNull(
+        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+    // check that metadata contains expected values
+    for (SegmentZKMetadata metadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Check purgeTimeIn
+      
assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
+          + MinionConstants.TASK_TIME_SUFFIX));
+      //check that the purge has been run on these segments
+      assertTrue(System.currentTimeMillis() - 
Long.parseLong(metadata.getCustomMap()
+          .get(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) < 86400000);
+    }
+    // should not schedule a new purge as the time since the last purge is not 
greater than 1 day (default purge delay)
+    assertNull(
+        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // 28057 Rows with quarter = 1
+    // 115545 Totals Rows
+    // Expecting 87488 rows after the purge
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_PASSED_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+
+    assertTrue(expectedJson.toString().contains("\"rows\":[[87488]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_DELTA_PASSED_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Test purge with not passed delay
+   */
+  @Test
+  public void testNotPassedDelayTimePurge()
+      throws Exception {
+    // Expected no purge task generation :
+    // 1. segment purge time is set to System.currentTimeMillis() -
+    // 4000 so a new purge should not be triggered as the delay is 1d 86400000 
ms
+    // 2. Check no task has been scheduled
+    // 3. Check segment metadata to ensure purgeTime is not updated into the 
metadata
+    // 4. Check that the purge process itself has not been run by checking the 
expected number of rows
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
+
+    // no task should be scheduled as the delay has not passed
+    assertNull(
+        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    for (SegmentZKMetadata metadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      
assertTrue(metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE
+          + MinionConstants.TASK_TIME_SUFFIX));
+      //check that the purge has not been run on these segments
+      assertTrue(System.currentTimeMillis() - 
Long.parseLong(metadata.getCustomMap()
+          .get(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) > 4000);
+      assertTrue(System.currentTimeMillis() - 
Long.parseLong(metadata.getCustomMap()
+          .get(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) < 86400000);
+    }
+    // 28057 Rows with quarter = 1
+    // 115545 Totals Rows
+    // Expecting all 115545 rows since no purge should have run
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_NOT_PASSED_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+
+    assertTrue(expectedJson.toString().contains("\"rows\":[[115545]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_DELTA_NOT_PASSED_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  protected void verifyTableDelete(String tableNameWithType) {
+    TestUtils.waitForCondition(input -> {
+      // Check if the segment lineage is cleaned up
+      if (SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
tableNameWithType) != null) {
+        return false;
+      }
+      // Check if the task metadata is cleaned up
+      if (MinionTaskMetadataUtils
+          .fetchTaskMetadata(_propertyStore, 
MinionConstants.PurgeTask.TASK_TYPE, tableNameWithType) != null) {
+        return false;
+      }
+      return true;
+    }, 1_000L, 60_000L, "Failed to delete table");
+  }
+
+  protected void waitForTaskToComplete() {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : 
_helixTaskResourceManager.getTaskStates(MinionConstants.PurgeTask.TASK_TYPE)
+          .values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
new file mode 100644
index 0000000000..ff73ff7197
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.purge;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class PurgeTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PurgeTaskGenerator.class);
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.PurgeTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    LOGGER.info("Start generating PurgeTask");
+    String taskType = MinionConstants.PurgeTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+
+      String tableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() == TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for real-time table: {}", 
taskType, tableName);
+        continue;
+      }
+
+      Map<String, String> taskConfigs;
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      if (tableTaskConfig == null) {
+        LOGGER.warn("Failed to find task config for table: {}", tableName);
+        continue;
+      }
+      taskConfigs = 
tableTaskConfig.getConfigsForTaskType(MinionConstants.PurgeTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null 
for Table: {}", tableName);
+
+      String deltaTimePeriod =
+          
taskConfigs.getOrDefault(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
+              
MinionConstants.PurgeTask.DEFAULT_LAST_PURGE_TIME_THRESHOLD_PERIOD);
+      long purgeDeltaMs = TimeUtils.convertPeriodToMillis(deltaTimePeriod);
+
+      LOGGER.info("Start generating task configs for table: {} for task: {}", 
tableName, taskType);
+      // Get max number of tasks for this table
+      int tableMaxNumTasks;
+      String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = Integer.MAX_VALUE;
+          LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and 
task {}", tableName, taskType);
+        }
+      } else {
+        tableMaxNumTasks = Integer.MAX_VALUE;
+      }
+      List<SegmentZKMetadata> offlineSegmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableName);
+      List<SegmentZKMetadata> purgedSegmentsZKMetadata = new ArrayList<>();
+      List<SegmentZKMetadata> notpurgedSegmentsZKMetadata = new ArrayList<>();
+
+      for (SegmentZKMetadata segmentMetadata: offlineSegmentsZKMetadata) {
+
+       if (segmentMetadata.getCustomMap() != null && 
segmentMetadata.getCustomMap().containsKey(
+           MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) {
+         purgedSegmentsZKMetadata.add(segmentMetadata);
+       } else {
+         notpurgedSegmentsZKMetadata.add(segmentMetadata);
+       }
+      }
+      Collections.sort(purgedSegmentsZKMetadata, Comparator.comparing(
+          segmentZKMetadata -> segmentZKMetadata.getCustomMap()
+              .get(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX),
+          Comparator.nullsFirst(Comparator.naturalOrder())));
+      //add already purged segment at the end
+      notpurgedSegmentsZKMetadata.addAll(purgedSegmentsZKMetadata);
+
+      int tableNumTasks = 0;
+      Set<Segment> runningSegments =
+          
TaskGeneratorUtils.getRunningSegments(MinionConstants.PurgeTask.TASK_TYPE, 
_clusterInfoAccessor);
+      for (SegmentZKMetadata segmentZKMetadata : notpurgedSegmentsZKMetadata) {
+        Map<String, String> configs = new HashMap<>();
+        String segmentName = segmentZKMetadata.getSegmentName();
+        Long tsLastPurge;
+        if (segmentZKMetadata.getCustomMap() != null) {
+          tsLastPurge = Long.valueOf(segmentZKMetadata.getCustomMap()
+              .get(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX));
+        } else {
+          tsLastPurge = 0L;
+        }
+
+        //skip running segment
+        if (runningSegments.contains(new Segment(tableName, segmentName))) {
+          continue;
+        }
+        if ((tsLastPurge != null) && ((System.currentTimeMillis() - 
tsLastPurge) < purgeDeltaMs)) {
+          //skip if purge delay is not reached
+          continue;
+        }
+        if (tableNumTasks == tableMaxNumTasks) {
+          break;
+        }
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableName);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segmentZKMetadata.getCrc()));
+        pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+        tableNumTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {} " + "for 
task: {}", tableNumTasks, tableName,
+          taskType);
+    }
+    return pinotTaskConfigs;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to