This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f4c086ef51 Avoid Inconsistencies among replicas during Upsert Compaction Tasks (#17696)
2f4c086ef51 is described below

commit 2f4c086ef517d848c3746a522ad14d5bff85af10
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Fri Mar 6 13:12:37 2026 -0800

    Avoid Inconsistencies among replicas during Upsert Compaction Tasks (#17696)
---
 .../apache/pinot/core/common/MinionConstants.java  |  15 ++
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 154 ++++++-----
 .../UpsertCompactionTaskExecutor.java              |  10 +-
 .../UpsertCompactMergeTaskExecutor.java            |  16 +-
 .../UpsertCompactMergeTaskGenerator.java           |  18 +-
 .../plugin/minion/tasks/MinionTaskUtilsTest.java   | 285 +++++++++++++++++++++
 6 files changed, 405 insertions(+), 93 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 1f784fb368b..9e059e98417 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -92,6 +92,11 @@ public class MinionConstants {
    */
   public static final String SEGMENT_DOWNLOAD_PARALLELISM = 
"segmentDownloadParallelism";
 
+  /** Valid doc ids consensus mode (executor-only). Kept internal; executors 
pass config string. */
+  public enum ValidDocIdsConsensusMode {
+    UNSAFE, EQUAL, MOST_VALID_DOCS
+  }
+
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
@@ -257,6 +262,16 @@ public class MinionConstants {
      * number of segments to query in one batch to fetch valid doc id 
metadata, by default 500
      */
     public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 
"numSegmentsBatchPerServerRequest";
+
+    /**
+     * Valid doc ids consensus mode used by the executor only (generator 
unchanged). Values: UNSAFE, EQUAL,
+     * MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; 
EQUAL = require all replicas
+     * to have the same valid doc set (default); MOST_VALID_DOCS = use replica 
with most valid docs.
+     */
+    public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = 
"validDocIdsConsensusMode";
+
+    /** Default: equal valid doc set consensus across replicas. */
+    public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
   }
 
   public static class UpsertCompactMergeTask {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index ca197849210..396a91fc477 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -36,7 +36,6 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.common.auth.NullAuthProvider;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
-import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -67,6 +66,14 @@ import org.slf4j.LoggerFactory;
 public class MinionTaskUtils {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionTaskUtils.class);
 
+  /** Package-private for testing: parses validDocIdsComparisonMode config 
string. */
+  static MinionConstants.ValidDocIdsConsensusMode 
parseValidDocIdsConsensusMode(String value) {
+    if (value == null || value.isBlank()) {
+      return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
+    }
+    return 
MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
+  }
+
   private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
 
   public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
@@ -281,22 +288,22 @@ public class MinionTaskUtils {
   }
 
   /**
-   * Returns the validDocID bitmap from the server whose local segment crc 
matches both crc of ZK metadata and
-   * deepstore copy (expectedCrc).
+   * Returns the validDocIds bitmap from server(s). {@code comparisonMode} is 
the task config value: UNSAFE,
+   * EQUAL (default), or MOST_VALID_DOCS.
    */
   @Nullable
   public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String 
tableNameWithType, String segmentName,
-      String validDocIdsType, MinionContext minionContext, String expectedCrc) 
{
+      String validDocIdsType, MinionContext minionContext, String expectedCrc, 
String comparisonModeStr) {
+    MinionConstants.ValidDocIdsConsensusMode consensusMode = 
parseValidDocIdsConsensusMode(comparisonModeStr);
     String clusterName = minionContext.getHelixManager().getClusterName();
     HelixAdmin helixAdmin = 
minionContext.getHelixManager().getClusterManagmentTool();
-    RoaringBitmap validDocIds = null;
     List<String> servers = getServers(segmentName, tableNameWithType, 
helixAdmin, clusterName);
+    List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
+
     for (String server : servers) {
       InstanceConfig instanceConfig = 
helixAdmin.getInstanceConfig(clusterName, server);
       String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
 
-      // We only need aggregated table size and the total number of docs/rows. 
Skipping column related stats, by
-      // passing an empty list.
       ServerSegmentMetadataReader serverSegmentMetadataReader = new 
ServerSegmentMetadataReader();
       ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
       try {
@@ -304,43 +311,91 @@ public class MinionTaskUtils {
             
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, 
segmentName, endpoint,
                 validDocIdsType, 60_000);
       } catch (Exception e) {
-        LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " + 
segmentName + " from endpoint: "
-            + endpoint, e);
-        continue;
+        if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+          LOGGER.warn(
+              "Unable to retrieve validDocIds bitmap for segment: " + 
segmentName + " from endpoint: " + endpoint, e);
+          continue;
+        } else {
+          throw new IllegalStateException(
+              "Unable to retrieve validDocIds bitmap for segment: " + 
segmentName + " from endpoint: " + endpoint, e);
+        }
       }
 
+      String crcFromValidDocIdsBitmap = 
validDocIdsBitmapResponse.getSegmentCrc();
       // Check crc from the downloaded segment against the crc returned from 
the server along with the valid doc id
       // bitmap. If this doesn't match, this means that we are hitting the 
race condition where the segment has been
       // uploaded successfully while the server is still reloading the 
segment. Reloading can take a while when the
       // offheap upsert is used because we will need to delete & add all 
primary keys.
       // `BaseSingleSegmentConversionExecutor.executeTask()` already checks 
for the crc from the task generator
       // against the crc from the current segment zk metadata, so we don't 
need to check that here.
-      String crcFromValidDocIdsBitmap = 
validDocIdsBitmapResponse.getSegmentCrc();
       if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
-        // In this scenario, we are hitting the other replica of the segment 
which did not commit to ZK or deepstore.
-        // We will skip processing this bitmap to query other server to 
confirm if there is a valid matching CRC.
-        String message = "CRC mismatch for segment: " + segmentName + ", 
expected value based on task generator: "
-            + expectedCrc + ", actual crc from validDocIdsBitmapResponse from 
endpoint " + endpoint + ": "
-            + crcFromValidDocIdsBitmap;
-        LOGGER.warn(message);
-        continue;
+        if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+          LOGGER.warn("CRC mismatch for segment: {} from endpoint {}, 
skipping", segmentName, endpoint);
+          continue;
+        } else {
+          throw new IllegalStateException(
+              "CRC mismatch for segment: " + segmentName + ", expected: " + 
expectedCrc + ", actual from endpoint "
+                  + endpoint + ": " + crcFromValidDocIdsBitmap);
+        }
       }
 
-      // skipping servers which are not in READY state. The bitmaps would be 
inconsistent when
-      // server is NOT READY as UPDATING segments might be updating the ONLINE 
segments
       if (validDocIdsBitmapResponse.getServerStatus() != null && 
!validDocIdsBitmapResponse.getServerStatus()
           .equals(ServiceStatus.Status.GOOD)) {
-        String message = "Server " + validDocIdsBitmapResponse.getInstanceId() 
+ " is in "
-            + validDocIdsBitmapResponse.getServerStatus() + " state, skipping 
it for execution for segment: "
-            + validDocIdsBitmapResponse.getSegmentName() + ". Will try other 
servers.";
-        LOGGER.warn(message);
-        continue;
+        if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+          LOGGER.warn("Server {} not READY for segment {}, skipping", 
validDocIdsBitmapResponse.getInstanceId(),
+              segmentName);
+          continue;
+        } else {
+          throw new IllegalStateException("Server " + 
validDocIdsBitmapResponse.getInstanceId() + " is in "
+              + validDocIdsBitmapResponse.getServerStatus() + " state for 
segment: " + segmentName
+              + ". Failing task to avoid inconsistency among replicas.");
+        }
+      }
+
+      RoaringBitmap bitmap = 
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+      int cardinality = bitmap.getCardinality();
+
+      if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
+        LOGGER.info("Using server {} with {} valid docs for segment {} 
(mode=UNSAFE)", server, cardinality,
+            segmentName);
+        return bitmap;
       }
 
