This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ca198408ce Add consensus check before selecting a segment for compaction or deletion (#17352)
7ca198408ce is described below

commit 7ca198408ce254da73a4a8ba1ae955ee95640715
Author: tarun11Mavani <[email protected]>
AuthorDate: Wed Jan 7 10:01:48 2026 +0530

    Add consensus check before selecting a segment for compaction or deletion (#17352)
---
 .../pinot/common/metrics/ControllerMeter.java      |  4 +-
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 46 +++++++++++++++++++++-
 .../UpsertCompactMergeTaskGenerator.java           | 43 +++++++++++++-------
 .../UpsertCompactMergeTaskGeneratorTest.java       |  4 +-
 4 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 5289542a6ef..d812d1c1059 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -81,7 +81,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   // Audit logging metrics
   AUDIT_REQUEST_FAILURES("failures", true),
   AUDIT_RESPONSE_FAILURES("failures", true),
-  AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true);
+  AUDIT_REQUEST_PAYLOAD_TRUNCATED("count", true),
+  // Upsert compact merge task metrics
+  
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped",
 false);
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 9d4379b63e4..de88fb3a0eb 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -156,7 +157,7 @@ public class MinionTaskUtils {
       return singleFileGenerationTaskConfig;
     } catch (Exception e) {
       singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
-            BatchConfigProperties.SegmentPushType.TAR.toString());
+          BatchConfigProperties.SegmentPushType.TAR.toString());
       return singleFileGenerationTaskConfig;
     }
   }
@@ -334,4 +335,47 @@ public class MinionTaskUtils {
     }
     return validDocIdsType;
   }
