This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 577830cbe0 allow to set targetInstance for reloadSegment (#14393)
577830cbe0 is described below
commit 577830cbe0a185219e3a2c3c5d22ac4d038877b1
Author: Xiaobing <[email protected]>
AuthorDate: Tue Nov 5 18:03:25 2024 -0800
allow to set targetInstance for reloadSegment (#14393)
---
.../api/resources/PinotSegmentRestletResource.java | 62 +++++++++++++---------
.../helix/core/PinotHelixResourceManager.java | 19 ++++---
2 files changed, 47 insertions(+), 34 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2f0d565590..9422fc64a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -296,8 +296,7 @@ public class PinotSegmentRestletResource {
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Exception while listing segment lineage: %s for table: %s.", e.getMessage(),
- tableNameWithType),
- Status.INTERNAL_SERVER_ERROR, e);
+ tableNameWithType), Status.INTERNAL_SERVER_ERROR, e);
}
}
@@ -360,8 +359,8 @@ public class PinotSegmentRestletResource {
private JsonNode getExtraMetaData(String tableName, String segmentName,
List<String> columns) {
try {
- TableMetadataReader tableMetadataReader = new
TableMetadataReader(_executor,
- _connectionManager, _pinotHelixResourceManager);
+ TableMetadataReader tableMetadataReader =
+ new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
return tableMetadataReader.getSegmentMetadata(tableName, segmentName,
columns,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
} catch (InvalidConfigException e) {
@@ -390,19 +389,21 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true)
@PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "Whether to force server to download segment")
@QueryParam("forceDownload")
- @DefaultValue("false") boolean forceDownload, @Context HttpHeaders
headers) {
+ @DefaultValue("false") boolean forceDownload,
+ @ApiParam(value = "Name of the target instance to reload")
@QueryParam("targetInstance") @Nullable
+ String targetInstance, @Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
segmentName = URIUtils.decode(segmentName);
String tableNameWithType = getExistingTable(tableName, segmentName);
Pair<Integer, String> msgInfo =
- _pinotHelixResourceManager.reloadSegment(tableNameWithType,
segmentName, forceDownload);
+ _pinotHelixResourceManager.reloadSegment(tableNameWithType,
segmentName, forceDownload, targetInstance);
boolean zkJobMetaWriteSuccess = false;
- if (msgInfo.getLeft() > 0) {
+ int numReloadMsgSent = msgInfo.getLeft();
+ if (numReloadMsgSent > 0) {
try {
if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentName, msgInfo.getRight(),
- startTimeMs,
- msgInfo.getLeft())) {
+ startTimeMs, numReloadMsgSent)) {
zkJobMetaWriteSuccess = true;
} else {
LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
@@ -414,11 +415,11 @@ public class PinotSegmentRestletResource {
}
return new SuccessResponse(
String.format("Submitted reload job id: %s, sent %d reload messages. Job meta ZK storage status: %s",
- msgInfo.getRight(), msgInfo.getLeft(), zkJobMetaWriteSuccess ?
"SUCCESS" : "FAILED"));
- } else {
- throw new ControllerApplicationException(LOGGER,
- "Failed to find segment: " + segmentName + " in table: " +
tableName, Status.NOT_FOUND);
+ msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ?
"SUCCESS" : "FAILED"));
}
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find segment: %s in table: %s on %s",
segmentName, tableName,
+ targetInstance == null ? "every instance" : targetInstance),
Status.NOT_FOUND);
}
/**
@@ -522,15 +523,14 @@ public class PinotSegmentRestletResource {
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job id", required = true) @PathParam("jobId")
String reloadJobId)
throws Exception {
- Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.
- getControllerJobZKMetadata(reloadJobId,
ControllerJobType.RELOAD_SEGMENT);
+ Map<String, String> controllerJobZKMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobType.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
Status.NOT_FOUND);
}
- String tableNameWithType =
-
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+ String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Map<String, List<String>> serverToSegments;
String singleSegmentName =
@@ -571,7 +571,7 @@ public class PinotSegmentRestletResource {
serverReloadControllerJobStatusResponse.setSuccessCount(0);
int totalSegments = 0;
- for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) {
+ for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
totalSegments += entry.getValue().size();
}
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
@@ -587,8 +587,7 @@ public class PinotSegmentRestletResource {
serverReloadControllerJobStatusResponse.getSuccessCount() +
response.getSuccessCount());
} catch (Exception e) {
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
- );
+
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
}
}
@@ -596,8 +595,7 @@ public class PinotSegmentRestletResource {
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
// Add derived fields
- long submissionTime =
-
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+ long submissionTime =
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
double timeElapsedInMinutes = ((double) System.currentTimeMillis() -
(double) submissionTime) / (1000.0 * 60.0);
int remainingSegments =
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
- serverReloadControllerJobStatusResponse.getSuccessCount();
@@ -625,7 +623,9 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
@ApiParam(value = "Whether to force server to download segment")
@QueryParam("forceDownload")
- @DefaultValue("false") boolean forceDownload, @Context HttpHeaders
headers)
+ @DefaultValue("false") boolean forceDownload,
+ @ApiParam(value = "Name of the target instance to reload")
@QueryParam("targetInstance") @Nullable
+ String targetInstance, @Context HttpHeaders headers)
throws JsonProcessingException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
@@ -644,15 +644,20 @@ public class PinotSegmentRestletResource {
LOGGER);
Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
for (String tableNameWithType : tableNamesWithType) {
- Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+ Pair<Integer, String> msgInfo =
+ _pinotHelixResourceManager.reloadAllSegments(tableNameWithType,
forceDownload, targetInstance);
+ int numReloadMsgSent = msgInfo.getLeft();
+ if (numReloadMsgSent <= 0) {
+ continue;
+ }
Map<String, String> tableReloadMeta = new HashMap<>();
- tableReloadMeta.put("numMessagesSent",
String.valueOf(msgInfo.getLeft()));
+ tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
tableReloadMeta.put("reloadJobId", msgInfo.getRight());
perTableMsgData.put(tableNameWithType, tableReloadMeta);
// Store in ZK
try {
if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
msgInfo.getRight(), startTimeMs,
- msgInfo.getLeft())) {
+ numReloadMsgSent)) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
@@ -663,6 +668,11 @@ public class PinotSegmentRestletResource {
LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}", tableNameWithType, e);
}
}
+ if (perTableMsgData.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find any segments in table: %s on %s",
tableName,
+ targetInstance == null ? "every instance" : targetInstance),
Status.NOT_FOUND);
+ }
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 048152fefe..099cf4b5e8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1520,7 +1520,7 @@ public class PinotHelixResourceManager {
LOGGER.info("Reloading tables with name: {}", schemaName);
List<String> tableNamesWithType =
getExistingTableNamesWithType(schemaName, null);
for (String tableNameWithType : tableNamesWithType) {
- reloadAllSegments(tableNameWithType, false);
+ reloadAllSegments(tableNameWithType, false, null);
}
}
}
@@ -2605,8 +2605,10 @@ public class PinotHelixResourceManager {
sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
}
- public Pair<Integer, String> reloadAllSegments(String tableNameWithType,
boolean forceDownload) {
- LOGGER.info("Sending reload message for table: {} with forceDownload: {}",
tableNameWithType, forceDownload);
+ public Pair<Integer, String> reloadAllSegments(String tableNameWithType,
boolean forceDownload,
+ @Nullable String targetInstance) {
+ LOGGER.info("Sending reload message for table: {} with forceDownload: {}, and target: {}", tableNameWithType,
+ forceDownload, targetInstance == null ? "every instance" :
targetInstance);
if (forceDownload) {
TableType tt =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -2617,7 +2619,7 @@ public class PinotHelixResourceManager {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
+ recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, forceDownload);
@@ -2635,9 +2637,10 @@ public class PinotHelixResourceManager {
return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
}
- public Pair<Integer, String> reloadSegment(String tableNameWithType, String
segmentName, boolean forceDownload) {
- LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}", segmentName,
- tableNameWithType, forceDownload);
+ public Pair<Integer, String> reloadSegment(String tableNameWithType, String
segmentName, boolean forceDownload,
+ @Nullable String targetInstance) {
+ LOGGER.info("Sending reload message for segment: {} in table: {} with forceDownload: {}, and target: {}",
+ segmentName, tableNameWithType, forceDownload, targetInstance == null
? "every instance" : targetInstance);
if (forceDownload) {
TableType tt =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -2649,7 +2652,7 @@ public class PinotHelixResourceManager {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName("%");
+ recipientCriteria.setInstanceName(targetInstance == null ? "%" :
targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setPartition(segmentName);
recipientCriteria.setSessionSpecific(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]