-      validDocIds = 
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
-      break;
+      matchingBitmaps.add(bitmap);
     }
-    return validDocIds;
+
+    if (matchingBitmaps.isEmpty()) {
+      return null;
+    }
+
+    if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
+      RoaringBitmap consensusBitMap = matchingBitmaps.get(0);
+      for (RoaringBitmap b : matchingBitmaps) {
+        if (!b.equals(consensusBitMap)) {
+          throw new IllegalStateException("No consensus on validDocs across 
replicas for segment: " + segmentName
+              + ". Failing task to avoid replica inconsistency.");
+        }
+      }
+      LOGGER.info("All {} servers have {} valid docs for segment {}", 
servers.size(), consensusBitMap.getCardinality(),
+          segmentName);
+      return consensusBitMap;
+    }
+
+    // MOST_VALID_DOCS: explicitly pick the bitmap with the maximum valid doc 
count
+    RoaringBitmap maxCardinalityMap = null;
+    int maxCard = -1;
+    for (RoaringBitmap b : matchingBitmaps) {
+      int card = b.getCardinality();
+      if (card > maxCard) {
+        maxCard = card;
+        maxCardinalityMap = b;
+      }
+    }
+    if (maxCardinalityMap != null) {
+      LOGGER.info("Selected server with {} valid docs for segment {} 
(mode=MOST_VALID_DOCS, checked {} servers)",
+          maxCard, segmentName, servers.size());
+    }
+    return maxCardinalityMap;
   }
 
   public static String toUTCString(long epochMillis) {
@@ -396,47 +451,4 @@ public class MinionTaskUtils {
     }
     return validDocIdsType;
   }