+
+  /**
+   * Checks if all replicas have consensus on validDoc counts for a segment.
+   * SAFETY LOGIC:
+   * 1. Only proceed with operations when ALL replicas agree on totalValidDocs count
+   * 2. Skip operations if ANY server hosting the segment is not in READY state
+   * 3. Include all replicas (even those with CRC mismatches) in consensus for safety
+   *
+   * @param segmentName the name of the segment being checked
+   * @param replicaMetadataList list of metadata from all replicas of the segment
+   * @return true if all replicas have consensus on validDoc counts, false otherwise
+   */
+  public static boolean hasValidDocConsensus(String segmentName,
+      List<ValidDocIdsMetadataInfo> replicaMetadataList) {
+
+    if (replicaMetadataList == null || replicaMetadataList.isEmpty()) {
+      LOGGER.warn("No replica metadata available for segment: {}", 
segmentName);
+      return false;
+    }
+
+    // Check server readiness and validDoc consensus
+    Long consensusValidDocs = null;
+    for (ValidDocIdsMetadataInfo metadata : replicaMetadataList) {
+      // Check server readiness - skip if ANY server is not ready
+      if (metadata.getServerStatus() != null && 
!metadata.getServerStatus().equals(ServiceStatus.Status.GOOD)) {
+        LOGGER.warn("Server {} is in {} state for segment: {}, skipping 
consensus check",
+            metadata.getInstanceId(), metadata.getServerStatus(), segmentName);
+        return false;
+      }
+
+      // Check if all replicas have the same totalValidDocs count
+      long validDocs = metadata.getTotalValidDocs();
+      if (consensusValidDocs == null) {
+        // First iteration, we record the value to compare against
+        consensusValidDocs = validDocs;
+      } else if (!consensusValidDocs.equals(validDocs)) {
+        LOGGER.warn("Inconsistent validDoc counts across replicas for segment: 
{}. Expected: {}, but found: {}",
+            segmentName, consensusValidDocs, validDocs);
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 5a5705bcf05..904f6c28c1b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -34,16 +34,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.SegmentUtils;
-import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -187,7 +189,7 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
 
       SegmentSelectionResult segmentSelectionResult =
           processValidDocIdsMetadata(tableNameWithType, taskConfigs, 
candidateSegmentsMap, validDocIdsMetadataList,
-              alreadyMergedSegments);
+              alreadyMergedSegments, 
_clusterInfoAccessor.getControllerMetrics());
 
       if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
         pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentSelectionResult.getSegmentsForDeletion(),
@@ -240,10 +242,17 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Processes validDocIds metadata to determine segments eligible for deletion or compaction.
+   * Evaluates segments based on valid/invalid document counts, server readiness, and CRC consistency.
+   * Requires consensus across all replicas on validDoc counts before proceeding with any operations.
+   * Marks segments with zero valid documents for deletion and groups others by partition for compaction.
+   */
   @VisibleForTesting
   public static SegmentSelectionResult processValidDocIdsMetadata(String 
tableNameWithType,
       Map<String, String> taskConfigs, Map<String, SegmentZKMetadata> 
candidateSegmentsMap,
-      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, 
Set<String> alreadyMergedSegments) {
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, 
Set<String> alreadyMergedSegments,
+      ControllerMetrics controllerMetrics) {
     Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge 
= new HashMap<>();
     Set<String> segmentsForDeletion = new HashSet<>();
 
@@ -286,7 +295,23 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
         continue;
       }
       SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
-      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
+      List<ValidDocIdsMetadataInfo> replicaMetadataList = 
validDocIdsMetadataInfoMap.get(segmentName);
+
+      // Check consensus across all replicas before proceeding with any operations
+      if (!MinionTaskUtils.hasValidDocConsensus(segmentName, 
replicaMetadataList)) {
+        LOGGER.info("Skipping segment {} for table {} - no consensus on 
validDoc counts across replicas",
+            segmentName, tableNameWithType);
+
+        // Emit metric to track segments skipped due to consensus failure
+        if (controllerMetrics != null) {
+          controllerMetrics.addMeteredTableValue(tableNameWithType,
+              
ControllerMeter.UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE, 1L);
+        }
+        continue;
+      }
+
+      // Process with existing logic using the first replica with matching CRC (since all have consensus)
+      for (ValidDocIdsMetadataInfo validDocIdsMetadata : replicaMetadataList) {
         long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
         long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
         long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
@@ -298,16 +323,6 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
           continue;
         }
 
-        // skipping segments for which their servers are not in READY state. The bitmaps would be inconsistent when
-        // server is NOT READY as UPDATING segments might be updating the ONLINE segments
-        if (validDocIdsMetadata.getServerStatus() != null && 
!validDocIdsMetadata.getServerStatus()
-            .equals(ServiceStatus.Status.GOOD)) {
-          LOGGER.warn("Server {} is in {} state, skipping {} generation for 
segment: {}",
-              validDocIdsMetadata.getInstanceId(), 
validDocIdsMetadata.getServerStatus(),
-              MinionConstants.UpsertCompactMergeTask.TASK_TYPE, segmentName);
-          continue;
-        }
-
         // segments eligible for deletion with no valid records
         long totalDocs = validDocIdsMetadata.getTotalDocs();
         if (totalInvalidDocs == totalDocs) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index d074e3e3005..29a64624ae3 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -375,7 +375,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     SegmentSelectionResult result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
         RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
-        validDocIdsMetadata, alreadyMergedSegments);
+        validDocIdsMetadata, alreadyMergedSegments, null);
 
     Assert.assertNotNull(result);
     Assert.assertNotNull(result.getSegmentsForCompactMergeByPartition());
@@ -413,7 +413,7 @@ public class UpsertCompactMergeTaskGeneratorTest {
 
     SegmentSelectionResult result = 
UpsertCompactMergeTaskGenerator.processValidDocIdsMetadata(
         RAW_TABLE_NAME + "_REALTIME", taskConfigs, candidateSegmentsMap,
-        validDocIdsMetadata, alreadyMergedSegments);
+        validDocIdsMetadata, alreadyMergedSegments, null);
 
     Assert.assertNotNull(result);
     Assert.assertEquals(result.getSegmentsForDeletion().size(), 1, "Should 
have one segment for deletion");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to