This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1d25dc0f Update getValidDocIdsMetadataFromServer to make call in
batches to servers and other bug fixes (#13314)
1d1d25dc0f is described below
commit 1d1d25dc0f1fc1abb73d9516414168c82b116b58
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Thu Jun 6 23:25:59 2024 +0530
Update getValidDocIdsMetadataFromServer to make call in batches to servers
and other bug fixes (#13314)
* Update getValidDocIdsMetadataFromServer to make call in batches to server
---
.../api/resources/PinotTableRestletResource.java | 8 ++++--
.../controller/util/CompletionServiceHelper.java | 24 +++++++++++-----
.../util/ServerSegmentMetadataReader.java | 32 +++++++++++++---------
.../pinot/controller/util/TableMetadataReader.java | 4 +--
.../apache/pinot/core/common/MinionConstants.java | 5 ++++
.../UpsertCompactionTaskGenerator.java | 12 +++++---
.../pinot/server/api/resources/TablesResource.java | 8 ++++--
7 files changed, 62 insertions(+), 31 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index fafd42de93..59e748a08f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -972,7 +972,10 @@ public class PinotTableRestletResource {
@ApiParam(value = "A list of segments", allowMultiple = true)
@QueryParam("segmentNames")
List<String> segmentNames,
@ApiParam(value = "Valid doc ids type") @QueryParam("validDocIdsType")
- @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType, @Context
HttpHeaders headers) {
+ @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType,
+ @ApiParam(value = "Number of segments in a batch per server request")
+ @QueryParam("serverRequestBatchSize") @DefaultValue("500") int
serverRequestBatchSize,
+ @Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
LOGGER.info("Received a request to fetch aggregate validDocIds metadata
for a table {}", tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -990,7 +993,8 @@ public class PinotTableRestletResource {
validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT :
validDocIdsType;
JsonNode segmentsMetadataJson =
tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType,
segmentNames,
- validDocIdsType.toString(),
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ validDocIdsType.toString(),
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000,
+ serverRequestBatchSize);
validDocIdsMetadata =
JsonUtils.objectToPrettyString(segmentsMetadataJson);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index 236242660a..6e9d891e5c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -129,13 +129,20 @@ public class CompletionServiceHelper {
int statusCode =
multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode();
if (statusCode >= 300) {
String reason =
multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase();
- LOGGER.error("Server: {} returned error: {}, reason: {}", instance,
statusCode, reason);
+ LOGGER.error("Server: {} returned error: {}, reason: {} for uri:
{}", instance, statusCode, reason, uri);
completionServiceResponse._failedResponseCount++;
continue;
}
String responseString =
EntityUtils.toString(multiHttpRequestResponse.getResponse().getEntity());
- completionServiceResponse._httpResponses
- .put(multiRequestPerServer ? uri.toString() : instance,
responseString);
+ String key = multiRequestPerServer ? uri.toString() : instance;
+ // If there are multiple requests to the same server with the same URI
but different payloads,
+ // we append a count value to the key to ensure each response is
uniquely identified.
+ // Otherwise, the map will store only the last response, overwriting
previous ones.
+ if (multiRequestPerServer) {
+ int count =
completionServiceResponse._instanceToRequestCount.compute(key, (k, v) -> v ==
null ? 1 : v + 1);
+ key = key + "__" + count;
+ }
+ completionServiceResponse._httpResponses.put(key, responseString);
} catch (Exception e) {
String reason = useCase == null ? "" : String.format(" in '%s'",
useCase);
LOGGER.error("Connection error {}. Details: {}", reason,
e.getMessage());
@@ -151,10 +158,10 @@ public class CompletionServiceHelper {
}
}
- int numServersResponded = completionServiceResponse._httpResponses.size();
- if (numServersResponded != size) {
- LOGGER.warn("Finished reading information for table: {} with {}/{}
server responses", tableNameWithType,
- numServersResponded, size);
+ int numServerRequestsResponded =
completionServiceResponse._httpResponses.size();
+ if (numServerRequestsResponded != size) {
+ LOGGER.warn("Finished reading information for table: {} with {}/{}
server-request responses", tableNameWithType,
+ numServerRequestsResponded, size);
} else {
LOGGER.info("Finished reading information for table: {}",
tableNameWithType);
}
@@ -180,10 +187,13 @@ public class CompletionServiceHelper {
public Map<String, String> _httpResponses;
// Number of failures encountered when requesting
public int _failedResponseCount;
+ // Map of request key (the request URI when multiRequestPerServer is set) to the count of responses read for that key
+ public Map<String, Integer> _instanceToRequestCount;
public CompletionServiceResponse() {
_httpResponses = new HashMap<>();
_failedResponseCount = 0;
+ _instanceToRequestCount = new HashMap<>();
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 320649c0d1..dcca712af4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
@@ -220,7 +221,8 @@ public class ServerSegmentMetadataReader {
*/
public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String
tableNameWithType,
Map<String, List<String>> serverToSegmentsMap, BiMap<String, String>
serverToEndpoints,
- @Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType) {
+ @Nullable List<String> segmentNames, int timeoutMs, String
validDocIdsType,
+ int numSegmentsBatchPerServerRequest) {
List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
for (Map.Entry<String, List<String>> serverToSegments :
serverToSegmentsMap.entrySet()) {
List<String> segmentsForServer = serverToSegments.getValue();
@@ -235,8 +237,12 @@ public class ServerSegmentMetadataReader {
}
}
}
-
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType,
segmentsToQuery, validDocIdsType,
- serverToEndpoints.get(serverToSegments.getKey())));
+
+ // Number of segments to query per server request. If a table has a lot
of segments, then we might send a
+ // huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
+ Lists.partition(segmentsToQuery,
numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
+
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType,
segmentsToQueryBatch,
+ validDocIdsType,
serverToEndpoints.get(serverToSegments.getKey()))));
}
BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
@@ -247,12 +253,12 @@ public class ServerSegmentMetadataReader {
Map<String, String> requestHeaders = Map.of("Content-Type",
"application/json");
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiPostRequest(serverURLsAndBodies,
tableNameWithType, false, requestHeaders,
+ completionServiceHelper.doMultiPostRequest(serverURLsAndBodies,
tableNameWithType, true, requestHeaders,
timeoutMs, null);
Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new
HashMap<>();
int failedParses = 0;
- int returnedServersCount = 0;
+ int returnedServerRequestsCount = 0;
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
try {
String validDocIdsMetadataList = streamResponse.getValue();
@@ -262,21 +268,21 @@ public class ServerSegmentMetadataReader {
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo:
validDocIdsMetadataInfoList) {
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(),
validDocIdsMetadataInfo);
}
- returnedServersCount++;
+ returnedServerRequestsCount++;
} catch (Exception e) {
failedParses++;
- LOGGER.error("Unable to parse server {} response due to an error: ",
streamResponse.getKey(), e);
+ LOGGER.error("Unable to parse {} server-request response due to an
error: ", streamResponse.getKey(), e);
}
}
if (failedParses != 0) {
- LOGGER.error("Unable to parse server {} / {} response due to an error:
", failedParses,
+ LOGGER.error("Unable to parse {} / {} server-request responses due to an
error: ", failedParses,
serverURLsAndBodies.size());
}
- if (returnedServersCount != serverURLsAndBodies.size()) {
- LOGGER.error("Unable to get validDocIdsMetadata from all servers.
Expected: {}, Actual: {}",
- serverURLsAndBodies.size(), returnedServersCount);
+ if (returnedServerRequestsCount != serverURLsAndBodies.size()) {
+ LOGGER.error("Unable to get validDocIdsMetadata from all server
requests. Expected: {}, Actual: {}",
+ serverURLsAndBodies.size(), returnedServerRequestsCount);
}
if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size()
!= validDocIdsMetadataInfos.size()) {
@@ -284,8 +290,8 @@ public class ServerSegmentMetadataReader {
segmentNames.size(), validDocIdsMetadataInfos.size());
}
- LOGGER.info("Retrieved validDocIds metadata for {} segments from {}
servers.", validDocIdsMetadataInfos.size(),
- returnedServersCount);
+ LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server
requests.",
+ validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
return new ArrayList<>(validDocIdsMetadataInfos.values());
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 389c8d2e94..cc80291278 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -159,7 +159,7 @@ public class TableMetadataReader {
* @return a list of ValidDocIdsMetadataInfo
*/
public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType,
List<String> segmentNames,
- String validDocIdsType, int timeoutMs)
+ String validDocIdsType, int timeoutMs, int
numSegmentsBatchPerServerRequest)
throws InvalidConfigException {
final Map<String, List<String>> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -170,7 +170,7 @@ public class TableMetadataReader {
List<ValidDocIdsMetadataInfo> aggregateTableMetadataInfo =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments, endpoints,
- segmentNames, timeoutMs, validDocIdsType);
+ segmentNames, timeoutMs, validDocIdsType,
numSegmentsBatchPerServerRequest);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 02f00046b9..9b1b89b4b7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -169,5 +169,10 @@ public class MinionConstants {
* Valid doc ids type
*/
public static final String VALID_DOC_IDS_TYPE = "validDocIdsType";
+
+ /**
+ * Number of segments to query per server request when fetching valid doc ids
metadata; defaults to 500.
+ */
+ public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST =
"numSegmentsBatchPerServerRequest";
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 0d22486e79..f63836f19b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -58,6 +58,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
private static final String DEFAULT_BUFFER_PERIOD = "7d";
private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+ private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
public static class SegmentSelectionResult {
@@ -137,15 +138,18 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
_clusterInfoAccessor.getConnectionManager());
- // TODO: currently, we put segmentNames=null to get metadata for all
segments. We can change this to get
- // valid doc id metadata in batches with the loop.
-
// By default, we use 'snapshot' for validDocIdsType. This means that we
will use the validDocIds bitmap from
// the snapshot from Pinot segment. This will require 'enableSnapshot'
from UpsertConfig to be set to true.
String validDocIdsTypeStr =
taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE,
ValidDocIdsType.SNAPSHOT.toString());
ValidDocIdsType validDocIdsType =
ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
+ // Number of segments to query per server request. If a table has a lot
of segments, then we might send a
+ // huge payload to pinot-server in request. Batching the requests will
help in reducing the payload size.
+ int numSegmentsBatchPerServerRequest =
+
Integer.parseInt(taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
+ String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
+
// Validate that the snapshot is enabled if validDocIdsType is
validDocIdsSnapshot
if (validDocIdsType == ValidDocIdsType.SNAPSHOT) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -163,7 +167,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType,
serverToSegments,
- serverToEndpoints, null, 60_000, validDocIdsType.toString());
+ serverToEndpoints, null, 60_000, validDocIdsType.toString(),
numSegmentsBatchPerServerRequest);
Map<String, SegmentZKMetadata> completedSegmentsMap =
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 063608235a..991f607199 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -657,9 +657,11 @@ public class TablesResource {
}
try {
if (!missingSegments.isEmpty()) {
- throw new WebApplicationException(
- String.format("Table %s has missing segments: %s)",
tableNameWithType, segments),
- Response.Status.NOT_FOUND);
+ // We need not abort or throw an exception here, as we can still process
the segments that are available.
+ // During UpsertCompactionTaskGenerator, the controller sends a lot of
segments to the server to fetch validDocIds,
+ // and a segment may be deleted concurrently. In such
cases, we log a warning and
+ // process the remaining available segments.
+ LOGGER.warn("Table {} has missing segments {}", tableNameWithType,
missingSegments);
}
List<Map<String, Object>> allValidDocIdsMetadata = new
ArrayList<>(segmentDataManagers.size());
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]