-
-  /**
-   * Checks if all replicas have consensus on validDoc counts for a segment.
-   * SAFETY LOGIC:
-   * 1. Only proceed with operations when ALL replicas agree on totalValidDocs 
count
-   * 2. Skip operations if ANY server hosting the segment is not in READY state
-   * 3. Include all replicas (even those with CRC mismatches) in consensus for 
safety
-   *
-   * @param segmentName the name of the segment being checked
-   * @param replicaMetadataList list of metadata from all replicas of the 
segment
-   * @return true if all replicas have consensus on validDoc counts, false 
otherwise
-   */
-  public static boolean hasValidDocConsensus(String segmentName,
-      List<ValidDocIdsMetadataInfo> replicaMetadataList) {
-
-    if (replicaMetadataList == null || replicaMetadataList.isEmpty()) {
-      LOGGER.warn("No replica metadata available for segment: {}", 
segmentName);
-      return false;
-    }
-
-    // Check server readiness and validDoc consensus
-    Long consensusValidDocs = null;
-    for (ValidDocIdsMetadataInfo metadata : replicaMetadataList) {
-      // Check server readiness - skip if ANY server is not ready
-      if (metadata.getServerStatus() != null && 
!metadata.getServerStatus().equals(ServiceStatus.Status.GOOD)) {
-        LOGGER.warn("Server {} is in {} state for segment: {}, skipping 
consensus check",
-            metadata.getInstanceId(), metadata.getServerStatus(), segmentName);
-        return false;
-      }
-
-      // Check if all replicas have the same totalValidDocs count
-      long validDocs = metadata.getTotalValidDocs();
-      if (consensusValidDocs == null) {
-        // First iteration, we record the value to compare against
-        consensusValidDocs = validDocs;
-      } else if (!consensusValidDocs.equals(validDocs)) {
-        LOGGER.warn("Inconsistent validDoc counts across replicas for segment: 
{}. Expected: {}, but found: {}",
-            segmentName, consensusValidDocs, validDocs);
-        return false;
-      }
-    }
-    return true;
-  }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index f4497b69ebd..189072f5bc9 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -75,9 +75,17 @@ public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExe
       LOGGER.error(message);
       throw new IllegalStateException(message);
     }
+
+    // Executor-only: read comparison mode string from task config (no auth 
resolution or URL hits).
+    Map<String, String> taskConfigs =
+        tableConfig.getTaskConfig() != null ? 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
+    String consensusMode =
+        taskConfigs != null ? 
taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+            UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+            : UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
     RoaringBitmap validDocIds =
         MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
