This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7ca198408ce Add consensus check before selecting a segment for
compaction or deletion (#17352)
7ca198408ce is described below
commit 7ca198408ce254da73a4a8ba1ae955ee95640715
Author: tarun11Mavani <[email protected]>
AuthorDate: Wed Jan 7 10:01:48 2026 +0530
Add consensus check before selecting a segment for compaction or deletion
(#17352)
---
.../pinot/common/metrics/ControllerMeter.java | 4 +-
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 46 +++++++++++++++++++++-
.../UpsertCompactMergeTaskGenerator.java | 43 +++++++++++++-------
.../UpsertCompactMergeTaskGeneratorTest.java | 4 +-
4 files changed, 79 insertions(+), 18 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 5289542a6ef..d812d1c1059 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -81,7 +81,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
// Audit logging metrics
AUDIT_REQUEST_FAILURES("failures", true),
AUDIT_RESPONSE_FAILURES("failures", true),
- AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true);
+ AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true),
+ // Upsert compact merge task metrics
+ UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped", false);
private final String _brokerMeterName;
private final String _unit;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 9d4379b63e4..de88fb3a0eb 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.ServiceStatus;
@@ -156,7 +157,7 @@ public class MinionTaskUtils {
return singleFileGenerationTaskConfig;
} catch (Exception e) {
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
- BatchConfigProperties.SegmentPushType.TAR.toString());
+ BatchConfigProperties.SegmentPushType.TAR.toString());
return singleFileGenerationTaskConfig;
}
}
@@ -334,4 +335,47 @@ public class MinionTaskUtils {
}
return validDocIdsType;
}
+
+ /**
+ * Checks if all replicas have consensus on validDoc counts for a segment.
+ * SAFETY LOGIC:
+ * 1. Only proceed with operations when ALL replicas agree on totalValidDocs count
+ * 2. Skip operations if ANY server hosting the segment is not in READY state
+ * 3. Include all replicas (even those with CRC mismatches) in consensus for safety
+ *
+ * @param segmentName the name of the segment being checked
+ * @param replicaMetadataList list of metadata from all replicas of the segment
+ * @return true if all replicas have consensus on validDoc counts, false otherwise
+ */
+ public static boolean hasValidDocConsensus(String segmentName,
+ List<ValidDocIdsMetadataInfo> replicaMetadataList) {
+
+ if (replicaMetadataList == null || replicaMetadataList.isEmpty()) {
+ LOGGER.warn("No replica metadata available for segment: {}", segmentName);
+ return false;
+ }
+
+ // Check server readiness and validDoc consensus
+ Long consensusValidDocs = null;
+ for (ValidDocIdsMetadataInfo metadata : replicaMetadataList) {
+ // Check server readiness - skip if ANY server is not ready
+ if (metadata.getServerStatus() != null && !metadata.getServerStatus().equals(ServiceStatus.Status.GOOD)) {
+ LOGGER.warn("Server {} is in {} state for segment: {}, skipping consensus check",
+ metadata.getInstanceId(), metadata.getServerStatus(), segmentName);
+ return false;
+ }
+
+ // Check if all replicas have the same totalValidDocs count
+ long validDocs = metadata.getTotalValidDocs();
+ if (consensusValidDocs == null) {
+ // First iteration, we record the value to compare against
+ consensusValidDocs = validDocs;
+ } else if (!consensusValidDocs.equals(validDocs)) {
+ LOGGER.warn("Inconsistent validDoc counts across replicas for segment: {}. Expected: {}, but found: {}",
+ segmentName, consensusValidDocs, validDocs);
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 5a5705bcf05..904f6c28c1b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -34,16 +34,18 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -187,7 +189,7 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
SegmentSelectionResult segmentSelectionResult =
processValidDocIdsMetadata(tableNameWithType, taskConfigs,
candidateSegmentsMap, validDocIdsMetadataList,
- alreadyMergedSegments);
+ alreadyMergedSegments, _clusterInfoAccessor.getControllerMetrics());
if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentSelectionResult.getSegmentsForDeletion(),
@@ -240,10 +242,17 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
return pinotTaskConfigs;
}
+ /**
+ * Processes validDocIds metadata to determine segments eligible for deletion or compaction.
+ * Evaluates segments based on valid/invalid document counts, server readiness, and CRC consistency.
+ * Requires consensus across all replicas on validDoc counts before proceeding with any operations.
+ * Marks segments with zero valid documents for deletion and groups others by partition for compaction.
+ */
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(String
tableNameWithType,
Map<String, String> taskConfigs, Map<String, SegmentZKMetadata>
candidateSegmentsMap,
- Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap,
Set<String> alreadyMergedSegments) {
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, Set<String> alreadyMergedSegments,
+ ControllerMetrics controllerMetrics) {
Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge
= new HashMap<>();
Set<String> segmentsForDeletion = new HashSet<>();
@@ -286,7 +295,23 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
continue;
}
SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
- for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
+ List<ValidDocIdsMetadataInfo> replicaMetadataList = validDocIdsMetadataInfoMap.get(segmentName);
+
+ // Check consensus across all replicas before proceeding with any operations
+ if (!MinionTaskUtils.hasValidDocConsensus(segmentName, replicaMetadataList)) {
+ LOGGER.info("Skipping segment {} for table {} - no consensus on validDoc counts across replicas",
+ segmentName, tableNameWithType);
+
+ // Emit metric to track segments skipped due to consensus failure
+ if (controllerMetrics != null) {
+ controllerMetrics.addMeteredTableValue(tableNameWithType,
+ ControllerMeter.UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE, 1L);
+ }
+ continue;
+ }
+
+ // Process with existing logic using the first replica with matching CRC (since all have consensus)
+ for (ValidDocIdsMetadataInfo validDocIdsMetadata : replicaMetadataList) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
@@ -298,16 +323,6 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
continue;
}
- // skipping segments for which their servers are not in READY state.
The bitmaps would be inconsistent when
- // server is NOT READY as UPDATING segments might be updating the
ONLINE segments
- if (validDocIdsMetadata.getServerStatus() != null &&
!validDocIdsMetadata.getServerStatus()
- .equals(ServiceStatus.Status.GOOD)) {
- LOGGER.warn("Server {} is in {} state, skipping {} generation for
segment: {}",
- validDocIdsMetadata.getInstanceId(),
validDocIdsMetadata.getServerStatus(),
- MinionConstants.UpsertCompactMergeTask.TASK_TYPE, segmentName);
- continue;
- }
-
// segments eligible for deletion with no valid records
long totalDocs = validDocIdsMetadata.getTotalDocs();
if (totalInvalidDocs == totalDocs) {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index d074e3e3005..29a64624ae3 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -375,7 +375,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
SegmentSelectionResult result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
- validDocIdsMetadata, alreadyMergedSegments);
+ validDocIdsMetadata, alreadyMergedSegments, null);
Assert.assertNotNull(result);
Assert.assertNotNull(result.getSegmentsForCompactMergeByPartition());
@@ -413,7 +413,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
SegmentSelectionResult result =
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
- validDocIdsMetadata, alreadyMergedSegments);
+ validDocIdsMetadata, alreadyMergedSegments, null);
Assert.assertNotNull(result);
Assert.assertEquals(result.getSegmentsForDeletion().size(), 1, "Should
have one segment for deletion");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]