This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d2fa9dd097 Implement builtin-purge task (#8589)
d2fa9dd097 is described below
commit d2fa9dd097f5c26c20da46b5ac66da49f302d0ff
Author: Airliquide76 <[email protected]>
AuthorDate: Tue May 31 05:05:22 2022 +0200
Implement builtin-purge task (#8589)
* Implement builtin-purge task
* Add todo to purge lastly purged segment first
* Add segment purging order
* add timeIntervalCheck for purge task
* add integration test
Co-authored-by: francois autaa <[email protected]>
---
.../apache/pinot/core/common/MinionConstants.java | 2 +
.../tests/PurgeMinionClusterIntegrationTest.java | 357 +++++++++++++++++++++
.../minion/tasks/purge/PurgeTaskGenerator.java | 153 +++++++++
3 files changed, 512 insertions(+)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index e0560248ad..c4007ee561 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -67,6 +67,8 @@ public class MinionConstants {
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
  public static final String TASK_TYPE = "PurgeTask";
+ // Task-config key holding the minimum period (e.g. "1d") that must elapse after a segment was last purged
+ // before the purge task generator schedules that segment again
+ public static final String LAST_PURGE_TIME_THRESHOLD_PERIOD = "lastPurgeTimeThresholdPeriod";
+ // Misspelled name of LAST_PURGE_TIME_THRESHOLD_PERIOD, kept as an alias so existing references keep compiling;
+ // prefer the correctly spelled constant in new code
+ public static final String LAST_PURGE_TIME_THREESOLD_PERIOD = LAST_PURGE_TIME_THRESHOLD_PERIOD;
+ // Period used when the table's task config does not set LAST_PURGE_TIME_THRESHOLD_PERIOD
+ public static final String DEFAULT_LAST_PURGE_TIME_THRESHOLD_PERIOD = "14d";
}
// Common config keys for segment merge tasks.
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
new file mode 100644
index 0000000000..d44b5711db
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+/**
+ * Integration test for the minion task of type "PurgeTask".
+ *
+ * <p>Three offline tables are loaded with the same Avro data and differ only in the purge-time metadata written on
+ * their segments during {@link #setUp()}:
+ * <ul>
+ *   <li>{@code myTable1}: no purge metadata at all (first run)</li>
+ *   <li>{@code myTable2}: last purge older than the configured threshold of 1 day (delay passed)</li>
+ *   <li>{@code myTable3}: last purge a few seconds ago (delay not passed)</li>
+ * </ul>
+ * The record purger installed by the test removes every row whose "Quarter" column equals 1.
+ */
+public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
+  private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
+  private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+
+  // Key of the last purge time (epoch millis, as a string) inside the segment ZK metadata custom map
+  private static final String PURGE_TIME_KEY = MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+  // Millisecond equivalent of the "1d" threshold configured in getPurgeTaskConfig()
+  private static final long PURGE_THRESHOLD_MS = 86_400_000L;
+  // Age given to the last purge of the "delay passed" table; slightly more than the threshold
+  private static final long PASSED_DELAY_AGE_MS = 88_400_000L;
+  // Age given to the last purge of the "delay not passed" table; far less than the threshold
+  private static final long NOT_PASSED_DELAY_AGE_MS = 4_000L;
+
+  protected PinotHelixTaskResourceManager _helixTaskResourceManager;
+  protected PinotTaskManager _taskManager;
+  protected PinotHelixResourceManager _pinotHelixResourceManager;
+  protected String _tableName;
+
+  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
+  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
+  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
+
+  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
+  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
+  protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+
+  /**
+   * Starts the cluster, creates the three tables, uploads their segments, starts the minion with the test record
+   * purger and back-dates the purge metadata of the "delay passed" and "delay not passed" tables.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1, _segmentDir2, _tarDir2, _segmentDir3,
+        _tarDir3);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBrokers(1);
+    startServers(1);
+
+    // Create and upload the schema and table configs. createOfflineTableConfig() is called right after each
+    // setTableName(), so the order of these statements matters.
+    Schema schema = createSchema();
+    addSchema(schema);
+    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
+    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
+    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
+
+    setTableName(PURGE_FIRST_RUN_TABLE);
+    TableConfig purgeTableConfig = createOfflineTableConfig();
+    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
+
+    setTableName(PURGE_DELTA_PASSED_TABLE);
+    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
+    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
+
+    addTableConfig(purgeTableConfig);
+    addTableConfig(purgeDeltaPassedTableConfig);
+    addTableConfig(purgeDeltaNotPassedTableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig, schema, 0, _segmentDir2,
+        _tarDir2);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig, schema, 0,
+        _segmentDir3, _tarDir3);
+
+    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
+    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
+    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+    startMinion();
+    setRecordPurger();
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    // Set up purge metadata on the segments to check how the generator handles a passed and a not passed delay
+    setLastPurgeAge(TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE), PASSED_DELAY_AGE_MS);
+    setLastPurgeAge(TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE),
+        NOT_PASSED_DELAY_AGE_MS);
+  }
+
+  /**
+   * Replaces the custom map of every segment of the table with one that records a last purge {@code ageMs}
+   * milliseconds before now.
+   */
+  private void setLastPurgeAge(String tableNameWithType, long ageMs) {
+    List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);
+    Map<String, String> customSegmentMetadata = new HashMap<>();
+    customSegmentMetadata.put(PURGE_TIME_KEY, String.valueOf(System.currentTimeMillis() - ageMs));
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      segmentZKMetadata.setCustomMap(customSegmentMetadata);
+      _pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
+    }
+  }
+
+  /**
+   * Returns how many milliseconds ago the segment was last purged according to its custom map.
+   */
+  private static long millisSinceLastPurge(SegmentZKMetadata metadata) {
+    return System.currentTimeMillis() - Long.parseLong(metadata.getCustomMap().get(PURGE_TIME_KEY));
+  }
+
+  /**
+   * Installs a record purger factory that, for the three test tables, purges rows whose "Quarter" value equals 1
+   * and returns no purger for any other table.
+   */
+  private void setRecordPurger() {
+    MinionContext minionContext = MinionContext.getInstance();
+    minionContext.setRecordPurgerFactory(rawTableName -> {
+      List<String> tableNames =
+          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+      if (tableNames.contains(rawTableName)) {
+        return row -> row.getValue("Quarter").equals(1);
+      } else {
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  /**
+   * Builds a task config enabling the purge task with a last-purge threshold of one day.
+   */
+  private TableTaskConfig getPurgeTaskConfig() {
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD, "1d");
+    return new TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE, tableTaskConfigs));
+  }
+
+  /**
+   * Test purge with no metadata on the segments (checking null safe implementation)
+   */
+  @Test
+  public void testFirstRunPurge()
+      throws Exception {
+    // Expected purge task generation:
+    // 1. No previous purge run, so all segments should be processed and purge metadata should be added to them
+    // 2. A second generation while the first task is still running must be skipped (running segments are skipped)
+    // 3. Segment metadata must contain the purge time once the task has completed
+    // 4. Re-running the generation right after the first purge must not schedule any task
+    // 5. The purge itself is checked through the expected number of remaining rows
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
+
+    // The first generation must actually schedule a task
+    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    // Will not schedule task if there's incomplete task
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that every segment now carries the purge time in its metadata
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      assertTrue(metadata.getCustomMap().containsKey(PURGE_TIME_KEY));
+    }
+    // Should not schedule a new purge: the last purge is more recent than the configured threshold of 1 day
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // 28057 rows with Quarter = 1
+    // 115545 rows in total
+    // Expecting 87488 rows after the purge
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_FIRST_RUN_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    assertTrue(expectedJson.toString().contains("\"rows\":[[87488]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_FIRST_RUN_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Test purge with passed delay
+   */
+  @Test
+  public void testPassedDelayTimePurge()
+      throws Exception {
+    // Expected purge task generation:
+    // 1. The age of the last purge is greater than the threshold (88400000 ms > 1d = 86400000 ms)
+    // 2. A second generation while the first task is still running must be skipped (running segments are skipped)
+    // 3. Segment metadata must contain an updated purge time once the task has completed
+    // 4. Re-running the generation right after the purge must not schedule any task
+    // 5. The purge itself is checked through the expected number of remaining rows
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
+
+    // The first generation must actually schedule a task
+    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    // Will not schedule task if there's incomplete task
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      assertTrue(metadata.getCustomMap().containsKey(PURGE_TIME_KEY));
+      // The purge has run on this segment: its purge time is now younger than the threshold
+      assertTrue(millisSinceLastPurge(metadata) < PURGE_THRESHOLD_MS);
+    }
+    // Should not schedule a new purge: the last purge is more recent than the configured threshold of 1 day
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // 28057 rows with Quarter = 1
+    // 115545 rows in total
+    // Expecting 87488 rows after the purge
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_PASSED_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    assertTrue(expectedJson.toString().contains("\"rows\":[[87488]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_DELTA_PASSED_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Test purge with not passed delay
+   */
+  @Test
+  public void testNotPassedDelayTimePurge()
+      throws Exception {
+    // Expected no purge task generation:
+    // 1. The segment purge time was set to System.currentTimeMillis() - 4000 during setUp, so a new purge should
+    //    not be triggered as the threshold is 1d (86400000 ms)
+    // 2. No task must have been scheduled
+    // 3. The purge time in the segment metadata must not have been updated
+    // 4. The purge itself must not have run, which is checked through the expected number of rows
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
+
+    // No task should be scheduled as the delay is not passed
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      assertTrue(metadata.getCustomMap().containsKey(PURGE_TIME_KEY));
+      // The purge has not run on this segment: the purge time is still the one written during setUp
+      assertTrue(millisSinceLastPurge(metadata) > NOT_PASSED_DELAY_AGE_MS);
+      assertTrue(millisSinceLastPurge(metadata) < PURGE_THRESHOLD_MS);
+    }
+
+    // 115545 rows in total; nothing was purged, so all of them are expected to remain
+    String sqlQuery = "SELECT count(*) FROM " + PURGE_DELTA_NOT_PASSED_TABLE;
+    JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
+    assertTrue(expectedJson.toString().contains("\"rows\":[[115545]]"));
+
+    // Drop the table
+    dropOfflineTable(PURGE_DELTA_NOT_PASSED_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Waits (polling every second, up to one minute) until both the segment lineage and the purge task metadata of
+   * the given table are gone from the property store.
+   */
+  protected void verifyTableDelete(String tableNameWithType) {
+    TestUtils.waitForCondition(input -> {
+      // Check if the segment lineage is cleaned up
+      if (SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, tableNameWithType) != null) {
+        return false;
+      }
+      // Check if the task metadata is cleaned up
+      if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, MinionConstants.PurgeTask.TASK_TYPE,
+          tableNameWithType) != null) {
+        return false;
+      }
+      return true;
+    }, 1_000L, 60_000L, "Failed to delete table");
+  }
+
+  /**
+   * Waits (up to ten minutes) until every known task of type PurgeTask is in state COMPLETED.
+   */
+  protected void waitForTaskToComplete() {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.PurgeTask.TASK_TYPE)
+          .values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
new file mode 100644
index 0000000000..ff73ff7197
--- /dev/null
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.purge;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.data.Segment;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
+import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.annotations.minion.TaskGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@TaskGenerator
+public class PurgeTaskGenerator extends BaseTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PurgeTaskGenerator.class);
+
+  // Key of the last purge time (epoch millis, as a string) inside the segment ZK metadata custom map
+  private static final String LAST_PURGE_TIME_KEY =
+      MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
+
+  @Override
+  public String getTaskType() {
+    return MinionConstants.PurgeTask.TASK_TYPE;
+  }
+
+  /**
+   * Generates one purge task per eligible segment of every given offline table.
+   *
+   * <p>Real-time tables and tables without a task config are skipped. For each remaining table, segments that were
+   * never purged are considered first, followed by already purged segments from the oldest purge to the most recent
+   * one. A segment is skipped when a purge task is already running on it or when its last purge is more recent than
+   * the configured threshold period. Generation for a table stops once the configured maximum number of tasks is
+   * reached.
+   *
+   * @param tableConfigs configs of the tables to generate tasks for
+   * @return the generated task configs for all tables, possibly empty
+   * @throws NullPointerException if a table has a task config that contains no entry for the purge task type
+   */
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    LOGGER.info("Start generating PurgeTask");
+    String taskType = MinionConstants.PurgeTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() == TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for real-time table: {}", taskType, tableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      if (tableTaskConfig == null) {
+        LOGGER.warn("Failed to find task config for table: {}", tableName);
+        continue;
+      }
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      // Guava message templates only understand %s placeholders (not the SLF4J-style {})
+      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableName);
+
+      String deltaTimePeriod = taskConfigs.getOrDefault(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
+          MinionConstants.PurgeTask.DEFAULT_LAST_PURGE_TIME_THRESHOLD_PERIOD);
+      long purgeDeltaMs = TimeUtils.convertPeriodToMillis(deltaTimePeriod);
+
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+      int tableMaxNumTasks = getTableMaxNumTasks(taskConfigs, tableName, taskType);
+      List<SegmentZKMetadata> candidateSegments =
+          orderSegmentsForPurge(_clusterInfoAccessor.getSegmentsZKMetadata(tableName));
+      Set<Segment> runningSegments = TaskGeneratorUtils.getRunningSegments(taskType, _clusterInfoAccessor);
+
+      int tableNumTasks = 0;
+      for (SegmentZKMetadata segmentZKMetadata : candidateSegments) {
+        String segmentName = segmentZKMetadata.getSegmentName();
+
+        // Skip running segment
+        if (runningSegments.contains(new Segment(tableName, segmentName))) {
+          continue;
+        }
+        // Skip if purge delay is not reached
+        if (System.currentTimeMillis() - getLastPurgeTimeMs(segmentZKMetadata) < purgeDeltaMs) {
+          continue;
+        }
+        if (tableNumTasks == tableMaxNumTasks) {
+          break;
+        }
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put(MinionConstants.TABLE_NAME_KEY, tableName);
+        configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
+        configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
+        configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
+        pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+        tableNumTasks++;
+      }
+      LOGGER.info("Finished generating {} tasks configs for table: {} for task: {}", tableNumTasks, tableName,
+          taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * Reads the maximum number of tasks for the table from the task config; unlimited when absent or not an integer.
+   */
+  private static int getTableMaxNumTasks(Map<String, String> taskConfigs, String tableName, String taskType) {
+    String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig == null) {
+      return Integer.MAX_VALUE;
+    }
+    try {
+      return Integer.parseInt(tableMaxNumTasksConfig);
+    } catch (NumberFormatException e) {
+      LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableName, taskType);
+      return Integer.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Returns the segments in purge order: never purged segments first (in their original order), then already
+   * purged segments from the oldest purge time to the most recent one.
+   */
+  private static List<SegmentZKMetadata> orderSegmentsForPurge(List<SegmentZKMetadata> segmentsZKMetadata) {
+    List<SegmentZKMetadata> purgedSegments = new ArrayList<>();
+    List<SegmentZKMetadata> orderedSegments = new ArrayList<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+      if (customMap != null && customMap.containsKey(LAST_PURGE_TIME_KEY)) {
+        purgedSegments.add(segmentZKMetadata);
+      } else {
+        orderedSegments.add(segmentZKMetadata);
+      }
+    }
+    // Compare the timestamps as numbers; comparing their string form is only correct for equal-length values
+    purgedSegments.sort(Comparator.comparingLong(PurgeTaskGenerator::getLastPurgeTimeMs));
+    // Add already purged segments at the end
+    orderedSegments.addAll(purgedSegments);
+    return orderedSegments;
+  }
+
+  /**
+   * Returns the last purge time of the segment in epoch millis, or 0 when the segment has no custom map, the custom
+   * map has no purge time entry (e.g. it only holds metadata of other tasks), or the entry is not a number. A
+   * result of 0 makes the segment look as if it had never been purged, so it becomes eligible for a purge.
+   */
+  private static long getLastPurgeTimeMs(SegmentZKMetadata segmentZKMetadata) {
+    Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+    String lastPurgeTime = customMap != null ? customMap.get(LAST_PURGE_TIME_KEY) : null;
+    if (lastPurgeTime == null) {
+      return 0L;
+    }
+    try {
+      return Long.parseLong(lastPurgeTime);
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Invalid last purge time: {} for segment: {}, treating the segment as never purged", lastPurgeTime,
+          segmentZKMetadata.getSegmentName());
+      return 0L;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]