This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a39ad22d7a Change Pre-filter logic in MergeRollup task (#15177)
a39ad22d7a is described below
commit a39ad22d7ae8489f486c359bdd2656a4a8585adb
Author: Quan Yuan (Kate) <[email protected]>
AuthorDate: Fri Mar 7 13:06:24 2025 -0800
Change Pre-filter logic in MergeRollup task (#15177)
---
.../MergeRollupMinionClusterIntegrationTest.java | 4 +-
.../mergerollup/MergeRollupTaskGenerator.java | 91 ++++++++++++----------
.../mergerollup/MergeRollupTaskGeneratorTest.java | 62 +++++++++++++--
3 files changed, 108 insertions(+), 49 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index 71a55da67d..e7b51c6064 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -1031,8 +1031,8 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
String sqlQuery = "SELECT count(*) FROM " + tableName;
JsonNode expectedJson = postQuery(sqlQuery);
- long[] expectedNumBucketsToProcess100Days = {3, 2, 1, 0, 3, 2, 1, 0};
- long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
+ long[] expectedNumBucketsToProcess100Days = {2, 1, 0, 0, 3, 2, 1, 0};
+ long[] expectedNumBucketsToProcess200Days = {0, 0, 2, 1, 0, 0, 1, 1};
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
int numTasks = 0;
TaskSchedulingContext context = new TaskSchedulingContext()
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 10dcfcbf66..c4f9bd2175 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
@@ -161,21 +162,22 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
LOGGER.info("Start generating task configs for table: {} for task: {}",
tableNameWithType, taskType);
// Get all segment metadata
- List<SegmentZKMetadata> allSegments =
getSegmentsZKMetadataForTable(tableNameWithType);
- // Filter segments based on status
- List<SegmentZKMetadata> preSelectedSegmentsBasedOnStatus
- = filterSegmentsBasedOnStatus(tableConfig.getTableType(),
allSegments);
+ List<SegmentZKMetadata> allSegments =
+ tableConfig.getTableType() == TableType.OFFLINE
+ ? getSegmentsZKMetadataForTable(tableNameWithType)
+ : filterSegmentsforRealtimeTable(
+
getNonConsumingSegmentsZKMetadataForRealtimeTable(tableNameWithType));
// Select current segment snapshot based on lineage, filter out empty
segments
SegmentLineage segmentLineage =
_clusterInfoAccessor.getSegmentLineage(tableNameWithType);
Set<String> preSelectedSegmentsBasedOnLineage = new HashSet<>();
- for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+ for (SegmentZKMetadata segment : allSegments) {
preSelectedSegmentsBasedOnLineage.add(segment.getSegmentName());
}
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage,
segmentLineage);
List<SegmentZKMetadata> preSelectedSegments = new ArrayList<>();
- for (SegmentZKMetadata segment : preSelectedSegmentsBasedOnStatus) {
+ for (SegmentZKMetadata segment : allSegments) {
if
(preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) &&
segment.getTotalDocs() > 0
&& MergeTaskUtils.allowMerge(segment)) {
preSelectedSegments.add(segment);
@@ -543,44 +545,49 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
}
@VisibleForTesting
- static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType
tableType, List<SegmentZKMetadata> allSegments) {
- if (tableType == TableType.REALTIME) {
- // For realtime table, don't process
- // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS)
- // 2. sealed segments with start time later than the earliest start time
of all in progress segments
- // This prevents those in-progress segments from not being merged.
- //
- // Note that we make the following two assumptions here:
- // 1. streaming data consumer lags are negligible
- // 2. streaming data records are ingested mostly in chronological order
(no records are ingested with delay larger
- // than bufferTimeMS)
- //
- // We don't handle the following cases intentionally because it will be
either overkill or too complex
- // 1. New partition added. If new partitions are not picked up timely,
the MergeRollupTask will move watermarks
- // forward, and may not be able to merge some lately-created segments
for those new partitions -- users should
- // configure pinot properly to discover new partitions timely, or
they should restart pinot servers manually
- // for new partitions to be picked up
- // 2. (1) no new in-progress segments are created for some partitions
(2) new in-progress segments are created for
- // partitions, but there is no record consumed (i.e, empty
in-progress segments). In those two cases,
- // if new records are consumed later, the MergeRollupTask may have
already moved watermarks forward, and may
- // not be able to merge those lately-created segments -- we assume
that users will have a way to backfill those
- // records correctly.
- long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
- for (SegmentZKMetadata segmentZKMetadata : allSegments) {
- if (!segmentZKMetadata.getStatus().isCompleted()
- && segmentZKMetadata.getTotalDocs() > 0
- && segmentZKMetadata.getStartTimeMs() <
earliestStartTimeMsOfInProgressSegments) {
- earliestStartTimeMsOfInProgressSegments =
segmentZKMetadata.getStartTimeMs();
- }
+ static List<SegmentZKMetadata>
filterSegmentsforRealtimeTable(List<SegmentZKMetadata> allSegments) {
+ // For realtime table, don't process
+ // 1. in-progress segments (Segment.Realtime.Status.IN_PROGRESS), this has
been taken care of in
+ // getNonConsumingSegmentsZKMetadataForRealtimeTable()
+ // 2. most recent sealed segments in each partition, this prevents those
paused segments from being merged.
+ //
+ // Note that we make the following two assumptions here:
+ // 1. streaming data consumer lags are negligible
+ // 2. streaming data records are ingested mostly in chronological order
(no records are ingested with delay larger
+ // than bufferTimeMS)
+ //
+ // We don't handle the following cases intentionally because it will be
either overkill or too complex
+ // 1. New partition added. If new partitions are not picked up timely, the
MergeRollupTask will move watermarks
+ // forward, and may not be able to merge some lately-created segments
for those new partitions -- users should
+ // configure pinot properly to discover new partitions timely, or they
should restart pinot servers manually
+ // for new partitions to be picked up
+ // 2. (1) no new in-progress segments are created for some partitions (2)
new in-progress segments are created for
+ // partitions, but there is no record consumed (i.e, empty in-progress
segments). In those two cases,
+ // if new records are consumed later, the MergeRollupTask may have
already moved watermarks forward, and may
+ // not be able to merge those lately-created segments -- we assume that
users will have a way to backfill those
+ // records correctly.
+ Map<Integer, LLCSegmentName> partitionIdToLatestCompletedSegment = new
HashMap<>();
+ for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (LLCSegmentName.isLLCSegment(segmentName)) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+
partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(),
(partId, latestSegment) -> {
+ if (latestSegment == null) {
+ return llcSegmentName;
+ } else {
+ return latestSegment.getSequenceNumber() >
llcSegmentName.getSequenceNumber()
+ ? latestSegment : llcSegmentName;
+ }
+ });
}
- final long finalEarliestStartTimeMsOfInProgressSegments =
earliestStartTimeMsOfInProgressSegments;
- return allSegments.stream()
- .filter(segmentZKMetadata ->
segmentZKMetadata.getStatus().isCompleted()
- && segmentZKMetadata.getStartTimeMs() <
finalEarliestStartTimeMsOfInProgressSegments)
- .collect(Collectors.toList());
- } else {
- return allSegments;
}
+ Set<String> filteredSegmentNames = new HashSet<>();
+ for (LLCSegmentName llcSegmentName :
partitionIdToLatestCompletedSegment.values()) {
+ filteredSegmentNames.add(llcSegmentName.getSegmentName());
+ }
+ return allSegments.stream()
+ .filter(a -> !filteredSegmentNames.contains(a.getSegmentName()))
+ .collect(Collectors.toList());
}
/**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 18d60d0f9b..7665e7308f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -246,7 +246,7 @@ public class MergeRollupTaskGeneratorTest {
// the two following segments will be skipped when generating tasks
SegmentZKMetadata realtimeTableSegmentMetadata1 =
getSegmentZKMetadata("testTable__0__0__0", 5000, 50_000,
TimeUnit.MILLISECONDS, null);
-
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
SegmentZKMetadata realtimeTableSegmentMetadata2 =
getSegmentZKMetadata("testTable__1__0__0", 5000, 50_000,
TimeUnit.MILLISECONDS, null);
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
@@ -266,15 +266,14 @@ public class MergeRollupTaskGeneratorTest {
// Skip task generation, if the table is a realtime table and all segments
are skipped
// We don't test realtime REFRESH table because this combination does not
make sense
-
assertTrue(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.REALTIME,
- Lists.newArrayList(realtimeTableSegmentMetadata1,
realtimeTableSegmentMetadata2)).isEmpty());
+ assertTrue(MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+ Lists.newArrayList(realtimeTableSegmentMetadata1,
realtimeTableSegmentMetadata2)
+ ).isEmpty());
TableConfig realtimeTableConfig = getTableConfig(TableType.REALTIME, new
HashMap<>());
List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
assertTrue(pinotTaskConfigs.isEmpty());
// Skip task generation, if the table is an offline REFRESH table
-
assertFalse(MergeRollupTaskGenerator.filterSegmentsBasedOnStatus(TableType.OFFLINE,
- Lists.newArrayList(offlineTableSegmentMetadata)).isEmpty());
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"REFRESH", null));
TableConfig offlineTableConfig = getTableConfig(TableType.OFFLINE, new
HashMap<>());
@@ -283,6 +282,46 @@ public class MergeRollupTaskGeneratorTest {
assertTrue(pinotTaskConfigs.isEmpty());
}
+ /**
+ * Test pre-filter of task generation
+ */
+ @Test
+ public void testFilterSegmentsforRealtimeTable() {
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
+
when(mockClusterInfoProvide.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+ // construct 3 following segments, among these, only 0_0 can be scheduled,
others should be filtered out
+ // partition 0, completed 0
+ SegmentZKMetadata realtimeTableSegmentMetadata1 =
+ getSegmentZKMetadata("testTable__0__0__20250224T0900Z", 5000,
6000, TimeUnit.MILLISECONDS,
+ null, "50000", "60000");
+
realtimeTableSegmentMetadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ // partition 0, completed 1
+ SegmentZKMetadata realtimeTableSegmentMetadata2 =
+ getSegmentZKMetadata("testTable__0__1__20250224T0902Z", 6000,
7000, TimeUnit.MILLISECONDS,
+ null, "60000", "70000");
+
realtimeTableSegmentMetadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ // partition 1, completed 0
+ SegmentZKMetadata realtimeTableSegmentMetadata3 =
+ getSegmentZKMetadata("testTable__1__0__20250224T0900Z", 5500,
6500, TimeUnit.MILLISECONDS,
+ null, "55000", "65000");
+
realtimeTableSegmentMetadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+
when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+ Lists.newArrayList(realtimeTableSegmentMetadata1,
realtimeTableSegmentMetadata2,
+ realtimeTableSegmentMetadata3));
+ when(mockClusterInfoProvide.getIdealState(REALTIME_TABLE_NAME)).thenReturn(
+ getIdealState(REALTIME_TABLE_NAME,
Lists.newArrayList("testTable__0", "server0", "ONLINE")));
+
+ MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+ generator.init(mockClusterInfoProvide);
+
+ List<SegmentZKMetadata> filterResult =
MergeRollupTaskGenerator.filterSegmentsforRealtimeTable(
+ Lists.newArrayList(realtimeTableSegmentMetadata1,
realtimeTableSegmentMetadata2,
+ realtimeTableSegmentMetadata3));
+ assertEquals(filterResult.size(), 1);
+ assertEquals(filterResult.get(0).getSegmentName(),
"testTable__0__0__20250224T0900Z");
+ }
+
private void checkPinotTaskConfig(Map<String, String> pinotTaskConfig,
String segments, String mergeLevel,
String mergeType, String partitionBucketTimePeriod, String
roundBucketTimePeriod,
String maxNumRecordsPerSegments) {
@@ -1034,6 +1073,19 @@ public class MergeRollupTaskGeneratorTest {
return segmentZKMetadata;
}
+ private SegmentZKMetadata getSegmentZKMetadata(String segmentName, long
startTime, long endTime, TimeUnit timeUnit,
+ String downloadURL, String
startOffset, String endOffset) {
+ SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
+ segmentZKMetadata.setStartTime(startTime);
+ segmentZKMetadata.setEndTime(endTime);
+ segmentZKMetadata.setTimeUnit(timeUnit);
+ segmentZKMetadata.setDownloadUrl(downloadURL);
+ segmentZKMetadata.setTotalDocs(1000);
+ segmentZKMetadata.setStartOffset(startOffset);
+ segmentZKMetadata.setEndOffset(endOffset);
+ return segmentZKMetadata;
+ }
+
private IdealState getIdealState(String tableName, List<String>
segmentNames) {
IdealState idealState = new IdealState(tableName);
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]