This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2efd5cac01 return the current rebalance result if already done (#13488)
2efd5cac01 is described below
commit 2efd5cac01d5606f77c16163c0d6962fb32f63e3
Author: Johan Adami <[email protected]>
AuthorDate: Mon Jul 8 14:49:56 2024 -0400
return the current rebalance result if already done (#13488)
---
.../api/resources/PinotTableRestletResource.java | 36 +++++++++++++++++-----
1 file changed, 29 insertions(+), 7 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index e6d2f5b49a..dbe67fe673 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -677,14 +678,31 @@ public class PinotTableRestletResource {
if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
rebalanceConfig.setDryRun(false);
- _executorService.submit(() -> {
+ Future<RebalanceResult> rebalanceResultFuture =
_executorService.submit(() -> {
try {
- _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, true);
+ return _pinotHelixResourceManager.rebalanceTable(
+ tableNameWithType, rebalanceConfig, rebalanceJobId, true);
} catch (Throwable t) {
- LOGGER.error("Caught exception/error while rebalancing table:
{}", tableNameWithType, t);
+ String errorMsg = String.format("Caught exception/error while
rebalancing table: %s", tableNameWithType);
+ LOGGER.error(errorMsg, t);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null);
}
});
- waitForJobIdToPersist(dryRunResult.getJobId(), tableNameWithType);
+ boolean isJobIdPersisted = waitForRebalanceToPersist(
+ dryRunResult.getJobId(), tableNameWithType,
rebalanceResultFuture);
+
+ if (rebalanceResultFuture.isDone()) {
+ try {
+ return rebalanceResultFuture.get();
+ } catch (Throwable t) {
+ if (!isJobIdPersisted) {
+ // If the jobId is not persisted, we throw the exception to
indicate the rebalance failed.
+ // Otherwise, polling the job id would return NOT_FOUND indefinitely.
+ throw new ControllerApplicationException(LOGGER,
t.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+ }
+
return new RebalanceResult(dryRunResult.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates",
dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(),
dryRunResult.getSegmentAssignment());
@@ -699,17 +717,21 @@ public class PinotTableRestletResource {
}
/**
- * Waits for jobId to be persisted using a retry policy.
+ * Waits for jobId to be persisted or the rebalance to complete using a
retry policy.
* Tables with 100k+ segments take up to a few seconds for the jobId to
persist. This ensures the jobId is present
* before returning the jobId to the caller, so they can correctly poll the
jobId.
*/
- public void waitForJobIdToPersist(String jobId, String tableNameWithType) {
+ public boolean waitForRebalanceToPersist(
+ String jobId, String tableNameWithType, Future<RebalanceResult>
rebalanceResultFuture) {
try {
// This retry policy waits at most for 7.5s to 15s in total. This is
chosen to cover typical delays for tables
// with many segments and avoid excessive HTTP request timeouts.
- RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() ->
getControllerJobMetadata(jobId) != null);
+ RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() ->
+ getControllerJobMetadata(jobId) != null ||
rebalanceResultFuture.isDone());
+ return true;
} catch (Exception e) {
LOGGER.warn("waiting for jobId not successful while rebalancing table:
{}", tableNameWithType);
+ return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]