Jackie-Jiang commented on code in PR #17696:
URL: https://github.com/apache/pinot/pull/17696#discussion_r2886014045


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,66 +295,111 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
   }
 
   /**
-   * Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
-   * deepstore copy (expectedCrc).
+   * Returns the validDocIds bitmap from server(s). {@code comparisonMode} is the task config value: NONE,
+   * EQUAL_CONSENSUS(default), or MAX_VALID_DOCS.
    */
   @Nullable
   public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
-      String validDocIdsType, MinionContext minionContext, String expectedCrc) {
+      String validDocIdsType, MinionContext minionContext, String expectedCrc, String comparisonModeStr) {
+    ValidDocIdsComparisonMode comparisonMode = parseValidDocIdsComparisonMode(comparisonModeStr);
     String clusterName = minionContext.getHelixManager().getClusterName();
     HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
-    RoaringBitmap validDocIds = null;
     List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
+    List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
+
     for (String server : servers) {
       InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
       String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
 
-      // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
-      // passing an empty list.
       ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
       ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
       try {
         validDocIdsBitmapResponse =
             serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
                 validDocIdsType, 60_000);
       } catch (Exception e) {
-        LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: "
-            + endpoint, e);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {

Review Comment:
   What about the mode that takes the most valid docs (`MAX_VALID_DOCS`)? Should it be handled here as well?
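
For context, a mode that keeps the replica with the most valid docs could be sketched roughly as below. This is only an illustration, not the PR's code: `MostValidDocsSelector`, `pickMostValidDocs`, and `of` are hypothetical names, and `java.util.BitSet` stands in for `RoaringBitmap` so the snippet has no external dependencies.

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical helper, not part of Pinot. BitSet stands in for RoaringBitmap.
class MostValidDocsSelector {
  /** Returns the bitmap with the highest cardinality, or null if the list is empty. */
  static BitSet pickMostValidDocs(List<BitSet> bitmaps) {
    BitSet best = null;
    for (BitSet candidate : bitmaps) {
      // Strict '>' keeps the first bitmap seen when cardinalities tie.
      if (best == null || candidate.cardinality() > best.cardinality()) {
        best = candidate;
      }
    }
    return best;
  }

  /** Builds a bitmap from the given doc ids (illustration only). */
  static BitSet of(int... docIds) {
    BitSet bitSet = new BitSet();
    for (int docId : docIds) {
      bitSet.set(docId);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    BitSet replicaA = of(0, 1, 2);
    BitSet replicaB = of(0, 1, 2, 3);
    BitSet chosen = pickMostValidDocs(List.of(replicaA, replicaB));
    System.out.println("Chosen cardinality: " + chosen.cardinality());
  }
}
```

Under this sketch a replica that fails to respond simply contributes no candidate, so whether such a failure should abort the task in this mode remains the open question.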



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,66 +295,111 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
   }
 
   /**
-   * Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
-   * deepstore copy (expectedCrc).
+   * Returns the validDocIds bitmap from server(s). {@code comparisonMode} is the task config value: NONE,
+   * EQUAL_CONSENSUS(default), or MAX_VALID_DOCS.
    */
   @Nullable
   public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
-      String validDocIdsType, MinionContext minionContext, String expectedCrc) {
+      String validDocIdsType, MinionContext minionContext, String expectedCrc, String comparisonModeStr) {
+    ValidDocIdsComparisonMode comparisonMode = parseValidDocIdsComparisonMode(comparisonModeStr);
     String clusterName = minionContext.getHelixManager().getClusterName();
     HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
-    RoaringBitmap validDocIds = null;
     List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
+    List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
+
     for (String server : servers) {
       InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
       String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
 
-      // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
-      // passing an empty list.
       ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
       ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
       try {
         validDocIdsBitmapResponse =
             serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
                 validDocIdsType, 60_000);
       } catch (Exception e) {
-        LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: "
-            + endpoint, e);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {
+          throw new IllegalStateException(
+              "Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
+        }
+        LOGGER.warn(
+            "Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
         continue;
       }
 
+      String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
       // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
       // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
       // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
       // offheap upsert is used because we will need to delete & add all primary keys.
       // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
       // against the crc from the current segment zk metadata, so we don't need to check that here.
-      String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
       if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
-        // In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
-        // We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
-        String message = "CRC mismatch for segment: " + segmentName + ", expected value based on task generator: "
-            + expectedCrc + ", actual crc from validDocIdsBitmapResponse from endpoint " + endpoint + ": "
-            + crcFromValidDocIdsBitmap;
-        LOGGER.warn(message);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {

Review Comment:
   Same as above



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,66 +295,111 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
   }
 
   /**
-   * Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
-   * deepstore copy (expectedCrc).
+   * Returns the validDocIds bitmap from server(s). {@code comparisonMode} is the task config value: NONE,
+   * EQUAL_CONSENSUS(default), or MAX_VALID_DOCS.
    */
   @Nullable
   public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
-      String validDocIdsType, MinionContext minionContext, String expectedCrc) {
+      String validDocIdsType, MinionContext minionContext, String expectedCrc, String comparisonModeStr) {
+    ValidDocIdsComparisonMode comparisonMode = parseValidDocIdsComparisonMode(comparisonModeStr);
     String clusterName = minionContext.getHelixManager().getClusterName();
     HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
-    RoaringBitmap validDocIds = null;
     List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
+    List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
+
     for (String server : servers) {
       InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
       String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
 
-      // We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
-      // passing an empty list.
       ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
       ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
       try {
         validDocIdsBitmapResponse =
             serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
                 validDocIdsType, 60_000);
       } catch (Exception e) {
-        LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: "
-            + endpoint, e);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {
+          throw new IllegalStateException(
+              "Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
+        }
+        LOGGER.warn(
+            "Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
         continue;
       }
 
+      String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
       // Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
       // bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
       // uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
       // offheap upsert is used because we will need to delete & add all primary keys.
       // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
       // against the crc from the current segment zk metadata, so we don't need to check that here.
-      String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
       if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
-        // In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
-        // We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
-        String message = "CRC mismatch for segment: " + segmentName + ", expected value based on task generator: "
-            + expectedCrc + ", actual crc from validDocIdsBitmapResponse from endpoint " + endpoint + ": "
-            + crcFromValidDocIdsBitmap;
-        LOGGER.warn(message);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {
+          throw new IllegalStateException(
+              "CRC mismatch for segment: " + segmentName + ", expected: " + expectedCrc + ", actual from endpoint "
+                  + endpoint + ": " + crcFromValidDocIdsBitmap);
+        }
+        LOGGER.warn("CRC mismatch for segment: {} from endpoint {}, skipping", segmentName, endpoint);
         continue;
       }
 
-      // skipping servers which are not in READY state. The bitmaps would be inconsistent when
-      // server is NOT READY as UPDATING segments might be updating the ONLINE segments
       if (validDocIdsBitmapResponse.getServerStatus() != null && !validDocIdsBitmapResponse.getServerStatus()
           .equals(ServiceStatus.Status.GOOD)) {
-        String message = "Server " + validDocIdsBitmapResponse.getInstanceId() + " is in "
-            + validDocIdsBitmapResponse.getServerStatus() + " state, skipping it for execution for segment: "
-            + validDocIdsBitmapResponse.getSegmentName() + ". Will try other servers.";
-        LOGGER.warn(message);
+        if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {
+          throw new IllegalStateException("Server " + validDocIdsBitmapResponse.getInstanceId() + " is in "
+              + validDocIdsBitmapResponse.getServerStatus() + " state for segment: " + segmentName
+              + ". Failing task to avoid inconsistency among replicas.");
+        }
+        LOGGER.warn("Server {} not READY for segment {}, skipping", validDocIdsBitmapResponse.getInstanceId(),
+            segmentName);
         continue;
       }
 
-      validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
-      break;
+      RoaringBitmap bitmap = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+      int cardinality = bitmap.getCardinality();
+
+      if (comparisonMode == ValidDocIdsComparisonMode.NONE) {
+        LOGGER.info("Using server {} with {} valid docs for segment {} (mode=NONE)", server, cardinality, segmentName);
+        return bitmap;
+      }
+
+      matchingBitmaps.add(bitmap);
+    }
+
+    if (matchingBitmaps.isEmpty()) {
+      return null;
+    }
+
+    if (comparisonMode == ValidDocIdsComparisonMode.EQUAL_CONSENSUS) {
+      RoaringBitmap consensusBitMap = matchingBitmaps.get(0);
+      long consensusCardinality = consensusBitMap.getCardinality();

Review Comment:
   Should we compare the bitmaps directly? That seems more robust than comparing
cardinality, since two bitmaps can have the same cardinality but contain different doc ids.
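
To illustrate the concern, here is a minimal sketch of how a cardinality-only check can pass while the replicas actually disagree. `java.util.BitSet` is used as a stand-in so the snippet runs without dependencies; `ConsensusCheck` and its methods are hypothetical names. To my knowledge `RoaringBitmap` also overrides `equals` to compare contents, which is what a direct comparison would rely on.

```java
import java.util.BitSet;

// Hypothetical illustration, not part of Pinot. BitSet stands in for RoaringBitmap.
class ConsensusCheck {
  /** Weaker check: only the number of valid docs must match. */
  static boolean sameCardinality(BitSet a, BitSet b) {
    return a.cardinality() == b.cardinality();
  }

  /** Stronger check: the exact set of valid doc ids must match. */
  static boolean sameContent(BitSet a, BitSet b) {
    return a.equals(b);
  }

  /** Builds a bitmap from the given doc ids (illustration only). */
  static BitSet of(int... docIds) {
    BitSet bitSet = new BitSet();
    for (int docId : docIds) {
      bitSet.set(docId);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    // Both replicas report three valid docs, but they disagree on the last one.
    BitSet replicaA = of(0, 1, 2);
    BitSet replicaB = of(0, 1, 3);
    System.out.println("same cardinality: " + sameCardinality(replicaA, replicaB));
    System.out.println("same content:     " + sameContent(replicaA, replicaB));
  }
}
```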



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -67,6 +66,21 @@
 public class MinionTaskUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class);
 
+  /** Valid doc ids comparison mode (executor-only). Kept internal; executors pass config string. */
+  enum ValidDocIdsComparisonMode {

Review Comment:
   Let's move this enum into `MinionConstants`.
   
   I suggest naming it `ValidDocIdsConsensusMode`, with the modes `UNSAFE`,
`SAME`, and `MOST_VALID_DOCS`.
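
A rough sketch of what the renamed enum could look like is below. Several things here are assumptions rather than anything in the PR: that `UNSAFE`, `SAME`, and `MOST_VALID_DOCS` correspond to the current `NONE`, `EQUAL_CONSENSUS`, and `MAX_VALID_DOCS`; that a missing config value falls back to `SAME` (mirroring the javadoc's "EQUAL_CONSENSUS(default)"); and the `fromConfig` helper with its case-insensitive parsing. It is shown as a top-level enum only to keep the snippet self-contained; in the real change it would be nested in `MinionConstants`.

```java
import java.util.Locale;

// Sketch only: names follow the review suggestion, behaviour is assumed.
enum ValidDocIdsConsensusMode {
  UNSAFE,           // assumed equivalent of NONE: use the first usable replica
  SAME,             // assumed equivalent of EQUAL_CONSENSUS: all replicas must agree
  MOST_VALID_DOCS;  // assumed equivalent of MAX_VALID_DOCS

  /** Hypothetical parser: null or blank falls back to SAME, unknown values throw. */
  static ValidDocIdsConsensusMode fromConfig(String value) {
    if (value == null || value.trim().isEmpty()) {
      return SAME;
    }
    // valueOf throws IllegalArgumentException for names that are not constants.
    return valueOf(value.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(fromConfig(null));
    System.out.println(fromConfig("most_valid_docs"));
  }
}
```

Failing fast on an unknown value is a deliberate choice in this sketch, so that a typo in the task config does not silently select a weaker mode.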



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

