This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1fad53a344 Make upsert compaction task more robust to crc mismatch
(#13489)
1fad53a344 is described below
commit 1fad53a344398593cdff4271a2d472cc16cf24d4
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Jul 12 20:21:42 2024 +0530
Make upsert compaction task more robust to crc mismatch (#13489)
* Allow upsert compaction to work properly during schema / indexing updates
* iterate through all servers
* improve upsert compaction task generator logic
* address comments - refactor
* address comments - more refactoring
* address comments -- undo some refactoring
* minor address
---
.../util/ServerSegmentMetadataReader.java | 28 ++++++--
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 81 +++++++++++++++-------
.../UpsertCompactionTaskExecutor.java | 41 +++++------
.../UpsertCompactionTaskGenerator.java | 51 +++++++-------
.../UpsertCompactionTaskGeneratorTest.java | 52 ++++++++------
5 files changed, 153 insertions(+), 100 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 9c2ffa6196..b3fd851ff4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
@@ -215,14 +216,30 @@ public class ServerSegmentMetadataReader {
/**
* This method is called when the API request is to fetch validDocId
metadata for a list segments of the given table.
- * This method will pick a server that hosts the target segment and fetch
the segment metadata result.
+ * This method will pick one server randomly that hosts the target segment
and fetch the segment metadata result.
*
- * @return segment metadata as a JSON string
+ * @return list of valid doc id metadata, one per segment processed.
*/
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String
tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String>
serverToEndpoints,
@Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType,
int numSegmentsBatchPerServerRequest) {
+ return getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegmentsMap, serverToEndpoints,
+ segmentNames, timeoutMs, validDocIdsType,
numSegmentsBatchPerServerRequest).values().stream()
+ .filter(list -> list != null && !list.isEmpty()).map(list ->
list.get(0)).collect(Collectors.toList());
+ }
+
+ /**
+ * This method is called when the API request is to fetch validDocId
metadata for a list of segments of the given table.
+ * This method queries all servers that host the target segments, fetches
the segment metadata result from each, and
+ * returns the results grouped by segment name.
+ *
+ * @return map of segment name to list of valid doc id metadata where each
element is every server's metadata.
+ */
+ public Map<String, List<ValidDocIdsMetadataInfo>>
getSegmentToValidDocIdsMetadataFromServer(String tableNameWithType,
+ Map<String, List<String>> serverToSegmentsMap, BiMap<String, String>
serverToEndpoints,
+ @Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType,
+ int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments :
serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
@@ -256,7 +273,7 @@ public class ServerSegmentMetadataReader {
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies,
tableNameWithType, true, requestHeaders,
timeoutMs, null);
- Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new
HashMap<>();
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfos = new
HashMap<>();
int failedParses = 0;
int returnedServerRequestsCount = 0;
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
@@ -266,7 +283,8 @@ public class ServerSegmentMetadataReader {
JsonUtils.stringToObject(validDocIdsMetadataList, new
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo :
validDocIdsMetadataInfoList) {
-
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(),
validDocIdsMetadataInfo);
+
validDocIdsMetadataInfos.computeIfAbsent(validDocIdsMetadataInfo.getSegmentName(),
k -> new ArrayList<>())
+ .add(validDocIdsMetadataInfo);
}
returnedServerRequestsCount++;
} catch (Exception e) {
@@ -292,7 +310,7 @@ public class ServerSegmentMetadataReader {
LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server
requests.",
validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
- return new ArrayList<>(validDocIdsMetadataInfos.values());
+ return validDocIdsMetadataInfos;
}
/**
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 9b7dad1955..55dfb97f98 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -23,10 +23,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
@@ -42,6 +44,7 @@ import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,32 +146,6 @@ public class MinionTaskUtils {
return dirInStr;
}
- public static ValidDocIdsBitmapResponse getValidDocIdsBitmap(String
tableNameWithType, String segmentName,
- String validDocIdsType, MinionContext minionContext) {
- HelixAdmin helixAdmin =
minionContext.getHelixManager().getClusterManagmentTool();
- String clusterName = minionContext.getHelixManager().getClusterName();
-
- List<String> servers = getServers(segmentName, tableNameWithType,
helixAdmin, clusterName);
- for (String server : servers) {
- InstanceConfig instanceConfig =
helixAdmin.getInstanceConfig(clusterName, server);
- String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
-
- // We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
- // passing an empty list.
- ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
- try {
- return
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
- validDocIdsType, 60_000);
- } catch (Exception e) {
- LOGGER.warn(
- String.format("Unable to retrieve validDocIds bitmap for segment:
%s from endpoint: %s", segmentName,
- endpoint), e);
- }
- }
- throw new IllegalStateException(
- String.format("Unable to retrieve validDocIds bitmap for segment: %s
from servers: %s", segmentName, servers));
- }
-
public static List<String> getServers(String segmentName, String
tableNameWithType, HelixAdmin helixAdmin,
String clusterName) {
ExternalView externalView =
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
@@ -206,4 +183,56 @@ public class MinionTaskUtils {
}
return defaultValue;
}
+
+ /**
+ * Returns the validDocIds bitmap from the first server whose local segment crc
matches the crc of both the ZK metadata and
+ * the deepstore copy (expectedCrc), or null if no server returns a matching bitmap.
+ */
+ @Nullable
+ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String
tableNameWithType, String segmentName,
+ String validDocIdsType, MinionContext minionContext, String expectedCrc)
{
+ String clusterName = minionContext.getHelixManager().getClusterName();
+ HelixAdmin helixAdmin =
minionContext.getHelixManager().getClusterManagmentTool();
+ RoaringBitmap validDocIds = null;
+ List<String> servers = getServers(segmentName, tableNameWithType,
helixAdmin, clusterName);
+ for (String server : servers) {
+ InstanceConfig instanceConfig =
helixAdmin.getInstanceConfig(clusterName, server);
+ String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+
+ // Fetch the validDocIds bitmap from this server; the response also carries
the crc of the server's local segment,
+ // which is compared against expectedCrc below.
+ ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
+ ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
+ try {
+ validDocIdsBitmapResponse =
+
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint,
+ validDocIdsType, 60_000);
+ } catch (Exception e) {
+ LOGGER.warn(
+ String.format("Unable to retrieve validDocIds bitmap for segment:
%s from endpoint: %s", segmentName,
+ endpoint), e);
+ continue;
+ }
+
+ // Check crc from the downloaded segment against the crc returned from
the server along with the valid doc id
+ // bitmap. If this doesn't match, this means that we are hitting the
race condition where the segment has been
+ // uploaded successfully while the server is still reloading the
segment. Reloading can take a while when the
+ // offheap upsert is used because we will need to delete & add all
primary keys.
+ // `BaseSingleSegmentConversionExecutor.executeTask()` already checks
for the crc from the task generator
+ // against the crc from the current segment zk metadata, so we don't
need to check that here.
+ String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
+ if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
+ // In this scenario, we are hitting the other replica of the segment
which did not commit to ZK or deepstore.
+ // We skip this bitmap and query the remaining servers to
check whether any of them has a matching CRC.
+ String message = String.format("CRC mismatch for segment: %s, expected
value based on task generator: %s, "
+ + "actual crc from validDocIdsBitmapResponse from endpoint %s:
%s", segmentName, expectedCrc, endpoint,
+ crcFromValidDocIdsBitmap);
+ LOGGER.warn(message);
+ continue;
+ }
+ validDocIds =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
+ break;
+ }
+ return validDocIds;
+ }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index ec5cc127d9..e683214f43 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -23,9 +23,7 @@ import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
-import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
@@ -60,31 +58,28 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE,
ValidDocIdsType.SNAPSHOT.name());
- ValidDocIdsType validDocIdsType =
ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
- ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
- MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName,
validDocIdsType.toString(),
- MINION_CONTEXT);
-
- // Check crc from the downloaded segment against the crc returned from the
server along with the valid doc id
- // bitmap. If this doesn't match, this means that we are hitting the race
condition where the segment has been
- // uploaded successfully while the server is still reloading the segment.
Reloading can take a while when the
- // offheap upsert is used because we will need to delete & add all primary
keys.
- // `BaseSingleSegmentConversionExecutor.executeTask()` already checks for
the crc from the task generator
- // against the crc from the current segment zk metadata, so we don't need
to check that here.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator =
configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
- String crcFromValidDocIdsBitmap =
validDocIdsBitmapResponse.getSegmentCrc();
- if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)
- ||
!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
- LOGGER.warn("CRC mismatch for segment: {}, expected: {}, actual crc from
server: {}", segmentName,
- crcFromDeepStorageSegment,
validDocIdsBitmapResponse.getSegmentCrc());
- return new
SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
- .build();
+ if
(!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
+ String message = String.format("Crc mismatched between ZK and deepstore
copy of segment: %s. Expected crc "
+ + "from ZK: %s, crc from deepstore: %s", segmentName,
originalSegmentCrcFromTaskGenerator,
+ crcFromDeepStorageSegment);
+ LOGGER.error(message);
+ throw new IllegalStateException(message);
+ }
+ RoaringBitmap validDocIds =
+ MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType,
segmentName, validDocIdsTypeStr,
+ MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
+ if (validDocIds == null) {
+ // no valid crc match found or no validDocIds obtained from all servers
+ // error out the task instead of silently failing so that we can track
it via task-error metrics
+ String message = String.format("No validDocIds found from all servers.
They either failed to download "
+ + "or did not match crc from segment copy obtained from
deepstore / servers. " + "Expected crc: %s",
+ originalSegmentCrcFromTaskGenerator);
+ LOGGER.error(message);
+ throw new IllegalStateException(message);
}
-
- RoaringBitmap validDocIds =
RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
-
if (validDocIds.isEmpty()) {
// prevents empty segment generation
LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment:
{}", tableNameWithType, segmentName);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 0357bca6fe..64cbe03fe8 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -165,8 +165,8 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
validDocIdsType));
}
- List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
-
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
+
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString(),
numSegmentsBatchPerServerRequest);
Map<String, SegmentZKMetadata> completedSegmentsMap =
@@ -209,7 +209,8 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdsMetadata(Map<String,
String> taskConfigs,
- Map<String, SegmentZKMetadata> completedSegmentsMap,
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList) {
+ Map<String, SegmentZKMetadata> completedSegmentsMap,
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -218,30 +219,31 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new
ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
- for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoList) {
- long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
- String segmentName = validDocIdsMetadata.getSegmentName();
-
- // Skip segments if the crc from zk metadata and server does not match.
They may be being reloaded.
- SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
- if (segment == null) {
+ for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
+ // check if segment is part of completed segments
+ if (!completedSegmentsMap.containsKey(segmentName)) {
LOGGER.warn("Segment {} is not found in the completed segments list,
skipping it for compaction", segmentName);
continue;
}
+ SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+ for (ValidDocIdsMetadataInfo validDocIdsMetadata :
validDocIdsMetadataInfoMap.get(segmentName)) {
+ long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
- if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
- LOGGER.warn(
- "CRC mismatch for segment: {}, skipping it for compaction
(segmentZKMetadata={}, validDocIdsMetadata={})",
- segmentName, segment.getCrc(),
validDocIdsMetadata.getSegmentCrc());
- continue;
- }
- long totalDocs = validDocIdsMetadata.getTotalDocs();
- double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) *
100;
- if (totalInvalidDocs == totalDocs) {
- segmentsForDeletion.add(segment.getSegmentName());
- } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
- && totalInvalidDocs >= invalidRecordsThresholdCount) {
- segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+ // Skip segments if the crc from zk metadata and server does not
match. They may be being reloaded.
+ if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
+ LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={},
validDocIdsMetadata={})", segmentName,
+ segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
+ continue;
+ }
+ long totalDocs = validDocIdsMetadata.getTotalDocs();
+ double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs)
* 100;
+ if (totalInvalidDocs == totalDocs) {
+ segmentsForDeletion.add(segment.getSegmentName());
+ } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
+ && totalInvalidDocs >= invalidRecordsThresholdCount) {
+ segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
+ }
+ break;
}
}
segmentsForCompaction.sort((o1, o2) -> {
@@ -254,8 +256,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
});
return new SegmentSelectionResult(
-
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
- segmentsForDeletion);
+
segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
segmentsForDeletion);
}
@VisibleForTesting
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 971a288f82..ec8d8ea786 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -82,6 +82,7 @@ public class UpsertCompactionTaskGeneratorTest {
_completedSegment.setEndTime(System.currentTimeMillis() -
TimeUtils.convertPeriodToMillis("1d"));
_completedSegment.setTimeUnit(TimeUnit.MILLISECONDS);
_completedSegment.setTotalDocs(100L);
+ _completedSegment.setCrc(1000);
_completedSegment2 = new SegmentZKMetadata("testTable__1");
_completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
@@ -89,6 +90,7 @@ public class UpsertCompactionTaskGeneratorTest {
_completedSegment2.setEndTime(System.currentTimeMillis());
_completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS);
_completedSegment2.setTotalDocs(10L);
+ _completedSegment2.setCrc(2000);
_completedSegmentsMap = new HashMap<>();
_completedSegmentsMap.put(_completedSegment.getSegmentName(),
_completedSegment);
@@ -231,24 +233,27 @@ public class UpsertCompactionTaskGeneratorTest {
public void testProcessValidDocIdsMetadata()
throws IOException {
Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
- String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" :
50," + "\"segmentName\" : \""
- + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" +
", \"segmentCrc\": \""
- + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" :
0," + "\"totalInvalidDocs\" : 10,"
- + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\",
" + "\"segmentCrc\" : \""
- + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
-
- List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
- JsonUtils.stringToObject(json, new
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+ String json = "{\"testTable__0\": [{\"totalValidDocs\": 50,
\"totalInvalidDocs\": 50, "
+ + "\"segmentName\": \"testTable__0\", \"totalDocs\": 100,
\"segmentCrc\": \"1000\"}], "
+ + "\"testTable__1\": [{\"totalValidDocs\": 0, "
+ + "\"totalInvalidDocs\": 10, \"segmentName\": \"testTable__1\",
\"totalDocs\": 10, \"segmentCrc\": \"2000\"}]}";
+
+ Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfo =
+ JsonUtils.stringToObject(json, new TypeReference<>() {
});
+ // no completed segments scenario, there shouldn't be any segment selected
for compaction
UpsertCompactionTaskGenerator.SegmentSelectionResult
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new
HashMap<>(),
validDocIdsMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
+ // test with valid crc and thresholds
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
validDocIdsMetadataInfo);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
@@ -259,6 +264,7 @@ public class UpsertCompactionTaskGeneratorTest {
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
validDocIdsMetadataInfo);
assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
// test without an invalidRecordsThresholdPercent
@@ -266,6 +272,8 @@ public class UpsertCompactionTaskGeneratorTest {
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
validDocIdsMetadataInfo);
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
@@ -275,18 +283,19 @@ public class UpsertCompactionTaskGeneratorTest {
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
validDocIdsMetadataInfo);
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
// Test the case where the completedSegment from api has different crc
than segment from zk metadata.
- json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," +
"\"segmentName\" : \""
- + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" +
", \"segmentCrc\": \""
- + "1234567890" + "\"}," + "{" + "\"totalValidDocs\" : 0," +
"\"totalInvalidDocs\" : 10,"
- + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\",
" + "\"segmentCrc\" : \""
- + _completedSegment2.getCrc() + "\","
- + "\"totalDocs\" : 10" + "}]";
- validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+ json = "{\"" + _completedSegment.getSegmentName() + "\":
[{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, "
+ + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\",
\"totalDocs\": 100, \"segmentCrc\": "
+ + "\"1234567890\"}], \"" + _completedSegment2.getSegmentName() + "\":
[{\"totalValidDocs\": 0, "
+ + "\"totalInvalidDocs\": 10, \"segmentName\": \"" +
_completedSegment2.getSegmentName() + "\", "
+ + "\"segmentCrc\": \"" + _completedSegment2.getCrc() + "\",
\"totalDocs\": 10}]}";
+ validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new
TypeReference<>() {
});
segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
@@ -301,12 +310,13 @@ public class UpsertCompactionTaskGeneratorTest {
_completedSegment2.getSegmentName());
// check if both the candidates for compaction are coming in sorted
descending order
- json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," +
"\"segmentName\" : \""
- + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" +
", \"segmentCrc\": \""
- + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" :
10," + "\"totalInvalidDocs\" : 40,"
- + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\",
" + "\"segmentCrc\" : \""
- + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]";
- validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+ json = "{\"" + _completedSegment.getSegmentName() + "\":
[{\"totalValidDocs\": 50, \"totalInvalidDocs\": 50, "
+ + "\"segmentName\": \"" + _completedSegment.getSegmentName() + "\",
\"totalDocs\": 100, \"segmentCrc\": \""
+ + _completedSegment.getCrc() + "\"}], \"" +
_completedSegment2.getSegmentName() + "\": "
+ + "[{\"totalValidDocs\": 10, \"totalInvalidDocs\": 40,
\"segmentName\": \""
+ + _completedSegment2.getSegmentName() + "\", \"segmentCrc\": \"" +
_completedSegment2.getCrc() + "\", "
+ + "\"totalDocs\": 50}]}";
+ validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new
TypeReference<>() {
});
compactionConfigs = getCompactionConfigs("30", "0");
segmentSelectionResult =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]