segmentName, validDocIdsTypeStr,
-            MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
+            MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, 
consensusMode);
     if (validDocIds == null) {
       // no valid crc match found or no validDocIds obtained from all servers
       // error out the task instead of silently failing so that we can track 
it via task-error metrics
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
index e5384ecca64..d148e9d26be 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java
@@ -109,15 +109,23 @@ public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversi
         
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
     validateCRCForInputSegments(segmentMetadataList, 
originalSegmentCrcFromTaskGenerator);
 
-    // Fetch validDocID snapshot from server and get record-reader for 
compacted reader.
+    // Executor-only: read comparison mode string from task config (no auth 
resolution or URL hits).
+    Map<String, String> taskConfigs =
+        tableConfig.getTaskConfig() != null ? 
tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
+    String consensusMode = taskConfigs != null ? taskConfigs.getOrDefault(
+        MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
+        
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
+        : 
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
+
     List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
       RoaringBitmap validDocIds = 
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, 
x.getName(),
-          ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
+          ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(), 
consensusMode);
       if (validDocIds == null) {
         // no valid crc match found or no validDocIds obtained from all servers
         // error out the task instead of silently failing so that we can track 
it via task-error metrics
-        String message = String.format("No validDocIds found from all servers. 
They either failed to download "
-            + "or did not match crc from segment copy obtained from deepstore 
/ servers. " + "Expected crc: %s", "");
+        String message = "No validDocIds found from all servers for segment: " 
+ x.getName()
+            + ". They either failed to download or did not match crc from 
segment copy obtained from "
+            + "deepstore/servers. Expected crc: " + x.getCrc();
         LOGGER.error(message);
         throw new IllegalStateException(message);
       }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 904f6c28c1b..3d72a7c6b8c 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -34,7 +34,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
@@ -45,7 +44,6 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtil
 import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -295,23 +293,9 @@ public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
         continue;
       }
       SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
-      List<ValidDocIdsMetadataInfo> replicaMetadataList = 
validDocIdsMetadataInfoMap.get(segmentName);
-
-      // Check consensus across all replicas before proceeding with any 
operations
-      if (!MinionTaskUtils.hasValidDocConsensus(segmentName, 
replicaMetadataList)) {
-        LOGGER.info("Skipping segment {} for table {} - no consensus on 
validDoc counts across replicas",
-            segmentName, tableNameWithType);
-
-        // Emit metric to track segments skipped due to consensus failure
-        if (controllerMetrics != null) {
-          controllerMetrics.addMeteredTableValue(tableNameWithType,
-              
ControllerMeter.UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE, 1L);
-        }
-        continue;
-      }
 
       // Process with existing logic using the first replica with matching CRC 
