Jackie-Jiang commented on code in PR #17696:
URL: https://github.com/apache/pinot/pull/17696#discussion_r2802207592
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,47 +293,67 @@ public static RoaringBitmap
getValidDocIdFromServerMatchingCrc(String tableNameW
// We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
- ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+ ValidDocIdsBitmapResponse validDocIdsBitmapResponse = null;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
- LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: "
- + endpoint, e);
- continue;
+ // We need validDocIds from all servers; do not continue if any server
fails to return the bitmap.
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
Review Comment:
Perform this check on the caller side, and pass the result in as a boolean
flag (e.g. `strict`).
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,47 +293,67 @@ public static RoaringBitmap
getValidDocIdFromServerMatchingCrc(String tableNameW
// We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
- ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+ ValidDocIdsBitmapResponse validDocIdsBitmapResponse = null;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
- LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: "
- + endpoint, e);
- continue;
+ // We need validDocIds from all servers; do not continue if any server
fails to return the bitmap.
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". ValidDocIds bitmap is required from all servers holding
the segment.", e);
+ } else {
+ LOGGER.warn(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint, e);
+ continue;
+ }
}
// Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
- // bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
- // uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
- // offheap upsert is used because we will need to delete & add all
primary keys.
- // `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
- // against the crc from the current segment zk metadata, so we don't
need to check that here.
+ // bitmap. If this doesn't match, we are hitting a replica that has not
committed to ZK/deepstore yet (e.g.
+ // still reloading). We require all servers to have matching CRC to
avoid partial upsert inconsistencies.
String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
- // In this scenario, we are hitting the other replica of the segment
which did not commit to ZK or deepstore.
- // We will skip processing this bitmap to query other server to
confirm if there is a valid matching CRC.
- String message = "CRC mismatch for segment: " + segmentName + ",
expected value based on task generator: "
- + expectedCrc + ", actual crc from validDocIdsBitmapResponse from
endpoint " + endpoint + ": "
- + crcFromValidDocIdsBitmap;
- LOGGER.warn(message);
- continue;
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException("CRC mismatch for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". Expected CRC (from task generator): " + expectedCrc + ",
actual from server: "
+ + crcFromValidDocIdsBitmap
+ + ". ValidDocIds bitmap is required from all servers with
matching CRC to avoid replica inconsistency.");
+ } else {
Review Comment:
Why was the warning log removed?
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,47 +293,67 @@ public static RoaringBitmap
getValidDocIdFromServerMatchingCrc(String tableNameW
// We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
- ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+ ValidDocIdsBitmapResponse validDocIdsBitmapResponse = null;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
- LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: "
- + endpoint, e);
- continue;
+ // We need validDocIds from all servers; do not continue if any server
fails to return the bitmap.
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". ValidDocIds bitmap is required from all servers holding
the segment.", e);
+ } else {
+ LOGGER.warn(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint, e);
+ continue;
+ }
}
// Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
- // bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
- // uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
- // offheap upsert is used because we will need to delete & add all
primary keys.
- // `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
- // against the crc from the current segment zk metadata, so we don't
need to check that here.
+ // bitmap. If this doesn't match, we are hitting a replica that has not
committed to ZK/deepstore yet (e.g.
+ // still reloading). We require all servers to have matching CRC to
avoid partial upsert inconsistencies.
String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
- // In this scenario, we are hitting the other replica of the segment
which did not commit to ZK or deepstore.
- // We will skip processing this bitmap to query other server to
confirm if there is a valid matching CRC.
- String message = "CRC mismatch for segment: " + segmentName + ",
expected value based on task generator: "
- + expectedCrc + ", actual crc from validDocIdsBitmapResponse from
endpoint " + endpoint + ": "
- + crcFromValidDocIdsBitmap;
- LOGGER.warn(message);
- continue;
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException("CRC mismatch for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". Expected CRC (from task generator): " + expectedCrc + ",
actual from server: "
+ + crcFromValidDocIdsBitmap
+ + ". ValidDocIds bitmap is required from all servers with
matching CRC to avoid replica inconsistency.");
+ } else {
+ continue;
+ }
}
- // skipping servers which are not in READY state. The bitmaps would be
inconsistent when
- // server is NOT READY as UPDATING segments might be updating the ONLINE
segments
+ // Require all servers to be READY. Bitmaps are inconsistent when server
is NOT READY (e.g. UPDATING segments).
if (validDocIdsBitmapResponse.getServerStatus() != null &&
!validDocIdsBitmapResponse.getServerStatus()
.equals(ServiceStatus.Status.GOOD)) {
- String message = "Server " + validDocIdsBitmapResponse.getInstanceId()
+ " is in "
- + validDocIdsBitmapResponse.getServerStatus() + " state, skipping
it for execution for segment: "
- + validDocIdsBitmapResponse.getSegmentName() + ". Will try other
servers.";
- LOGGER.warn(message);
- continue;
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException("Server " +
validDocIdsBitmapResponse.getInstanceId() + " is in "
+ + validDocIdsBitmapResponse.getServerStatus() + " state for
segment: " + segmentName
+ + ". All servers holding the segment must be READY to avoid
partial upsert replica inconsistency.");
+ } else {
+ continue;
+ }
}
- validDocIds =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
- break;
+ RoaringBitmap candidateValidDocIds =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+ int candidateCardinality = candidateValidDocIds.getCardinality();
+
+ // Pick the server with the highest cardinality (most valid docs) to
minimize "primary keys not replaced"
+ // warnings during segment replacement. This is important for partial
upsert tables where replicas may have
+ // different validDocIds due to consumption lag or out-of-order record
delivery.
+ if (candidateCardinality > maxCardinality) {
Review Comment:
Do we also want to perform the check on regular cases (e.g. FULL upsert)?
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -281,47 +293,67 @@ public static RoaringBitmap
getValidDocIdFromServerMatchingCrc(String tableNameW
// We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
- ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+ ValidDocIdsBitmapResponse validDocIdsBitmapResponse = null;
try {
validDocIdsBitmapResponse =
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
validDocIdsType, 60_000);
} catch (Exception e) {
- LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: "
- + endpoint, e);
- continue;
+ // We need validDocIds from all servers; do not continue if any server
fails to return the bitmap.
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". ValidDocIds bitmap is required from all servers holding
the segment.", e);
+ } else {
+ LOGGER.warn(
+ "Unable to retrieve validDocIds bitmap for segment: " +
segmentName + " from endpoint: " + endpoint, e);
+ continue;
+ }
}
// Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
- // bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
- // uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
- // offheap upsert is used because we will need to delete & add all
primary keys.
- // `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
- // against the crc from the current segment zk metadata, so we don't
need to check that here.
+ // bitmap. If this doesn't match, we are hitting a replica that has not
committed to ZK/deepstore yet (e.g.
+ // still reloading). We require all servers to have matching CRC to
avoid partial upsert inconsistencies.
String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
- // In this scenario, we are hitting the other replica of the segment
which did not commit to ZK or deepstore.
- // We will skip processing this bitmap to query other server to
confirm if there is a valid matching CRC.
- String message = "CRC mismatch for segment: " + segmentName + ",
expected value based on task generator: "
- + expectedCrc + ", actual crc from validDocIdsBitmapResponse from
endpoint " + endpoint + ": "
- + crcFromValidDocIdsBitmap;
- LOGGER.warn(message);
- continue;
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException("CRC mismatch for segment: " +
segmentName + " from endpoint: " + endpoint
+ + ". Expected CRC (from task generator): " + expectedCrc + ",
actual from server: "
+ + crcFromValidDocIdsBitmap
+ + ". ValidDocIds bitmap is required from all servers with
matching CRC to avoid replica inconsistency.");
+ } else {
+ continue;
+ }
}
- // skipping servers which are not in READY state. The bitmaps would be
inconsistent when
- // server is NOT READY as UPDATING segments might be updating the ONLINE
segments
+ // Require all servers to be READY. Bitmaps are inconsistent when server
is NOT READY (e.g. UPDATING segments).
if (validDocIdsBitmapResponse.getServerStatus() != null &&
!validDocIdsBitmapResponse.getServerStatus()
.equals(ServiceStatus.Status.GOOD)) {
- String message = "Server " + validDocIdsBitmapResponse.getInstanceId()
+ " is in "
- + validDocIdsBitmapResponse.getServerStatus() + " state, skipping
it for execution for segment: "
- + validDocIdsBitmapResponse.getSegmentName() + ". Will try other
servers.";
- LOGGER.warn(message);
- continue;
+ if (TableConfigUtils.checkForInconsistentStateConfigs(tableConfig)) {
+ throw new IllegalStateException("Server " +
validDocIdsBitmapResponse.getInstanceId() + " is in "
+ + validDocIdsBitmapResponse.getServerStatus() + " state for
segment: " + segmentName
+ + ". All servers holding the segment must be READY to avoid
partial upsert replica inconsistency.");
+ } else {
+ continue;
Review Comment:
Same here: the warning log was removed in this branch as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]