This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e1b0e5357e Refactor PinotTaskManager class (#12964)
e1b0e5357e is described below
commit e1b0e5357ebfcecffcc6cce3997a3edcdac1aa2c
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Sat Apr 20 03:35:01 2024 +0530
Refactor PinotTaskManager class (#12964)
---
.../api/resources/PinotTaskRestletResource.java | 14 +-
.../helix/core/minion/CronJobScheduleJob.java | 2 +-
.../helix/core/minion/PinotTaskManager.java | 162 +++++++++------------
.../MergeRollupMinionClusterIntegrationTest.java | 140 +++++++++---------
.../tests/PurgeMinionClusterIntegrationTest.java | 40 +++--
...fflineSegmentsMinionClusterIntegrationTest.java | 55 ++++---
.../tests/SimpleMinionClusterIntegrationTest.java | 45 +++---
.../integration/tests/TlsIntegrationTest.java | 2 +-
.../tests/UpsertTableIntegrationTest.java | 14 +-
.../tests/UrlAuthRealtimeIntegrationTest.java | 2 +-
10 files changed, 217 insertions(+), 259 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e09bde8466..0d9d3a05c1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,22 +618,20 @@ public class PinotTaskRestletResource {
@ApiOperation("Schedule tasks and return a map from task type to task name
scheduled")
public Map<String, String> scheduleTasks(@ApiParam(value = "Task type")
@QueryParam("taskType") String taskType,
@ApiParam(value = "Table name (with type suffix)")
@QueryParam("tableName") String tableName,
- @ApiParam(value = "Minion Instance tag to schedule the task explicitly
on")
- @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
- @Context HttpHeaders headers) {
+ @ApiParam(value = "Minion Instance tag to schedule the task explicitly
on") @QueryParam("minionInstanceTag")
+ @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
String database = headers != null ? headers.getHeaderString(DATABASE) :
DEFAULT_DATABASE;
if (taskType != null) {
// Schedule task for the given task type
- List<String> taskNames = tableName != null
- ? _pinotTaskManager.scheduleTask(taskType,
+ List<String> taskNames = tableName != null ?
_pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database,
minionInstanceTag);
return Collections.singletonMap(taskType, taskNames == null ? null :
StringUtils.join(taskNames, ','));
} else {
// Schedule tasks for all task types
- Map<String, List<String>> allTaskNames = tableName != null
- ?
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName,
headers), minionInstanceTag)
- : _pinotTaskManager.scheduleTasksForDatabase(database,
minionInstanceTag);
+ Map<String, List<String>> allTaskNames = tableName != null ?
_pinotTaskManager.scheduleAllTasksForTable(
+ DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
+ : _pinotTaskManager.scheduleAllTasksForDatabase(database,
minionInstanceTag);
return allTaskNames.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
String.join(",", entry.getValue())));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 8c0433854f..f9b250b2bc 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -65,7 +65,7 @@ public class CronJobScheduleJob implements Job {
return;
}
long jobStartTime = System.currentTimeMillis();
- pinotTaskManager.scheduleTask(taskType, table);
+ pinotTaskManager.scheduleTaskForTable(taskType, table, null);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is
{}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table,
taskType),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4029944139..97417d6bea 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -480,30 +479,72 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
}
/**
- * Public API to schedule tasks (all task types) for all tables in all
databases.
+ * Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
- public synchronized Map<String, List<String>> scheduleTasks() {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false,
null);
+ public synchronized Map<String, List<String>>
scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
+ return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false,
minionInstanceTag);
}
/**
- * Public API to schedule tasks (all task types) for all tables in given
database.
+ * Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the list of tasks scheduled.
*/
- public synchronized Map<String, List<String>>
scheduleTasksForDatabase(@Nullable String database,
+ public synchronized Map<String, List<String>>
scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
return scheduleTasks(_pinotHelixResourceManager.getAllTables(database),
false, minionInstanceTag);
}
+ /**
+ * Schedules tasks (all task types) for the given table.
+ * It might be called from the non-leader controller.
+ * Returns a map from the task type to the list of tasks scheduled.
+ */
+ public synchronized Map<String, List<String>>
scheduleAllTasksForTable(String tableNameWithType,
+ @Nullable String minionInstanceTag) {
+ return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+ }
+
+ /**
+ * Schedules task for the given task type for all tables.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is
scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForAllTables(String taskType,
@Nullable String minionInstanceTag) {
+ return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(),
minionInstanceTag);
+ }
+
+ /**
+ * Schedules task for the given task type for all tables in the given
database.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is
scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForDatabase(String taskType,
@Nullable String database,
+ @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType,
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+ }
+
+ /**
+ * Schedules task for the given task type for the given table.
+ * It might be called from the non-leader controller.
+ * Returns a list of tasks scheduled, or {@code null} if no task is
scheduled.
+ */
+ @Nullable
+ public synchronized List<String> scheduleTaskForTable(String taskType,
String tableNameWithType,
+ @Nullable String minionInstanceTag) {
+ return scheduleTask(taskType, List.of(tableNameWithType),
minionInstanceTag);
+ }
+
/**
* Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled. Returns a map
* from the task type to the list of the tasks scheduled.
*/
- private synchronized Map<String, List<String>> scheduleTasks(List<String>
tableNamesWithType,
- boolean isLeader, @Nullable String minionInstanceTag) {
+ private synchronized Map<String, List<String>> scheduleTasks(List<String>
tableNamesWithType, boolean isLeader,
+ @Nullable String minionInstanceTag) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
// Scan all table configs to get the tables with tasks enabled
@@ -541,6 +582,27 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
return tasksScheduled;
}
+ @Nullable
+ private synchronized List<String> scheduleTask(String taskType, List<String>
tables,
+ @Nullable String minionInstanceTag) {
+ PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
+ Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
+
+ // Scan all table configs to get the tables with task enabled
+ List<TableConfig> enabledTableConfigs = new ArrayList<>();
+ for (String tableNameWithType : tables) {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null && tableConfig.getTaskConfig() != null &&
tableConfig.getTaskConfig()
+ .isTaskTypeEnabled(taskType)) {
+ enabledTableConfigs.add(tableConfig);
+ }
+ }
+
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ addTaskTypeMetricsUpdaterIfNeeded(taskType);
+ return scheduleTask(taskGenerator, enabledTableConfigs, false,
minionInstanceTag);
+ }
+
/**
* Helper method to schedule task with the given task generator for the
given tables that have the task enabled.
* Returns the list of task names, or {@code null} if no task is scheduled.
@@ -554,8 +616,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
for (TableConfig tableConfig : enabledTableConfigs) {
String tableName = tableConfig.getTableName();
try {
- String minionInstanceTag = minionInstanceTagForTask != null
- ? minionInstanceTagForTask :
taskGenerator.getMinionInstanceTag(tableConfig);
+ String minionInstanceTag = minionInstanceTagForTask != null ?
minionInstanceTagForTask
+ : taskGenerator.getMinionInstanceTag(tableConfig);
List<PinotTaskConfig> presentTaskConfig =
minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag,
k -> new ArrayList<>());
taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -622,86 +684,6 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
return submittedTaskNames;
}
- /**
- * Public API to schedule tasks (all task types) for the given table. It
might be called from the non-leader
- * controller. Returns a map from the task type to the list of tasks
scheduled.
- */
- public synchronized Map<String, List<String>> scheduleTasks(String
tableNameWithType) {
- return scheduleTasks(Collections.singletonList(tableNameWithType), false,
null);
- }
-
- /**
- * Public API to schedule tasks (all task types) for the given table on a
specific instance tag.
- * It might be called from the non-leader controller. Returns a map from the
task type to the list of tasks scheduled.
- */
- public synchronized Map<String, List<String>> scheduleTasks(String
tableNameWithType,
- @Nullable String minionInstanceTag) {
- return scheduleTasks(Collections.singletonList(tableNameWithType), false,
minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type in all databases.
- * It might be called from the non-leader controller.
- * Returns the list of task names, or {@code null} if no task is scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTask(String taskType, @Nullable
String minionInstanceTag) {
- return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(),
minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type in given database.
- * It might be called from the non-leader controller.
- * Returns the list of task name, or {@code null} if no task is scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTaskForDatabase(String taskType,
@Nullable String database,
- @Nullable String minionInstanceTag) {
- return scheduleTask(taskType,
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
- }
-
- @Nullable
- private List<String> scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
- PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
-
- // Scan all table configs to get the tables with task enabled
- List<TableConfig> enabledTableConfigs = new ArrayList<>();
- for (String tableNameWithType : tables) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null && tableConfig.getTaskConfig() != null &&
tableConfig.getTaskConfig()
- .isTaskTypeEnabled(taskType)) {
- enabledTableConfigs.add(tableConfig);
- }
- }
-
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, enabledTableConfigs, false,
minionInstanceTag);
- }
-
- /**
- * Public API to schedule task for the given task type on the given table.
It might be called from the non-leader
- * controller. Returns the list of task names, or {@code null} if no task is
scheduled.
- */
- @Nullable
- public synchronized List<String> scheduleTask(String taskType, String
tableNameWithType,
- @Nullable String minionInstanceTag) {
- PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
-
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
-
- Preconditions.checkState(
- tableConfig.getTaskConfig() != null &&
tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
- "Table: %s does not have task type: %s enabled", tableNameWithType,
taskType);
-
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, Collections.singletonList(tableConfig),
false, minionInstanceTag);
- }
-
@Override
protected void processTables(List<String> tableNamesWithType, Properties
taskProperties) {
scheduleTasks(tableNamesWithType, true, null);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b655416c87..c5be600661 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -139,14 +139,14 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
List<File> avroFiles = unpackAvroData(_tempDir);
// Create and upload segments
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig,
schema, 0, _segmentDir1, _tarDir1);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles,
singleLevelConcatTableConfig, schema, 0, _segmentDir1,
+ _tarDir1);
buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig,
schema, 0, _segmentDir2, _tarDir2, "1");
buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig,
schema, 0, _segmentDir2, _tarDir2, "2");
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema,
0, _segmentDir3, _tarDir3);
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles,
singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles,
multiLevelConcatTableConfig, schema, 0, _segmentDir3,
+ _tarDir3);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles,
singleLevelConcatMetadataTableConfig, schema, 0,
+ _segmentDir4, _tarDir4);
uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
@@ -160,8 +160,8 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
addSchema(schema);
TableConfig singleLevelConcatProcessAllRealtimeTableConfig =
- createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
- MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
PROCESS_ALL_MODE_KAFKA_TOPIC);
+ createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
+ PROCESS_ALL_MODE_KAFKA_TOPIC);
addTableConfig(singleLevelConcatProcessAllRealtimeTableConfig);
// Push data into Kafka
@@ -172,9 +172,8 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3),
"localhost:" + getKafkaPort(),
PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(),
getKafkaMessageHeader(), getPartitionColumn(),
injectTombstones());
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(avroFiles.subList(3, 9),
singleLevelConcatProcessAllRealtimeTableConfig, schema, 0,
- _segmentDir5, _tarDir5);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9),
+ singleLevelConcatProcessAllRealtimeTableConfig, schema, 0,
_segmentDir5, _tarDir5);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
@@ -216,14 +215,14 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
private TableConfig createOfflineTableConfig(String tableName,
TableTaskConfig taskConfig,
@Nullable SegmentPartitionConfig partitionConfig) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+ return new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+ .setSegmentPartitionConfig(partitionConfig).build();
}
protected TableConfig createRealtimeTableConfigWithProcessAllMode(File
sampleAvroFile, String tableName,
@@ -246,12 +245,12 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
tableTaskConfigs.put("mode", "processAll");
- return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode()).setTaskConfig(
+ return new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+ .setTaskConfig(
new
TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE,
tableTaskConfigs)))
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
.setQueryConfig(getQueryConfig()).setStreamConfigs(streamConfigs)
@@ -411,17 +410,16 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -527,17 +525,16 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -636,17 +633,16 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -788,17 +784,16 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -859,8 +854,8 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
return false;
}
// Check if the task metadata is cleaned up
- if (MinionTaskMetadataUtils
- .fetchTaskMetadata(_propertyStore,
MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) {
+ if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore,
MinionConstants.MergeRollupTask.TASK_TYPE,
+ tableNameWithType) != null) {
return false;
}
return true;
@@ -921,18 +916,17 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null;
- taskList =
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
// assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check watermark
@@ -1027,17 +1021,16 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
int numTasks = 0;
List<String> taskList;
for (String tasks =
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null; taskList =
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
// Check not using watermarks
@@ -1069,11 +1062,10 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
waitForAllDocsLoaded(600_000L);
for (String tasks =
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
- tasks != null; taskList =
-
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
- tasks = taskList != null ? taskList.get(0) : null,
- numTasks++) {
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+ .get(0); tasks != null; taskList =
+ taskManager.scheduleAllTasksForTable(realtimeTableName,
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+ tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
waitForTaskToComplete();
// Check metrics
long numBucketsToProcess =
MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index c4ba131f6d..da4e85696c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.integration.tests;
-import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -63,7 +62,6 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE =
"myTable4";
-
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotTaskManager _taskManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -83,12 +81,8 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
startBrokers(1);
startServers(1);
- List<String> allTables = ImmutableList.of(
- PURGE_FIRST_RUN_TABLE,
- PURGE_DELTA_PASSED_TABLE,
- PURGE_DELTA_NOT_PASSED_TABLE,
- PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
- );
+ List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE,
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
Schema schema = null;
TableConfig tableConfig = null;
for (String tableName : allTables) {
@@ -152,12 +146,9 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
private void setRecordPurger() {
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(rawTableName -> {
- List<String> tableNames = Arrays.asList(
- PURGE_FIRST_RUN_TABLE,
- PURGE_DELTA_PASSED_TABLE,
- PURGE_DELTA_NOT_PASSED_TABLE,
- PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
- );
+ List<String> tableNames =
+ Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE,
PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
if (tableNames.contains(rawTableName)) {
return row -> row.getValue("ArrTime").equals(1);
} else {
@@ -195,11 +186,12 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
// 5. Check the purge process itself by setting an expecting number of rows
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
-
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -209,7 +201,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX));
}
// Should not generate new purge task as the last time purge is not
greater than last + 1day (default purge delay)
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -239,11 +231,12 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
// 5. Check the purge process itself by setting an expecting number of rows
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
-
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -255,7 +248,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) <
86400000);
}
// Should not generate new purge task as the last time purge is not
greater than last + 1day (default purge delay)
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -287,7 +280,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
// No task should be schedule as the delay is not passed
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
for (SegmentZKMetadata metadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Check purge time
String purgeTime =
@@ -338,10 +331,11 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
// schedule purge tasks
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
-
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(
+ _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
-
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
waitForTaskToComplete();
// Check that metadata contains expected values
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 043c654ef7..e6c8ce2700 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -134,14 +134,14 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
Map<String, String> taskConfigsWithMetadata = new HashMap<>();
taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT,
"true");
- taskConfigsWithMetadata.put(
- BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.toString());
+ taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
String tableWithMetadataPush = "myTable2";
schema.setSchemaName(tableWithMetadataPush);
addSchema(schema);
TableConfig realtimeMetadataTableConfig =
createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush,
- new TableTaskConfig(Collections.singletonMap(
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
taskConfigsWithMetadata)));
+ new
TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ taskConfigsWithMetadata)));
realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig);
realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
addTableConfig(realtimeMetadataTableConfig);
@@ -151,7 +151,6 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
addTableConfig(offlineMetadataTableConfig);
-
// Push data into Kafka
pushAvroIntoKafka(avroFiles);
@@ -163,7 +162,6 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
waitForDocsLoaded(600_000L, true, tableWithMetadataPush);
-
_taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
_realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
@@ -181,8 +179,8 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
}
_dataSmallestTimeMs = minSegmentTimeMs;
- segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
- minSegmentTimeMs = Long.MAX_VALUE;
+ segmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
+ minSegmentTimeMs = Long.MAX_VALUE;
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
if (segmentZKMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
minSegmentTimeMs = Math.min(minSegmentTimeMs,
segmentZKMetadata.getStartTimeMs());
@@ -193,29 +191,28 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
private TableConfig createOfflineTableConfig(String tableName, @Nullable
TableTaskConfig taskConfig,
@Nullable SegmentPartitionConfig partitionConfig) {
- return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+ return new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+ .setSegmentPartitionConfig(partitionConfig).build();
}
protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String
tableName, TableTaskConfig taskConfig) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
- return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
-
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+ return new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+
.setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()).setStreamConfigs(getStreamConfigs())
+ .setNullHandlingEnabled(getNullHandlingEnabled()).build();
}
-
@Test
public void testRealtimeToOfflineSegmentsTask()
throws Exception {
@@ -234,12 +231,12 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
long expectedWatermark = _dataSmallestTimeMs + 86400000;
for (int i = 0; i < 3; i++) {
// Schedule task
- assertNotNull(_taskManager.scheduleTasks(_realtimeTableName)
+ assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName,
null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- assertNull(_taskManager.scheduleTasks(_realtimeTableName)
+ assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName,
null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED
@@ -286,12 +283,12 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
_taskManager.cleanUpTask();
for (int i = 0; i < 3; i++) {
// Schedule task
- assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+
assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName,
null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+
assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName,
null)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks COMPLETED
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 241c1c0876..78aa4d1c24 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -87,8 +87,8 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
properties.put(TASK_TYPE +
MinionConstants.MAX_ATTEMPTS_PER_TASK_KEY_SUFFIX, "2");
helixResourceManager.getHelixAdmin().setConfig(
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
- .forCluster(helixResourceManager.getHelixClusterName()).build(),
properties);
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ helixResourceManager.getHelixClusterName()).build(), properties);
// Add 3 offline tables, where 2 of them have TestTask enabled
addDummySchema(TABLE_NAME_1);
@@ -136,7 +136,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
// Should create the task queues and generate a task in the same minion
instance
- List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+ List<String> task1 =
_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE);
assertNotNull(task1);
assertEquals(task1.size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
verifyTaskCount(task1.get(0), 0, 1, 1, 2);
// Should generate one more task, with two sub-tasks. Both of these
sub-tasks will wait
// since we have one minion instance that is still running one of the
sub-tasks.
- List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
+ List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE,
null);
assertNotNull(task2);
assertEquals(task2.size(), 1);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -159,8 +159,8 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
// Should not generate more tasks since
SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
// Our test task generator does not generate if there are already this
many sub-tasks in the
// running+waiting count already.
- assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
- assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
+ assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE));
+ assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null));
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
@@ -183,13 +183,12 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS;
String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED;
String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED;
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS)
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS)
== NUM_TASKS
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller
gauges");
// Stop the task queue
_helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
@@ -211,14 +210,12 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
stoppedGauge, ControllerGauge.TASK_STATUS)
== NUM_TASKS
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller
gauges");
// Task deletion requires the task queue to be stopped,
// so deleting task1 here before resuming the task queue.
@@ -247,13 +244,11 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
completedGauge, ControllerGauge.TASK_STATUS)
- == (NUM_TASKS - 1),
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ && MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
completedGauge, ControllerGauge.TASK_STATUS) == (
+ NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the
controller gauges");
// Delete the task queue
_helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
@@ -263,13 +258,11 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
// Wait at most 30 seconds for ZK callback to update the controller gauges
- TestUtils.waitForCondition(
- input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS)
- == 0
+ TestUtils.waitForCondition(input ->
+ MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
&& MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
completedGauge, ControllerGauge.TASK_STATUS)
- == 0,
- ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller
gauges");
}
@AfterClass
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index d292ef4c9b..5058fd4b75 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -489,7 +489,7 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+
Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
// wait for offline segments
JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 238d515b54..19c3ac61ff 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -471,8 +471,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
waitForAllDocsLoaded(tableName, 600_000L, 1000);
assertEquals(getScore(tableName), 3692);
waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
-
-
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -501,8 +501,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
waitForAllDocsLoaded(tableName, 600_000L, 2000);
assertEquals(getScore(tableName), 3692);
waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
-
-
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -546,7 +546,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
// Run segment compaction. This time, we expect that the deleting rows are
still there because they are
// as part of the consuming segment
-
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -563,7 +564,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTestSet {
assertEquals(getNumDeletedRows(tableName), 2);
// Run segment compaction. This time, we expect that the deleting rows are
cleaned up
-
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+ realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+ assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
waitForAllDocsLoaded(tableName, 600_000L, 3);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index e8389b377f..08aa9aee6a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -203,7 +203,7 @@ public class UrlAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest {
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+
Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
// wait for offline segments
JsonNode offlineSegments = TestUtils.waitForResult(() -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]