(since all have consensus)
-      for (ValidDocIdsMetadataInfo validDocIdsMetadata : replicaMetadataList) {
+      for (ValidDocIdsMetadataInfo validDocIdsMetadata : 
validDocIdsMetadataInfoMap.get(segmentName)) {
         long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
         long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
         long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 0ad879896f3..709118ba341 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -21,11 +21,22 @@ package org.apache.pinot.plugin.minion.tasks;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
+import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -33,15 +44,24 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.MockedConstruction;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -275,6 +295,271 @@ public class MinionTaskUtilsTest {
         "'snapshot' must not be 'DISABLE' with validDocIdsType: 
SNAPSHOT_WITH_DELETE");
   }
 
+  @Test
+  public void testParseValidDocIdsConsensusMode() {
+    // Null or blank defaults to EQUAL
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(null),
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode(""),
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("   "),
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+
+    // UNSAFE
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("UNSAFE"),
+        MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("unsafe"),
+        MinionConstants.ValidDocIdsConsensusMode.UNSAFE);
+
+    // EQUAL
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("EQUAL"),
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+    assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("  EQUAL  "),
+        MinionConstants.ValidDocIdsConsensusMode.EQUAL);
+
+    // MOST_VALID_DOCS
+    
assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("MOST_VALID_DOCS"),
+        MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+    
assertEquals(MinionTaskUtils.parseValidDocIdsConsensusMode("most_valid_docs"),
+        MinionConstants.ValidDocIdsConsensusMode.MOST_VALID_DOCS);
+
+    // Invalid value throws
+    expectThrows(IllegalArgumentException.class,
+        () -> MinionTaskUtils.parseValidDocIdsConsensusMode("INVALID_MODE"));
+  }
+
+  /**
+   * Builds a RoaringBitmap with {@code numDocs} valid doc ids (0..numDocs-1).
+   */
+  private static RoaringBitmap makeBitmap(int numDocs) {
+    RoaringBitmap b = new RoaringBitmap();
+    for (int i = 0; i < numDocs; i++) {
+      b.add(i);
+    }
+    return b;
+  }
+
+  /**
+   * Builds a ValidDocIdsBitmapResponse for testing: same segmentCrc and GOOD 
status.
+   */
+  private static ValidDocIdsBitmapResponse makeResponse(String segmentName, 
String crc, String instanceId,
+      RoaringBitmap bitmap) {
+    return new ValidDocIdsBitmapResponse(segmentName, crc, 
ValidDocIdsType.SNAPSHOT,
+        RoaringBitmapUtils.serialize(bitmap), instanceId, 
ServiceStatus.Status.GOOD);
+  }
+
+  /**
+   * Creates an InstanceConfig so that InstanceUtils.getServerAdminEndpoint() 
returns a valid URL.
+   */
+  private static InstanceConfig makeInstanceConfig(String instanceId) {
+    InstanceConfig config = new InstanceConfig(instanceId);
+    config.setHostName("localhost");
+    config.getRecord().setIntField(Helix.Instance.ADMIN_PORT_KEY, 8098);
+    return config;
+  }
+
+  /**
+   * Sets up MinionContext with mock Helix so getServers() returns the given 
server list.
+   */
+  private void setupMinionContextWithServers(String tableNameWithType, String 
segmentName, String[] servers) {
+    ExternalView externalView = new ExternalView(tableNameWithType);
+    Map<String, String> assignment = new HashMap<>();
+    for (String s : servers) {
+      assignment.put(s, SegmentStateModel.ONLINE);
+    }
+    externalView.getRecord().getMapFields().put(segmentName, assignment);
+
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    when(helixAdmin.getResourceExternalView(anyString(), 
eq(tableNameWithType))).thenReturn(externalView);
+    for (String server : servers) {
+      when(helixAdmin.getInstanceConfig(anyString(), 
eq(server))).thenReturn(makeInstanceConfig(server));
+    }
+
+    HelixManager helixManager = mock(HelixManager.class);
+    when(helixManager.getClusterName()).thenReturn("testCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+
+    MinionContext.getInstance().setHelixManager(helixManager);
+  }
+
+  /**
+   * Calls getValidDocIdFromServerMatchingCrc with ServerSegmentMetadataReader 
mocked. Each invocation of
+   * getValidDocIdsBitmapFromServer returns the next element of 
responseOrThrowByCallOrder; if it is an Exception,
+   * that exception is thrown (simulating fetch failure).
+   */
+  private static RoaringBitmap 
getValidDocIdFromServerMatchingCrcWithMockedReader(String tableName,
+      String segmentName, String expectedCrc, String consensusMode, 
List<Object> responseOrThrowByCallOrder,
+      String[] servers, MinionTaskUtilsTest testInstance) {
+    testInstance.setupMinionContextWithServers(tableName, segmentName, 
servers);
+    // Shared across all mock instances (production creates one reader per 
server).
+    AtomicInteger callIndex = new AtomicInteger(0);
+    try (MockedConstruction<ServerSegmentMetadataReader> ignored = 
mockConstruction(ServerSegmentMetadataReader.class,
+        (mock, context) -> {
+          when(mock.getValidDocIdsBitmapFromServer(anyString(), anyString(), 
anyString(), anyString(), anyInt()))
+              .thenAnswer(inv -> {
+                int i = callIndex.getAndIncrement();
+                if (i >= responseOrThrowByCallOrder.size()) {
+                  throw new IllegalStateException("Mock received more calls 
than expected");
+                }
+                Object action = responseOrThrowByCallOrder.get(i);
+                if (action instanceof Exception) {
+                  throw (Exception) action;
+                }
+                return (ValidDocIdsBitmapResponse) action;
+              });
+        })) {
+      return MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableName, 
segmentName,
+          ValidDocIdsType.SNAPSHOT.name(), MinionContext.getInstance(), 
expectedCrc, consensusMode);
+    }
+  }
+
+  @Test
+  public void testSameValidDocsEqualConsensus() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "EQUAL", responses, new String[]{"server1", "server2", "server3"}, 
this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 5);
+  }
+
+  @Test
+  public void testSameValidDocsMaxValidDocs() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "MOST_VALID_DOCS", responses, new String[]{"server1", "server2", 
"server3"}, this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 5);
+  }
+
+  @Test
+  public void testSameValidDocsNone() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "UNSAFE", responses, new String[]{"server1", "server2", "server3"}, 
this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 5);
+  }
+
+  @Test
+  public void testDifferentValidDocsMaxValidDocsMax() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(3)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(4)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "MOST_VALID_DOCS", responses, new String[]{"server1", "server2", 
"server3"}, this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 5);
+  }
+
+  @Test
+  public void testsomeServersNoValidDocsEqualConsensus() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    // Replicas disagree (0, 0 and 3 valid docs), so EQUAL consensus must reject the segment.
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+    IllegalStateException thrown = expectThrows(IllegalStateException.class,
+        () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, expectedCrc,
+            "EQUAL", responses, new String[]{"server1", "server2", "server3"}, this));
+    // The mocked reader also throws IllegalStateException once it is called more often than scripted; make sure
+    // the expectation was met by the code under test and not by that mock guard.
+    org.testng.Assert.assertFalse("Mock received more calls than expected".equals(thrown.getMessage()),
+        "IllegalStateException came from the mock call-count guard, not from the code under test");
+  }
+
+  @Test
+  public void testsomeServersNoValidDocsMaxValidDocs() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "MOST_VALID_DOCS", responses, new String[]{"server1", "server2", 
"server3"}, this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 3);
+  }
+
+  @Test
+  public void testSomeServersNoValidDocsNone() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(0)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(3)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "UNSAFE", responses, new String[]{"server1", "server2", "server3"}, 
this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 0);
+  }
+
+  // --- One server's fetch fails (the mock throws): EQUAL throws; other modes skip it and use the remaining ones ---
+
+  @Test
+  public void testOneServerFailsEqualConsensus() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    // server2's fetch throws. EQUAL needs a bitmap from every replica, so the call must fail even though
+    // server1 and server3 agree.
+    List<Object> responses = List.of(
+        makeResponse(segmentName, expectedCrc, "server1", makeBitmap(5)),
+        new RuntimeException("simulated fetch failure"),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+    IllegalStateException thrown = expectThrows(IllegalStateException.class,
+        () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, expectedCrc,
+            "EQUAL", responses, new String[]{"server1", "server2", "server3"}, this));
+    // The mocked reader also throws IllegalStateException once it is called more often than scripted; make sure
+    // the expectation was met by the code under test and not by that mock guard.
+    org.testng.Assert.assertFalse("Mock received more calls than expected".equals(thrown.getMessage()),
+        "IllegalStateException came from the mock call-count guard, not from the code under test");
+  }
+
+  @Test
+  public void testOneServerFailsNone() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    List<Object> responses = List.of(
+        new RuntimeException("simulated fetch failure"),
+        makeResponse(segmentName, expectedCrc, "server2", makeBitmap(3)),
+        makeResponse(segmentName, expectedCrc, "server3", makeBitmap(5)));
+    RoaringBitmap result = 
getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, 
expectedCrc,
+        "UNSAFE", responses, new String[]{"server1", "server2", "server3"}, 
this);
+    assertNotNull(result);
+    assertEquals(result.getCardinality(), 3);
+  }
+
+  @Test
+  public void testAllServersFailMostValidDocs() {
+    String tableName = "myTable_REALTIME";
+    String segmentName = "seg1";
+    String expectedCrc = "crc1";
+    // Every replica's fetch throws, so even MOST_VALID_DOCS has nothing to choose from and must fail.
+    List<Object> responses = List.of(new RuntimeException("simulated"), new RuntimeException("simulated"),
+        new RuntimeException("simulated"));
+    IllegalStateException thrown = expectThrows(IllegalStateException.class,
+        () -> getValidDocIdFromServerMatchingCrcWithMockedReader(tableName, segmentName, expectedCrc, "MOST_VALID_DOCS",
+            responses, new String[]{"server1", "server2", "server3"}, this));
+    // The mocked reader also throws IllegalStateException once it is called more often than scripted; make sure
+    // the expectation was met by the code under test and not by that mock guard.
+    org.testng.Assert.assertFalse("Mock received more calls than expected".equals(thrown.getMessage()),
+        "IllegalStateException came from the mock call-count guard, not from the code under test");
+  }
+
   @Test
   public void testGetPushTaskConfigNoConfig() {
     Map<String, String> taskConfig = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to