This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0cf091a7ef Add support to retry failed table rebalance (#11740)
0cf091a7ef is described below
commit 0cf091a7ef822470fd2c0195a926b35e8cddcf72
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 18 14:33:07 2023 -0700
Add support to retry failed table rebalance (#11740)
* add support to retry failed table rebalance
* add exponential retry backoff and more metrics
* add CANCELLED job status and cancelRebalance restful API for users to
cancel rebalance
---
.../configs/controller.yml | 6 +
.../etc/jmx_prometheus_javaagent/configs/pinot.yml | 6 +
.../pinot/common/metrics/ControllerMeter.java | 3 +
.../pinot/common/metrics/ControllerTimer.java | 1 +
.../pinot/controller/BaseControllerStarter.java | 5 +
.../apache/pinot/controller/ControllerConf.java | 22 +-
.../api/resources/PinotTableRestletResource.java | 126 +++++--
.../ServerRebalanceJobStatusResponse.java | 11 +
.../helix/core/PinotHelixResourceManager.java | 147 +++++---
.../core/rebalance/NoOpTableRebalanceObserver.java | 10 +
.../helix/core/rebalance/RebalanceChecker.java | 331 ++++++++++++++++++
.../helix/core/rebalance/RebalanceConfig.java | 66 ++++
.../core/rebalance/RebalanceJobConstants.java | 2 +
.../helix/core/rebalance/RebalanceResult.java | 6 +-
.../core/rebalance/TableRebalanceContext.java | 94 +++++
.../core/rebalance/TableRebalanceObserver.java | 4 +
.../rebalance/TableRebalanceProgressStats.java | 6 +-
.../helix/core/rebalance/TableRebalancer.java | 35 ++
.../rebalance/ZkBasedTableRebalanceObserver.java | 123 +++++--
.../tenant/ZkBasedTenantRebalanceObserver.java | 4 +-
...ControllerPeriodicTaskStarterStatelessTest.java | 2 +-
.../helix/core/rebalance/RebalanceCheckerTest.java | 388 +++++++++++++++++++++
.../TestZkBasedTableRebalanceObserver.java | 6 +-
.../java/org/apache/pinot/core/auth/Actions.java | 1 +
...PartialUpsertTableRebalanceIntegrationTest.java | 5 +-
25 files changed, 1284 insertions(+), 126 deletions(-)
diff --git
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index cfd4f9af31..c4071887ed 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -106,6 +106,12 @@ rules:
labels:
table: "$1"
taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\",
name=\"pinot.controller.([^\\.]*?)\\.(\\w+).tableRebalanceExecutionTimeMs\"><>(\\w+)"
+ name: "pinot_controller_tableRebalanceExecutionTimeMs_$3"
+ cache: true
+ labels:
+ table: "$1"
+ result: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\",
name=\"pinot.controller.taskStatus.([^\\.]*?)\\.(\\w+)\"><>(\\w+)"
name: "pinot_controller_taskStatus_$3"
cache: true
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index 6660ee42d7..1c435be787 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -106,6 +106,12 @@ rules:
labels:
table: "$1"
taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\",
name=\"pinot.controller.([^\\.]*?)\\.(\\w+).tableRebalanceExecutionTimeMs\"><>(\\w+)"
+ name: "pinot_controller_tableRebalanceExecutionTimeMs_$3"
+ cache: true
+ labels:
+ table: "$1"
+ result: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\",
name=\"pinot.controller.taskStatus.(\\w+)\\.(\\w+)\"><>(\\w+)"
name: "pinot_controller_taskStatus_$3"
cache: true
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index ac933acf6a..2e84f8b510 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -61,6 +61,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
false),
SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink",
false),
DELETED_TMP_SEGMENT_COUNT("DeletedTmpSegmentCount", false),
+ TABLE_REBALANCE_FAILURE_DETECTED("TableRebalanceFailureDetected", false),
+ TABLE_REBALANCE_RETRY("TableRebalanceRetry", false),
+ TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes",
false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index 6627382db6..64608d0037 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.Utils;
*
*/
public enum ControllerTimer implements AbstractMetrics.Timer {
+ TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false),
CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs",
false);
private final String _timerName;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 185b1d506e..2093aab0b4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -93,6 +93,7 @@ import
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
@@ -174,6 +175,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected SegmentRelocator _segmentRelocator;
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
+ protected RebalanceChecker _rebalanceChecker;
protected RealtimeConsumerMonitor _realtimeConsumerMonitor;
protected PinotTaskManager _taskManager;
protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo>
_taskManagerStatusCache;
@@ -818,6 +820,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentStatusChecker);
+ _rebalanceChecker = new RebalanceChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
+ _executorService);
+ periodicTasks.add(_rebalanceChecker);
_realtimeConsumerMonitor =
new RealtimeConsumerMonitor(_config, _helixResourceManager,
_leadControllerManager, _controllerMetrics,
_executorService);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index cb6ee99c4f..b8e2247b08 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -178,9 +178,9 @@ public class ControllerConf extends PinotConfiguration {
public static final String
DEPRECATED_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS =
"controller.segment.relocator.frequencyInSeconds";
public static final String SEGMENT_RELOCATOR_FREQUENCY_PERIOD =
"controller.segment.relocator.frequencyPeriod";
+ public static final String REBALANCE_CHECKER_FREQUENCY_PERIOD =
"controller.rebalance.checker.frequencyPeriod";
// Because segment level validation is expensive and requires heavy ZK
access, we run segment level validation
- // with a
- // separate interval
+ // with a separate interval
// Deprecated as of 0.8.0
@Deprecated
public static final String
DEPRECATED_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
@@ -199,6 +199,8 @@ public class ControllerConf extends PinotConfiguration {
// RealtimeSegmentRelocator has been rebranded as SegmentRelocator
public static final String
DEPRECATED_REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS =
"controller.realtimeSegmentRelocation.initialDelayInSeconds";
+ public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
+ "controller.rebalanceChecker.initialDelayInSeconds";
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
public static final String SEGMENT_RELOCATOR_ENABLE_LOCAL_TIER_MIGRATION =
@@ -237,7 +239,8 @@ public class ControllerConf extends PinotConfiguration {
private static final int
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60;
// 24 Hours.
private static final int
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
private static final int
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
- private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS =
5 * 60; // 5 minutes
+ private static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 *
60; // 5 minutes
+ private static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS =
5 * 60; // 5 minutes
private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS
= 5 * 60; // 5 minutes
private static final int
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1;
// Disabled
@@ -604,7 +607,7 @@ public class ControllerConf extends PinotConfiguration {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
() ->
getProperty(ControllerPeriodicTasksConf.DEPRECATED_STATUS_CHECKER_FREQUENCY_IN_SECONDS,
-
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS));
+
ControllerPeriodicTasksConf.DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS));
}
public void setStatusCheckerFrequencyInSeconds(int
statusCheckerFrequencyInSeconds) {
@@ -612,6 +615,17 @@ public class ControllerConf extends PinotConfiguration {
Integer.toString(statusCheckerFrequencyInSeconds));
}
+ public int getRebalanceCheckerFrequencyInSeconds() {
+ return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_FREQUENCY_PERIOD))
+ .map(period -> (int) convertPeriodToSeconds(period)).orElse(
+
ControllerPeriodicTasksConf.DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS);
+ }
+
+ public long getRebalanceCheckerInitialDelayInSeconds() {
+ return
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+ }
+
public int getRealtimeConsumerMonitorRunFrequency() {
return
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElse(
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 58abb6c366..f0216da013 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -95,6 +95,7 @@ import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.recommender.RecommenderDriver;
@@ -615,11 +616,16 @@ public class PinotTableRestletResource {
@QueryParam("externalViewCheckIntervalInMs") long
externalViewCheckIntervalInMs,
@ApiParam(value = "How long to wait till external view converges with
ideal states") @DefaultValue("3600000")
@QueryParam("externalViewStabilizationTimeoutInMs") long
externalViewStabilizationTimeoutInMs,
+ @ApiParam(value = "How often to make a status update (i.e. heartbeat)")
@DefaultValue("300000")
+ @QueryParam("heartbeatIntervalInMs") long heartbeatIntervalInMs,
+ @ApiParam(value = "How long to wait for next status update (i.e.
heartbeat) before the job is considered failed")
+ @DefaultValue("3600000") @QueryParam("heartbeatTimeoutInMs") long
heartbeatTimeoutInMs,
+ @ApiParam(value = "Max number of attempts to rebalance")
@DefaultValue("3") @QueryParam("maxAttempts")
+ int maxAttempts, @ApiParam(value = "Initial delay to exponentially
backoff retry") @DefaultValue("300000")
+ @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
@ApiParam(value = "Whether to update segment target tier as part of the
rebalance") @DefaultValue("false")
@QueryParam("updateTargetTier") boolean updateTargetTier) {
-
String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
-
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
rebalanceConfig.setReassignInstances(reassignInstances);
@@ -630,6 +636,12 @@ public class PinotTableRestletResource {
rebalanceConfig.setBestEfforts(bestEfforts);
rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+ heartbeatIntervalInMs = Math.max(externalViewCheckIntervalInMs,
heartbeatIntervalInMs);
+ rebalanceConfig.setHeartbeatIntervalInMs(heartbeatIntervalInMs);
+ heartbeatTimeoutInMs = Math.max(heartbeatTimeoutInMs, 3 *
heartbeatIntervalInMs);
+ rebalanceConfig.setHeartbeatTimeoutInMs(heartbeatTimeoutInMs);
+ rebalanceConfig.setMaxAttempts(maxAttempts);
+ rebalanceConfig.setRetryInitialDelayInMs(retryInitialDelayInMs);
rebalanceConfig.setUpdateTargetTier(updateTargetTier);
String rebalanceJobId =
TableRebalancer.createUniqueRebalanceJobIdentifier();
@@ -666,6 +678,79 @@ public class PinotTableRestletResource {
}
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.UPDATE)
+ @Path("/tables/{tableName}/rebalance")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.CANCEL_REBALANCE)
+ @ApiOperation(value = "Cancel all rebalance jobs for the given table, and
noop if no rebalance is running", notes =
+ "Cancel all rebalance jobs for the given table, and noop if no rebalance
is running")
+ public List<String> cancelRebalance(
+ @ApiParam(value = "Name of the table to rebalance", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr) {
+ String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
+ List<String> cancelledJobIds = new ArrayList<>();
+ boolean updated =
+ _pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
+ jobMetadata -> {
+ String jobId =
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+ try {
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ TableRebalanceProgressStats jobStats =
+ JsonUtils.stringToObject(jobStatsInStr,
TableRebalanceProgressStats.class);
+ if (jobStats.getStatus() !=
RebalanceResult.Status.IN_PROGRESS) {
+ return;
+ }
+ cancelledJobIds.add(jobId);
+ LOGGER.info("Cancel rebalance job: {} for table: {}", jobId,
tableNameWithType);
+ jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(jobStats));
+ } catch (Exception e) {
+ LOGGER.error("Failed to cancel rebalance job: {} for table:
{}", jobId, tableNameWithType, e);
+ }
+ });
+ LOGGER.info("Tried to cancel existing jobs at best effort and done: {}",
updated);
+ return cancelledJobIds;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authenticate(AccessType.UPDATE)
+ @Path("/rebalanceStatus/{jobId}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_REBALANCE_STATUS)
+ @ApiOperation(value = "Gets detailed stats of a rebalance operation",
+ notes = "Gets detailed stats of a rebalance operation")
+ public ServerRebalanceJobStatusResponse rebalanceStatus(
+ @ApiParam(value = "Rebalance Job Id", required = true)
@PathParam("jobId") String jobId)
+ throws JsonProcessingException {
+ Map<String, String> controllerJobZKMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TABLE_REBALANCE);
+
+ if (controllerJobZKMetadata == null) {
+ throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
+ Response.Status.NOT_FOUND);
+ }
+ ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new
ServerRebalanceJobStatusResponse();
+ TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(
+
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+ TableRebalanceProgressStats.class);
+
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
+
+ long timeSinceStartInSecs = 0L;
+ if (RebalanceResult.Status.DONE !=
tableRebalanceProgressStats.getStatus()) {
+ timeSinceStartInSecs = (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+ }
+
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+
+ String jobCtxInStr =
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+ if (StringUtils.isNotEmpty(jobCtxInStr)) {
+ TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr,
TableRebalanceContext.class);
+ serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
+ }
+ return serverRebalanceJobStatusResponse;
+ }
+
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/state")
@@ -724,37 +809,6 @@ public class PinotTableRestletResource {
}
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Authenticate(AccessType.UPDATE)
- @Path("/rebalanceStatus/{jobId}")
- @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_REBALANCE_STATUS)
- @ApiOperation(value = "Gets detailed stats of a rebalance operation",
- notes = "Gets detailed stats of a rebalance operation")
- public ServerRebalanceJobStatusResponse rebalanceStatus(
- @ApiParam(value = "Rebalance Job Id", required = true)
@PathParam("jobId") String jobId)
- throws JsonProcessingException {
- Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TABLE_REBALANCE);
-
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
- Response.Status.NOT_FOUND);
- }
- TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(
-
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
- TableRebalanceProgressStats.class);
- long timeSinceStartInSecs = 0L;
- if
(!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus()))
{
- timeSinceStartInSecs = (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
- }
-
- ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new
ServerRebalanceJobStatusResponse();
-
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
-
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
- return serverRebalanceJobStatusResponse;
- }
-
@GET
@Path("/tables/{tableName}/stats")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
@@ -987,10 +1041,10 @@ public class PinotTableRestletResource {
}
Map<String, Map<String, String>> result = new HashMap<>();
for (String tableNameWithType : tableNamesWithType) {
-
result.putAll(_pinotHelixResourceManager.getAllJobsForTable(tableNameWithType,
- jobTypesToFilter == null ? validJobTypes : jobTypesToFilter));
+ result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter ==
null ? validJobTypes : jobTypesToFilter,
+ jobMetadata ->
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
+ .equals(tableNameWithType)));
}
-
return result;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
index 3018b4877e..3551c9c811 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.resources;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
@@ -26,6 +27,8 @@ public class ServerRebalanceJobStatusResponse {
private TableRebalanceProgressStats _tableRebalanceProgressStats;
+ private TableRebalanceContext _tableRebalanceContext;
+
public void setTimeElapsedSinceStartInSeconds(Long timeElapsedSinceStart) {
_timeElapsedSinceStartInSeconds = timeElapsedSinceStart;
}
@@ -41,4 +44,12 @@ public class ServerRebalanceJobStatusResponse {
public long getTimeElapsedSinceStartInSeconds() {
return _timeElapsedSinceStartInSeconds;
}
+
+ public TableRebalanceContext getTableRebalanceContext() {
+ return _tableRebalanceContext;
+ }
+
+ public void setTableRebalanceContext(TableRebalanceContext
tableRebalanceContext) {
+ _tableRebalanceContext = tableRebalanceContext;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index bfb4918bf7..a85401d706 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,7 +48,9 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.BadRequestException;
@@ -86,7 +88,6 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -146,6 +147,7 @@ import
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
@@ -1981,43 +1983,38 @@ public class PinotHelixResourceManager {
/**
* Returns the ZK metadata for the given jobId and jobType
* @param jobId the id of the job
- * @param jobType Job Path
+ * @param jobType the type of the job to figure out where job metadata is
kept in ZK
* @return Map representing the job's ZK properties
*/
@Nullable
public Map<String, String> getControllerJobZKMetadata(String jobId,
ControllerJobType jobType) {
- String controllerJobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
- ZNRecord taskResourceZnRecord =
_propertyStore.get(controllerJobResourcePath, null, AccessOption.PERSISTENT);
- if (taskResourceZnRecord != null) {
- return taskResourceZnRecord.getMapFields().get(jobId);
- }
- return null;
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+ ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null,
AccessOption.PERSISTENT);
+ return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) :
null;
}
/**
- * Returns a Map of jobId to job's ZK metadata for the given table
- * @param tableNameWithType the table for which jobs are to be fetched
+ * Returns a Map of jobId to job's ZK metadata that passes the given checker,
e.g. to filter jobs for specific tables.
* @return A Map of jobId to job properties
*/
- public Map<String, Map<String, String>> getAllJobsForTable(String
tableNameWithType,
- Set<ControllerJobType> jobTypes) {
+ public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType>
jobTypes,
+ Predicate<Map<String, String>> jobMetadataChecker) {
Map<String, Map<String, String>> controllerJobs = new HashMap<>();
for (ControllerJobType jobType : jobTypes) {
- String jobsResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
- try {
- ZNRecord znRecord = _propertyStore.get(jobsResourcePath, null, -1);
- if (znRecord != null) {
- Map<String, Map<String, String>> tableJobsRecord =
znRecord.getMapFields();
- for (Map.Entry<String, Map<String, String>> tableEntry :
tableJobsRecord.entrySet()) {
- if
(tableEntry.getValue().get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name())
- &&
tableEntry.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
- .equals(tableNameWithType)) {
- controllerJobs.put(tableEntry.getKey(), tableEntry.getValue());
- }
- }
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+ ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null,
AccessOption.PERSISTENT);
+ if (jobsZnRecord == null) {
+ continue;
+ }
+ Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
+ for (Map.Entry<String, Map<String, String>> jobMetadataEntry :
jobMetadataMap.entrySet()) {
+ String jobId = jobMetadataEntry.getKey();
+ Map<String, String> jobMetadata = jobMetadataEntry.getValue();
+
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()),
+ "Got unexpected jobType: %s at jobResourcePath: %s with jobId:
%s", jobType.name(), jobResourcePath, jobId);
+ if (jobMetadataChecker.test(jobMetadata)) {
+ controllerJobs.put(jobId, jobMetadata);
}
- } catch (ZkNoNodeException e) {
- LOGGER.warn("Could not find controller job node for table : {}
jobType: {}", tableNameWithType, jobType, e);
}
}
return controllerJobs;
@@ -2041,8 +2038,7 @@ public class PinotHelixResourceManager {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentName);
- return addControllerJobToZK(jobId, jobMetadata,
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
}
public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
long jobSubmissionTimeMs,
@@ -2055,8 +2051,7 @@ public class PinotHelixResourceManager {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
- return addControllerJobToZK(jobId, jobMetadata,
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.FORCE_COMMIT));
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.FORCE_COMMIT);
}
/**
@@ -2075,32 +2070,81 @@ public class PinotHelixResourceManager {
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.toString());
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numberOfMessagesSent));
- return addControllerJobToZK(jobId, jobMetadata,
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
}
- public boolean addControllerJobToZK(String jobId, Map<String, String>
jobMetadata, String jobResourcePath) {
+ /**
+ * Adds a new job metadata for controller job like table rebalance or reload
into ZK
+ * @param jobId job's UUID
+ * @param jobMetadata the job metadata
+ * @param jobType the type of the job to figure out where job metadata is
kept in ZK
+ * @return boolean representing success / failure of the ZK write step
+ */
+ public boolean addControllerJobToZK(String jobId, Map<String, String>
jobMetadata, ControllerJobType jobType) {
+ return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true);
+ }
+
+ /**
+ * Adds a new job metadata for controller job like table rebalance or reload
into ZK
+ * @param jobId job's UUID
+ * @param jobMetadata the job metadata
+ * @param jobType the type of the job to figure out where job metadata is
kept in ZK
+ * @param prevJobMetadataChecker to check the previous job metadata before
adding new one
+ * @return boolean representing success / failure of the ZK write step
+ */
+ public boolean addControllerJobToZK(String jobId, Map<String, String>
jobMetadata, ControllerJobType jobType,
+ Predicate<Map<String, String>> prevJobMetadataChecker) {
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
!= null,
"Submission Time in JobMetadata record not set. Cannot expire these
records");
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
Stat stat = new Stat();
- ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat,
AccessOption.PERSISTENT);
- if (tableJobsZnRecord != null) {
- Map<String, Map<String, String>> tasks =
tableJobsZnRecord.getMapFields();
- tasks.put(jobId, jobMetadata);
- if (tasks.size() >
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
- tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
+ ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat,
AccessOption.PERSISTENT);
+ if (jobsZnRecord != null) {
+ Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
+ Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+ if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+ return false;
+ }
+ jobMetadataMap.put(jobId, jobMetadata);
+ if (jobMetadataMap.size() >
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+ jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) ->
Long.compare(
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
.collect(Collectors.toList()).subList(0,
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
.stream().collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
- tableJobsZnRecord.setMapFields(tasks);
- return _propertyStore.set(jobResourcePath, tableJobsZnRecord,
stat.getVersion(), AccessOption.PERSISTENT);
+ jobsZnRecord.setMapFields(jobMetadataMap);
+ return _propertyStore.set(jobResourcePath, jobsZnRecord,
stat.getVersion(), AccessOption.PERSISTENT);
} else {
- tableJobsZnRecord = new ZNRecord(jobResourcePath);
- tableJobsZnRecord.setMapField(jobId, jobMetadata);
- return _propertyStore.set(jobResourcePath, tableJobsZnRecord,
AccessOption.PERSISTENT);
+ jobsZnRecord = new ZNRecord(jobResourcePath);
+ jobsZnRecord.setMapField(jobId, jobMetadata);
+ return _propertyStore.set(jobResourcePath, jobsZnRecord,
AccessOption.PERSISTENT);
+ }
+ }
+
+ /**
+ * Update existing job metadata belong to the table
+ * @param tableNameWithType whose job metadata to be updated
+ * @param jobType the type of the job to figure out where job metadata is
kept in ZK
+ * @param updater to modify the job metadata in place
+ * @return boolean representing success / failure of the ZK write step
+ */
+ public boolean updateJobsForTable(String tableNameWithType,
ControllerJobType jobType,
+ Consumer<Map<String, String>> updater) {
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+ Stat stat = new Stat();
+ ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat,
AccessOption.PERSISTENT);
+ if (jobsZnRecord == null) {
+ return true;
+ }
+ Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
+ for (Map<String, String> jobMetadata : jobMetadataMap.values()) {
+ if
(jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType))
{
+ updater.accept(jobMetadata);
+ }
}
+ jobsZnRecord.setMapFields(jobMetadataMap);
+ return _propertyStore.set(jobResourcePath, jobsZnRecord,
stat.getVersion(), AccessOption.PERSISTENT);
}
@VisibleForTesting
@@ -3112,12 +3156,19 @@ public class PinotHelixResourceManager {
throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
}
Preconditions.checkState(rebalanceJobId != null, "RebalanceId not
populated in the rebalanceConfig");
- if (rebalanceConfig.isUpdateTargetTier()) {
- updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
- }
ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
if (trackRebalanceProgress) {
- zkBasedTableRebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, this);
+ zkBasedTableRebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+ TableRebalanceContext.forInitialAttempt(rebalanceJobId,
rebalanceConfig), this);
+ }
+ return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId,
rebalanceConfig,
+ zkBasedTableRebalanceObserver);
+ }
+
+ public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig
tableConfig, String rebalanceJobId,
+ RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver
zkBasedTableRebalanceObserver) {
+ if (rebalanceConfig.isUpdateTargetTier()) {
+ updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
}
TableRebalancer tableRebalancer =
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
index 9bb9179c90..a435e35ead 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
@@ -36,4 +36,14 @@ public class NoOpTableRebalanceObserver implements
TableRebalanceObserver {
@Override
public void onError(String errorMsg) {
}
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public RebalanceResult.Status getStopStatus() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
new file mode 100644
index 0000000000..93e4b04dcb
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check whether a user triggered rebalance job is completed
or not, and retry if failed. The retry
+ * job is started with the same rebalance configs provided by the user and
does best effort to stop the other jobs
+ * for the same table. This task can be configured to just check failures and
report metrics, and not to do retry.
+ */
+public class RebalanceChecker extends ControllerPeriodicTask<Void> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RebalanceChecker.class);
+ private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
+ private final ExecutorService _executorService;
+
+ public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
+ ExecutorService executorService) {
+ super(RebalanceChecker.class.getSimpleName(),
config.getRebalanceCheckerFrequencyInSeconds(),
+ config.getRebalanceCheckerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
+ controllerMetrics);
+ _executorService = executorService;
+ }
+
+ @Override
+ protected void processTables(List<String> tableNamesWithType, Properties
periodicTaskProperties) {
+ int numTables = tableNamesWithType.size();
+ LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+ int numTablesProcessed = retryRebalanceTables(new
HashSet<>(tableNamesWithType));
+
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
_taskName,
+ numTablesProcessed);
+ LOGGER.info("Finish processing {}/{} tables in task: {}",
numTablesProcessed, numTables, _taskName);
+ }
+
+ /**
+   * Rare, but the task may be executed by more than one thread, because the
user can trigger the periodic task to run
+ * immediately, in addition to the one scheduled to run periodically. So
make this method synchronized to be simple.
+ */
+ private synchronized int retryRebalanceTables(Set<String>
tableNamesWithType) {
+ // Get all jobMetadata for all the given tables with a single ZK read.
+ Map<String, Map<String, String>> allJobMetadataByJobId =
+
_pinotHelixResourceManager.getAllJobs(Collections.singleton(ControllerJobType.TABLE_REBALANCE),
+ jobMetadata -> tableNamesWithType.contains(
+
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)));
+ Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new
HashMap<>();
+ allJobMetadataByJobId.forEach((jobId, jobMetadata) -> {
+ String tableNameWithType =
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+ tableJobMetadataMap.computeIfAbsent(tableNameWithType, k -> new
HashMap<>()).put(jobId, jobMetadata);
+ });
+ int numTablesProcessed = 0;
+ for (String tableNameWithType : tableNamesWithType) {
+ try {
+ LOGGER.info("Start to retry rebalance for table: {}",
tableNameWithType);
+ // Get all rebalance jobs as tracked in ZK for this table to check if
there is any failure to retry.
+ Map<String, Map<String, String>> allJobMetadata =
tableJobMetadataMap.get(tableNameWithType);
+      if (allJobMetadata == null || allJobMetadata.isEmpty()) {
+ LOGGER.info("No rebalance job has been triggered for table: {}. Skip
retry", tableNameWithType);
+ continue;
+ }
+ retryRebalanceTable(tableNameWithType, allJobMetadata);
+ numTablesProcessed++;
+ } catch (Exception e) {
+ LOGGER.error("Failed to retry rebalance for table: {}",
tableNameWithType, e);
+ _controllerMetrics.addMeteredTableValue(tableNameWithType + "." +
_taskName,
+ ControllerMeter.PERIODIC_TASK_ERROR, 1L);
+ }
+ }
+ return numTablesProcessed;
+ }
+
+ @VisibleForTesting
+ void retryRebalanceTable(String tableNameWithType, Map<String, Map<String,
String>> allJobMetadata)
+ throws Exception {
+ // Skip retry for the table if rebalance job is still running or has
completed, in specific:
+ // 1) Skip retry if any rebalance job is actively running. Being actively
running means the job is at IN_PROGRESS
+ // status, and has updated its status kept in ZK within the heartbeat
timeout. It's possible that more than one
+ // rebalance jobs are running for the table, but that's fine with
idempotent rebalance algorithm.
+ // 2) Skip retry if the most recently started rebalance job has completed
with DONE or NO_OP. It's possible that
+ // jobs started earlier may be still running, but they are ignored here.
+ //
+ // Otherwise, we can get a list of failed rebalance jobs, i.e. those at
FAILED status; or IN_PROGRESS status but
+ // haven't updated their status kept in ZK within the heartbeat timeout.
For those candidate jobs to retry:
+ // 1) Firstly, group them by the original jobIds they retry for so that we
can skip those exceeded maxRetry.
+ // 2) For the remaining jobs, we take the one started most recently and
retry it with its original configs.
+ // 3) If configured, we can abort the other rebalance jobs for the table
by setting their status to FAILED.
+ Map<String/*original jobId*/, Set<Pair<TableRebalanceContext/*job
attempts*/, Long
+ /*startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType,
allJobMetadata);
+ if (candidateJobs.isEmpty()) {
+ LOGGER.info("Found no failed rebalance jobs for table: {}. Skip retry",
tableNameWithType);
+ return;
+ }
+ _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.TABLE_REBALANCE_FAILURE_DETECTED, 1L);
+ Pair<TableRebalanceContext, Long> jobContextAndStartTime =
getLatestJob(candidateJobs);
+ if (jobContextAndStartTime == null) {
+ LOGGER.info("Rebalance has been retried too many times for table: {}.
Skip retry", tableNameWithType);
+ _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.TABLE_REBALANCE_RETRY_TOO_MANY_TIMES,
+ 1L);
+ return;
+ }
+ TableRebalanceContext jobCtx = jobContextAndStartTime.getLeft();
+ String prevJobId = jobCtx.getJobId();
+ RebalanceConfig rebalanceConfig = jobCtx.getConfig();
+ long jobStartTimeMs = jobContextAndStartTime.getRight();
+ long retryDelayMs =
getRetryDelayInMs(rebalanceConfig.getRetryInitialDelayInMs(),
jobCtx.getAttemptId());
+ if (jobStartTimeMs + retryDelayMs > System.currentTimeMillis()) {
+ LOGGER.info("Delay retry for failed rebalance job: {} that started at:
{}, by: {}ms", prevJobId, jobStartTimeMs,
+ retryDelayMs);
+ return;
+ }
+ abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+ // Get tableConfig only when the table needs to retry rebalance, and get
it before submitting rebalance to another
+ // thread, in order to avoid unnecessary ZK reads and making too many ZK
reads in a short time.
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
+ _executorService.submit(() -> {
+ // Retry rebalance in another thread as rebalance can take time.
+ try {
+ retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
+ } catch (Throwable t) {
+ LOGGER.error("Failed to retry rebalance for table: {} asynchronously",
tableNameWithType, t);
+ }
+ });
+ }
+
+ private void retryRebalanceTableWithContext(String tableNameWithType,
TableConfig tableConfig,
+ TableRebalanceContext jobCtx) {
+ String prevJobId = jobCtx.getJobId();
+ RebalanceConfig rebalanceConfig = jobCtx.getConfig();
+ TableRebalanceContext retryCtx =
+ TableRebalanceContext.forRetry(jobCtx.getOriginalJobId(),
rebalanceConfig, jobCtx.getAttemptId() + 1);
+ String attemptJobId = retryCtx.getJobId();
+ LOGGER.info("Retry rebalance job: {} for table: {} with attempt job: {}",
prevJobId, tableNameWithType,
+ attemptJobId);
+ _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.TABLE_REBALANCE_RETRY, 1L);
+ ZkBasedTableRebalanceObserver observer =
+ new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId,
retryCtx, _pinotHelixResourceManager);
+ RebalanceResult result =
+ _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
tableConfig, attemptJobId, rebalanceConfig,
+ observer);
+ LOGGER.info("New attempt: {} for table: {} is done with result status:
{}", attemptJobId, tableNameWithType,
+ result.getStatus());
+ }
+
+ @VisibleForTesting
+ static long getRetryDelayInMs(long initDelayMs, int attemptId) {
+    // TODO: Just exponential backoff by factor 2 for now. Can add other retry
policies as needed.
+ // The attemptId starts from 1, so minus one as the exponent.
+ double minDelayMs = initDelayMs * Math.pow(RETRY_DELAY_SCALE_FACTOR,
attemptId - 1);
+ double maxDelayMs = minDelayMs * RETRY_DELAY_SCALE_FACTOR;
+ return RandomUtils.nextLong((long) minDelayMs, (long) maxDelayMs);
+ }
+
+ private static void abortExistingJobs(String tableNameWithType,
PinotHelixResourceManager pinotHelixResourceManager) {
+ boolean updated =
pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
+ jobMetadata -> {
+ String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+ try {
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ TableRebalanceProgressStats jobStats =
+ JsonUtils.stringToObject(jobStatsInStr,
TableRebalanceProgressStats.class);
+ if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+ return;
+ }
+ LOGGER.info("Abort rebalance job: {} for table: {}", jobId,
tableNameWithType);
+ jobStats.setStatus(RebalanceResult.Status.ABORTED);
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(jobStats));
+ } catch (Exception e) {
+ LOGGER.error("Failed to abort rebalance job: {} for table: {}",
jobId, tableNameWithType, e);
+ }
+ });
+ LOGGER.info("Tried to abort existing jobs at best effort and done: {}",
updated);
+ }
+
+ @VisibleForTesting
+ static Pair<TableRebalanceContext, Long> getLatestJob(
+ Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs) {
+ Pair<TableRebalanceContext, Long> candidateJobRun = null;
+ for (Map.Entry<String, Set<Pair<TableRebalanceContext, Long>>> entry :
candidateJobs.entrySet()) {
+      // The job configs from all retry jobs are the same, as the same set of
job configs is used to do retry.
+ // The job metadata kept in ZK is cleaned by submission time order
gradually, so we can't compare Set.size()
+ // against maxAttempts, but check retryNum of each run to see if retries
have exceeded limit.
+ Set<Pair<TableRebalanceContext, Long>> jobRuns = entry.getValue();
+ int maxAttempts =
jobRuns.iterator().next().getLeft().getConfig().getMaxAttempts();
+ Pair<TableRebalanceContext, Long> latestJobRun = null;
+ for (Pair<TableRebalanceContext, Long> jobRun : jobRuns) {
+ if (jobRun.getLeft().getAttemptId() >= maxAttempts) {
+ latestJobRun = null;
+ break;
+ }
+ if (latestJobRun == null || latestJobRun.getRight() <
jobRun.getRight()) {
+ latestJobRun = jobRun;
+ }
+ }
+ if (latestJobRun == null) {
+ LOGGER.info("Rebalance job: {} had exceeded maxAttempts: {}. Skip
retry", entry.getKey(), maxAttempts);
+ continue;
+ }
+ if (candidateJobRun == null || candidateJobRun.getRight() <
latestJobRun.getRight()) {
+ candidateJobRun = latestJobRun;
+ }
+ }
+ return candidateJobRun;
+ }
+
+ @VisibleForTesting
+ static Map<String, Set<Pair<TableRebalanceContext, Long>>>
getCandidateJobs(String tableNameWithType,
+ Map<String, Map<String, String>> allJobMetadata)
+ throws Exception {
+ long nowMs = System.currentTimeMillis();
+ Map<String, Set<Pair<TableRebalanceContext, Long>>> candidates = new
HashMap<>();
+ // If the job started most recently has already completed, then skip retry
for the table.
+ Pair<String, Long> latestStartedJob = null;
+ Pair<String, Long> latestCompletedJob = null;
+ // The processing order of job metadata from the given Map is not
deterministic. Track the completed original
+ // jobs so that we can simply skip the retry jobs belonging to the
completed original jobs.
+ Map<String, String> completedOriginalJobs = new HashMap<>();
+ Set<String> cancelledOriginalJobs = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry :
allJobMetadata.entrySet()) {
+ String jobId = entry.getKey();
+ Map<String, String> jobMetadata = entry.getValue();
+ long statsUpdatedAt =
Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ if (StringUtils.isEmpty(jobStatsInStr)) {
+ LOGGER.info("Skip rebalance job: {} as it has no job progress stats",
jobId);
+ continue;
+ }
+ String jobCtxInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+ if (StringUtils.isEmpty(jobCtxInStr)) {
+ LOGGER.info("Skip rebalance job: {} as it has no job context", jobId);
+ continue;
+ }
+ TableRebalanceProgressStats jobStats =
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+ TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr,
TableRebalanceContext.class);
+ long jobStartTimeMs = jobStats.getStartTimeMs();
+ if (latestStartedJob == null || latestStartedJob.getRight() <
jobStartTimeMs) {
+ latestStartedJob = Pair.of(jobId, jobStartTimeMs);
+ }
+ String originalJobId = jobCtx.getOriginalJobId();
+ RebalanceResult.Status jobStatus = jobStats.getStatus();
+ if (jobStatus == RebalanceResult.Status.DONE || jobStatus ==
RebalanceResult.Status.NO_OP) {
+ LOGGER.info("Skip rebalance job: {} as it has completed with status:
{}", jobId, jobStatus);
+ completedOriginalJobs.put(originalJobId, jobId);
+ if (latestCompletedJob == null || latestCompletedJob.getRight() <
jobStartTimeMs) {
+ latestCompletedJob = Pair.of(jobId, jobStartTimeMs);
+ }
+ continue;
+ }
+ if (jobStatus == RebalanceResult.Status.FAILED || jobStatus ==
RebalanceResult.Status.ABORTED) {
+ LOGGER.info("Found rebalance job: {} for original job: {} has been
stopped with status: {}", jobId,
+ originalJobId, jobStatus);
+ candidates.computeIfAbsent(originalJobId, (k) -> new
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
+ continue;
+ }
+ if (jobStatus == RebalanceResult.Status.CANCELLED) {
+ LOGGER.info("Found cancelled rebalance job: {} for original job: {}",
jobId, originalJobId);
+ cancelledOriginalJobs.add(originalJobId);
+ continue;
+ }
+ // Check if an IN_PROGRESS job is still actively running.
+ long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
+ if (nowMs - statsUpdatedAt < heartbeatTimeoutMs) {
+ LOGGER.info("Rebalance job: {} is actively running with status updated
at: {} within timeout: {}. Skip "
+ + "retry for table: {}", jobId, statsUpdatedAt,
heartbeatTimeoutMs, tableNameWithType);
+ return Collections.emptyMap();
+ }
+ // The job is considered failed, but it's possible it is still running,
then we might end up with more than one
+ // rebalance jobs running in parallel for a table. The rebalance
algorithm is idempotent, so this should be fine
+ // for the correctness.
+ LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId,
originalJobId);
+ candidates.computeIfAbsent(originalJobId, (k) -> new
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
+ }
+ if (latestCompletedJob != null &&
latestCompletedJob.getLeft().equals(latestStartedJob.getLeft())) {
+ LOGGER.info("Rebalance job: {} started most recently has already done.
Skip retry for table: {}",
+ latestCompletedJob.getLeft(), tableNameWithType);
+ return Collections.emptyMap();
+ }
+ for (String jobId : cancelledOriginalJobs) {
+ LOGGER.info("Skip original job: {} as it's cancelled", jobId);
+ candidates.remove(jobId);
+ }
+ for (Map.Entry<String, String> entry : completedOriginalJobs.entrySet()) {
+ LOGGER.info("Skip original job: {} as it's completed by attempt: {}",
entry.getKey(), entry.getValue());
+ candidates.remove(entry.getKey());
+ }
+ return candidates;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 78e3b19bca..94709f065c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -84,6 +84,24 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _updateTargetTier = false;
+ // Update job status every this interval as heartbeat, to indicate the job
is still actively running.
+ @JsonProperty("heartbeatIntervalInMs")
+ @ApiModelProperty(example = "300000")
+ private long _heartbeatIntervalInMs = 300000L;
+
+ // The job is considered as failed if not updating its status by this
timeout, even though it's IN_PROGRESS status.
+ @JsonProperty("heartbeatTimeoutInMs")
+ @ApiModelProperty(example = "3600000")
+ private long _heartbeatTimeoutInMs = 3600000L;
+
+ @JsonProperty("maxAttempts")
+ @ApiModelProperty(example = "3")
+ private int _maxAttempts = 3;
+
+ @JsonProperty("retryInitialDelayInMs")
+ @ApiModelProperty(example = "300000")
+ private long _retryInitialDelayInMs = 300000L;
+
public boolean isDryRun() {
return _dryRun;
}
@@ -164,6 +182,50 @@ public class RebalanceConfig {
_updateTargetTier = updateTargetTier;
}
+ public long getHeartbeatIntervalInMs() {
+ return _heartbeatIntervalInMs;
+ }
+
+ public void setHeartbeatIntervalInMs(long heartbeatIntervalInMs) {
+ _heartbeatIntervalInMs = heartbeatIntervalInMs;
+ }
+
+ public long getHeartbeatTimeoutInMs() {
+ return _heartbeatTimeoutInMs;
+ }
+
+ public void setHeartbeatTimeoutInMs(long heartbeatTimeoutInMs) {
+ _heartbeatTimeoutInMs = heartbeatTimeoutInMs;
+ }
+
+ public int getMaxAttempts() {
+ return _maxAttempts;
+ }
+
+ public void setMaxAttempts(int maxAttempts) {
+ _maxAttempts = maxAttempts;
+ }
+
+ public long getRetryInitialDelayInMs() {
+ return _retryInitialDelayInMs;
+ }
+
+ public void setRetryInitialDelayInMs(long retryInitialDelayInMs) {
+ _retryInitialDelayInMs = retryInitialDelayInMs;
+ }
+
+ @Override
+ public String toString() {
+ return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _reassignInstances="
+ _reassignInstances
+ + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" +
_bootstrap + ", _downtime=" + _downtime
+ + ", _minAvailableReplicas=" + _minAvailableReplicas + ",
_bestEfforts=" + _bestEfforts
+ + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+ + ", _externalViewStabilizationTimeoutInMs=" +
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
+ + _updateTargetTier + ", _heartbeatIntervalInMs=" +
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
+ + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ",
_retryInitialDelayInMs="
+ + _retryInitialDelayInMs + '}';
+ }
+
public static RebalanceConfig copy(RebalanceConfig cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
@@ -176,6 +238,10 @@ public class RebalanceConfig {
rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs;
rc._externalViewStabilizationTimeoutInMs =
cfg._externalViewStabilizationTimeoutInMs;
rc._updateTargetTier = cfg._updateTargetTier;
+ rc._heartbeatIntervalInMs = cfg._heartbeatIntervalInMs;
+ rc._heartbeatTimeoutInMs = cfg._heartbeatTimeoutInMs;
+ rc._maxAttempts = cfg._maxAttempts;
+ rc._retryInitialDelayInMs = cfg._retryInitialDelayInMs;
return rc;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
index 8070d44972..68314bbba2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
@@ -24,4 +24,6 @@ public class RebalanceJobConstants {
// Progress status of the rebalance operartion
public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS =
"REBALANCE_PROGRESS_STATS";
+  // Configs to retry the rebalance operation
+ public static final String JOB_METADATA_KEY_REBALANCE_CONTEXT =
"REBALANCE_CONTEXT";
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index ed3ad624d8..2be7fc7753 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -87,6 +87,10 @@ public class RebalanceResult {
}
public enum Status {
- NO_OP, DONE, FAILED, IN_PROGRESS
+ // FAILED if the job has ended with known exceptions;
+ // ABORTED if the job is stopped by others but retry is still allowed;
+ // CANCELLED if the job is stopped by user, and retry is cancelled too;
+ // UNKNOWN_ERROR if the job hits on an unexpected exception.
+ NO_OP, DONE, FAILED, IN_PROGRESS, ABORTED, CANCELLED, UNKNOWN_ERROR
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
new file mode 100644
index 0000000000..48e43558cc
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+/**
+ * Track the job configs and attempt numbers as part of the job ZK metadata to
retry failed rebalance.
+ */
+public class TableRebalanceContext {
+ private static final int INITIAL_ATTEMPT_ID = 1;
+ private String _jobId;
+ private String _originalJobId;
+ private RebalanceConfig _config;
+ private int _attemptId;
+
+ public static TableRebalanceContext forInitialAttempt(String originalJobId,
RebalanceConfig config) {
+ return new TableRebalanceContext(originalJobId, config,
INITIAL_ATTEMPT_ID);
+ }
+
+ public static TableRebalanceContext forRetry(String originalJobId,
RebalanceConfig config, int attemptId) {
+ return new TableRebalanceContext(originalJobId, config, attemptId);
+ }
+
+ public TableRebalanceContext() {
+ // For JSON deserialization.
+ }
+
+ private TableRebalanceContext(String originalJobId, RebalanceConfig config,
int attemptId) {
+ _jobId = createAttemptJobId(originalJobId, attemptId);
+ _originalJobId = originalJobId;
+ _config = config;
+ _attemptId = attemptId;
+ }
+
+ public int getAttemptId() {
+ return _attemptId;
+ }
+
+ public void setAttemptId(int attemptId) {
+ _attemptId = attemptId;
+ }
+
+ public String getOriginalJobId() {
+ return _originalJobId;
+ }
+
+ public void setOriginalJobId(String originalJobId) {
+ _originalJobId = originalJobId;
+ }
+
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public void setJobId(String jobId) {
+ _jobId = jobId;
+ }
+
+ public RebalanceConfig getConfig() {
+ return _config;
+ }
+
+ public void setConfig(RebalanceConfig config) {
+ _config = config;
+ }
+
+ @Override
+ public String toString() {
+ return "TableRebalanceContext{" + "_jobId='" + _jobId + '\'' + ",
_originalJobId='" + _originalJobId + '\''
+ + ", _config=" + _config + ", _attemptId=" + _attemptId + '}';
+ }
+
+ private static String createAttemptJobId(String originalJobId, int
attemptId) {
+ if (attemptId == INITIAL_ATTEMPT_ID) {
+ return originalJobId;
+ }
+ return originalJobId + "_" + attemptId;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
index d52dce1f6e..e9c5c299cf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
@@ -43,4 +43,8 @@ public interface TableRebalanceObserver {
void onSuccess(String msg);
void onError(String errorMsg);
+
+ boolean isStopped();
+
+ RebalanceResult.Status getStopStatus();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
index 6d6f70b5c7..aad369ae33 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
@@ -42,7 +42,7 @@ public class TableRebalanceProgressStats {
}
// Done/In_progress/Failed
- private String _status;
+ private RebalanceResult.Status _status;
// When did Rebalance start
private long _startTimeMs;
// How long did rebalance take
@@ -62,7 +62,7 @@ public class TableRebalanceProgressStats {
_initialToTargetStateConvergence = new RebalanceStateStats();
}
- public void setStatus(String status) {
+ public void setStatus(RebalanceResult.Status status) {
_status = status;
}
@@ -90,7 +90,7 @@ public class TableRebalanceProgressStats {
_completionStatusMsg = completionStatusMsg;
}
- public String getStatus() {
+ public RebalanceResult.Status getStatus() {
return _status;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a86c3e27a1..a27c4ffbfb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;
@@ -47,6 +48,7 @@ import
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
@@ -138,6 +140,24 @@ public class TableRebalancer {
public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig,
@Nullable String rebalanceJobId) {
+ long startTime = System.currentTimeMillis();
+ String tableNameWithType = tableConfig.getTableName();
+ RebalanceResult.Status status = RebalanceResult.Status.UNKNOWN_ERROR;
+ try {
+ RebalanceResult result = doRebalance(tableConfig, rebalanceConfig,
rebalanceJobId);
+ status = result.getStatus();
+ return result;
+ } finally {
+ if (_controllerMetrics != null) {
+ _controllerMetrics.addTimedTableValue(String.format("%s.%s",
tableNameWithType, status.toString()),
+ ControllerTimer.TABLE_REBALANCE_EXECUTION_TIME_MS,
System.currentTimeMillis() - startTime,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig,
+ @Nullable String rebalanceJobId) {
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
if (rebalanceJobId == null) {
@@ -355,6 +375,11 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while waiting for
ExternalView to converge for table: %s, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType);
LOGGER.warn(errorMsg, e);
+ if (_tableRebalanceObserver.isStopped()) {
+ return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
+ "Caught exception while waiting for ExternalView to converge: "
+ e, instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment);
+ }
_tableRebalanceObserver.onError(errorMsg);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " +
e, instancePartitionsMap,
@@ -434,6 +459,11 @@ public class TableRebalancer {
// Record change of current ideal state and the new target
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
currentAssignment,
targetAssignment);
+ if (_tableRebalanceObserver.isStopped()) {
+ return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
+ "Rebalance has stopped already before updating the IdealState",
instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment);
+ }
Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup);
LOGGER.info("For rebalanceId: {}, got the next assignment for table: {}
with number of segments to be moved to "
@@ -691,6 +721,11 @@ public class TableRebalancer {
_tableRebalanceObserver.onTrigger(
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields());
+ if (_tableRebalanceObserver.isStopped()) {
+ throw new RuntimeException(
+ String.format("Rebalance for table: %s has already stopped with
status: %s", tableNameWithType,
+ _tableRebalanceObserver.getStopStatus()));
+ }
if (isExternalViewConverged(tableNameWithType,
externalView.getRecord().getMapFields(),
idealState.getRecord().getMapFields(), bestEfforts,
segmentsToMonitor)) {
LOGGER.info("ExternalView converged for table: {}",
tableNameWithType);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 1978df35f3..7b57147ec0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -19,10 +19,10 @@
package org.apache.pinot.controller.helix.core.rebalance;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* <code>ZkBasedTableRebalanceObserver</code> observes rebalance progress and
tracks rebalance status,
* stats in Zookeeper. This will be used to show the progress of rebalance to
users via rebalanceStatus API.
@@ -39,12 +40,16 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
private final String _tableNameWithType;
private final String _rebalanceJobId;
private final PinotHelixResourceManager _pinotHelixResourceManager;
- private TableRebalanceProgressStats _tableRebalanceProgressStats;
+ private final TableRebalanceProgressStats _tableRebalanceProgressStats;
+ private final TableRebalanceContext _tableRebalanceContext;
+ private long _lastUpdateTimeMs;
// Keep track of number of updates. Useful during debugging.
private int _numUpdatesToZk;
+ private boolean _isStopped = false;
+ private RebalanceResult.Status _stopStatus;
public ZkBasedTableRebalanceObserver(String tableNameWithType, String
rebalanceJobId,
- PinotHelixResourceManager pinotHelixResourceManager) {
+ TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager
pinotHelixResourceManager) {
Preconditions.checkState(tableNameWithType != null, "Table name cannot be
null");
Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be
null");
Preconditions.checkState(pinotHelixResourceManager != null,
"PinotHelixManager cannot be null");
@@ -52,16 +57,19 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
_rebalanceJobId = rebalanceJobId;
_pinotHelixResourceManager = pinotHelixResourceManager;
_tableRebalanceProgressStats = new TableRebalanceProgressStats();
+ _tableRebalanceContext = tableRebalanceContext;
_numUpdatesToZk = 0;
}
@Override
public void onTrigger(Trigger trigger, Map<String, Map<String, String>>
currentState,
Map<String, Map<String, String>> targetState) {
+ boolean updatedStatsInZk = false;
switch (trigger) {
case START_TRIGGER:
updateOnStart(currentState, targetState);
trackStatsInZk();
+ updatedStatsInZk = true;
break;
// Write to Zk if there's change since previous stats computation
case IDEAL_STATE_CHANGE_TRIGGER:
@@ -71,6 +79,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
latest)) {
_tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
trackStatsInZk();
+ updatedStatsInZk = true;
}
break;
case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
@@ -79,18 +88,28 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest))
{
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
trackStatsInZk();
+ updatedStatsInZk = true;
}
break;
default:
throw new IllegalArgumentException("Unimplemented trigger: " +
trigger);
}
+ // The onTrigger method is mainly driven by the while loop of waiting for
external view to converge to ideal
+ // state. That while loop waits for at least externalViewCheckIntervalInMs.
So the real interval to send out
+ // heartbeat is the max(heartbeat_interval, externalViewCheckIntervalInMs);
+ long heartbeatIntervalInMs =
_tableRebalanceContext.getConfig().getHeartbeatIntervalInMs();
+ if (!updatedStatsInZk && System.currentTimeMillis() - _lastUpdateTimeMs >
heartbeatIntervalInMs) {
+ LOGGER.debug("Update status of rebalance job: {} for table: {} after
{}ms as heartbeat", _rebalanceJobId,
+ _tableNameWithType, heartbeatIntervalInMs);
+ trackStatsInZk();
+ }
}
private void updateOnStart(Map<String, Map<String, String>> currentState,
Map<String, Map<String, String>> targetState) {
- Preconditions.checkState(_tableRebalanceProgressStats.getStatus() !=
RebalanceResult.Status.IN_PROGRESS.toString(),
+ Preconditions.checkState(RebalanceResult.Status.IN_PROGRESS !=
_tableRebalanceProgressStats.getStatus(),
"Rebalance Observer onStart called multiple times");
-
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
+ _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
_tableRebalanceProgressStats.setInitialToTargetStateConvergence(
getDifferenceBetweenTableRebalanceStates(targetState, currentState));
_tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
@@ -98,13 +117,12 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
@Override
public void onSuccess(String msg) {
- Preconditions.checkState(_tableRebalanceProgressStats.getStatus() !=
RebalanceResult.Status.DONE.toString(),
+ Preconditions.checkState(RebalanceResult.Status.DONE !=
_tableRebalanceProgressStats.getStatus(),
"Table Rebalance already completed");
- long timeToFinishInSeconds =
- (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
+ long timeToFinishInSeconds = (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
_tableRebalanceProgressStats.setCompletionStatusMsg(msg);
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
-
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE.toString());
+ _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE);
// Zero out the in_progress convergence stats
TableRebalanceProgressStats.RebalanceStateStats stats = new
TableRebalanceProgressStats.RebalanceStateStats();
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(stats);
@@ -114,42 +132,97 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
@Override
public void onError(String errorMsg) {
- long timeToFinishInSeconds =
- (long) (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+ long timeToFinishInSeconds = (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
-
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED.toString());
+ _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
_tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);
trackStatsInZk();
}
+ @Override
+ public boolean isStopped() {
+ return _isStopped;
+ }
+
+ @Override
+ public RebalanceResult.Status getStopStatus() {
+ return _stopStatus;
+ }
+
public int getNumUpdatesToZk() {
return _numUpdatesToZk;
}
+ @VisibleForTesting
+ TableRebalanceContext getTableRebalanceContext() {
+ return _tableRebalanceContext;
+ }
+
private void trackStatsInZk() {
+ Map<String, String> jobMetadata =
+ createJobMetadata(_tableNameWithType, _rebalanceJobId,
_tableRebalanceProgressStats, _tableRebalanceContext);
+ _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId,
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+ prevJobMetadata -> {
+ // In addition to updating job progress status, the observer also
checks if the job status is IN_PROGRESS.
+ // If not, then no need to update the job status, and we keep this
status to end the job promptly.
+ if (prevJobMetadata == null) {
+ return true;
+ }
+ String prevStatsInStr =
prevJobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ TableRebalanceProgressStats prevStats;
+ try {
+ prevStats = JsonUtils.stringToObject(prevStatsInStr,
TableRebalanceProgressStats.class);
+ } catch (JsonProcessingException ignore) {
+ return true;
+ }
+ if (prevStats == null || RebalanceResult.Status.IN_PROGRESS ==
prevStats.getStatus()) {
+ return true;
+ }
+ _isStopped = true;
+ _stopStatus = prevStats.getStatus();
+ LOGGER.warn("Rebalance job: {} for table: {} has already stopped
with status: {}", _rebalanceJobId,
+ _tableNameWithType, _stopStatus);
+ // No need to update job status if job has ended. This also keeps
the last status from being overwritten.
+ return false;
+ });
+ _numUpdatesToZk++;
+ _lastUpdateTimeMs = System.currentTimeMillis();
+ LOGGER.debug("Made {} ZK updates for rebalance job: {} of table: {}",
_numUpdatesToZk, _rebalanceJobId,
+ _tableNameWithType);
+ }
+
+ @VisibleForTesting
+ static Map<String, String> createJobMetadata(String tableNameWithType,
String jobId,
+ TableRebalanceProgressStats tableRebalanceProgressStats,
TableRebalanceContext tableRebalanceContext) {
Map<String, String> jobMetadata = new HashMap<>();
- jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
_tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _rebalanceJobId);
+ jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name());
try {
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(_tableRebalanceProgressStats));
+ JsonUtils.objectToString(tableRebalanceProgressStats));
} catch (JsonProcessingException e) {
- LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _rebalanceJobId, e);
+ LOGGER.error("Error serialising stats for rebalance job: {} of table: {}
to keep in ZK", jobId, tableNameWithType,
+ e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId,
jobMetadata,
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE));
- _numUpdatesToZk++;
- LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _rebalanceJobId);
+ try {
+ jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+ JsonUtils.objectToString(tableRebalanceContext));
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Error serialising retry configs for rebalance job: {} of
table: {} to keep in ZK", jobId,
+ tableNameWithType, e);
+ }
+ return jobMetadata;
}
/**
- * Takes in targetState and sourceState and computes stats based on the
comparison
- * between sourceState and targetState.This captures how far the source
state is from
- * the target state. Example - if there are 4 segments and 16 replicas in
the source state
- * not matching the target state, _segmentsToRebalance is 4 and
_replicasToRebalance is 16.
- * @param targetState- The state that we want to get to
+ * Takes in targetState and sourceState and computes stats based on the
comparison between sourceState and
+ * targetState.This captures how far the source state is from the target
state. Example - if there are 4 segments and
+ * 16 replicas in the source state not matching the target state,
_segmentsToRebalance is 4 and _replicasToRebalance
+ * is 16.
+ *
+ * @param targetState - The state that we want to get to
* @param sourceState - A given state that needs to converge to targetState
* @return RebalanceStats
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 6c26985539..9fabc64d9d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -108,8 +107,7 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
}
- _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
-
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TENANT_REBALANCE));
+ _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
ControllerJobType.TENANT_REBALANCE);
_numUpdatesToZk++;
LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ",
_numUpdatesToZk, _jobId);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index 6172c67def..a1b086c8fa 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest
extends ControllerTest {
}
private class MockControllerStarter extends ControllerStarter {
- private static final int NUM_PERIODIC_TASKS = 10;
+ private static final int NUM_PERIODIC_TASKS = 11;
public MockControllerStarter() {
super();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
new file mode 100644
index 0000000000..22663fa333
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class RebalanceCheckerTest {
+
+ @Test
+ public void testGetRetryDelayInMs() {
+ assertEquals(RebalanceChecker.getRetryDelayInMs(0, 1), 0);
+ assertEquals(RebalanceChecker.getRetryDelayInMs(0, 2), 0);
+ assertEquals(RebalanceChecker.getRetryDelayInMs(0, 3), 0);
+
+ for (long initDelayMs : new long[]{1, 30000, 3600000}) {
+ long delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 1);
+ assertTrue(delayMs >= initDelayMs && delayMs < initDelayMs * 2);
+ delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 2);
+ assertTrue(delayMs >= initDelayMs * 2 && delayMs < initDelayMs * 4);
+ delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 3);
+ assertTrue(delayMs >= initDelayMs * 4 && delayMs < initDelayMs * 8);
+ }
+ }
+
+ @Test
+ public void testGetCandidateJobs()
+ throws Exception {
+ String tableName = "table01";
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+
+ // Original job run as job1, and all its retry jobs failed too.
+ RebalanceConfig jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.FAILED);
+ stats.setStartTimeMs(1000);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+ Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
+ allJobMetadata.put("job1", jobMetadata);
+ // 3 failed retry runs for job1
+ jobMetadata = createDummyJobMetadata(tableName, "job1", 2, 1100,
RebalanceResult.Status.FAILED);
+ allJobMetadata.put("job1_2", jobMetadata);
+ jobMetadata = createDummyJobMetadata(tableName, "job1", 3, 1200,
RebalanceResult.Status.ABORTED);
+ allJobMetadata.put("job1_3", jobMetadata);
+ jobMetadata = createDummyJobMetadata(tableName, "job1", 4, 1300,
RebalanceResult.Status.FAILED);
+ allJobMetadata.put("job1_4", jobMetadata);
+
+ // Original job run as job2, and its retry job job2_1 completed.
+ jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.FAILED);
+ stats.setStartTimeMs(2000);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job2", stats, jobCtx);
+ allJobMetadata.put("job2", jobMetadata);
+ jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100,
RebalanceResult.Status.DONE);
+ allJobMetadata.put("job2_2", jobMetadata);
+
+ // Original job run as job3, and failed to send out heartbeat in time.
+ jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+ stats.setStartTimeMs(3000);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job3", stats, jobCtx);
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
+ allJobMetadata.put("job3", jobMetadata);
+
+ // Original job run as job4, which didn't have retryJobCfg as from old
version of the code.
+ stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.FAILED);
+ stats.setStartTimeMs(4000);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job4", stats, null);
+
jobMetadata.remove(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+ allJobMetadata.put("job4", jobMetadata);
+
+ // Only need to retry job1 and job3, as job2 is completed and job4 is from
old version of code.
+ Map<String, Set<Pair<TableRebalanceContext, Long>>> jobs =
+ RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
+ assertEquals(jobs.size(), 2);
+ assertTrue(jobs.containsKey("job1"));
+ assertTrue(jobs.containsKey("job3"));
+ assertEquals(jobs.get("job1").size(), 4); // four runs including
job1,job1_1,job1_2,job1_3
+ assertEquals(jobs.get("job3").size(), 1); // just a single run job3
+
+ // Abort job1 and cancel its retries, then only job3 is retry candidate.
+ jobMetadata = allJobMetadata.get("job1_4");
+ cancelRebalanceJob(jobMetadata);
+ jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
+ assertEquals(jobs.size(), 1);
+ assertTrue(jobs.containsKey("job3"));
+ assertEquals(jobs.get("job3").size(), 1); // just a single run job3
+
+ // Add latest job5 that's already done, thus no need to retry for table.
+ jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.DONE);
+ stats.setStartTimeMs(5000);
+ jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job5", stats, jobCtx);
+ allJobMetadata.put("job5", jobMetadata);
+ jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
+ assertEquals(jobs.size(), 0);
+ }
+
+ @Test
+ public void testGetLatestJob() {
+ Map<String, Set<Pair<TableRebalanceContext, Long>>> jobs = new HashMap<>();
+ // The most recent job run is job1_3, and within 3 maxAttempts.
+ jobs.put("job1",
+ ImmutableSet.of(Pair.of(createDummyJobCtx("job1", 1), 10L),
Pair.of(createDummyJobCtx("job1", 2), 20L),
+ Pair.of(createDummyJobCtx("job1", 3), 1020L)));
+ jobs.put("job2", ImmutableSet.of(Pair.of(createDummyJobCtx("job2", 1),
1000L)));
+ Pair<TableRebalanceContext, Long> jobTime =
RebalanceChecker.getLatestJob(jobs);
+ assertNotNull(jobTime);
+ assertEquals(jobTime.getLeft().getJobId(), "job1_3");
+
+ // The most recent job run is job1_4, but reached 3 maxAttempts.
+ jobs.put("job1",
+ ImmutableSet.of(Pair.of(createDummyJobCtx("job1", 1), 10L),
Pair.of(createDummyJobCtx("job1", 2), 20L),
+ Pair.of(createDummyJobCtx("job1", 3), 1020L),
Pair.of(createDummyJobCtx("job1", 4), 2020L)));
+ jobTime = RebalanceChecker.getLatestJob(jobs);
+ assertNotNull(jobTime);
+ assertEquals(jobTime.getLeft().getJobId(), "job2");
+
+ // Add job3 that's started more recently.
+ jobs.put("job3", ImmutableSet.of(Pair.of(createDummyJobCtx("job3", 1),
3000L)));
+ jobTime = RebalanceChecker.getLatestJob(jobs);
+ assertNotNull(jobTime);
+ assertEquals(jobTime.getLeft().getJobId(), "job3");
+
+ // Remove job2 and job3, and we'd have no job to retry then.
+ jobs.remove("job2");
+ jobs.remove("job3");
+ jobTime = RebalanceChecker.getLatestJob(jobs);
+ assertNull(jobTime);
+ }
+
  // Verifies RebalanceChecker picks the right job to retry: jobs that exhausted maxAttempts or already
  // completed are skipped, while a job stuck IN_PROGRESS without a fresh heartbeat is retried with a
  // bumped attemptId.
  @Test
  public void testRetryRebalance()
      throws Exception {
    String tableName = "table01";
    LeadControllerManager leadController = mock(LeadControllerManager.class);
    ControllerMetrics metrics = mock(ControllerMetrics.class);
    // Direct executor so the retry runs synchronously inside retryRebalanceTable().
    ExecutorService exec = MoreExecutors.newDirectExecutorService();
    ControllerConf cfg = new ControllerConf();

    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
    // Original job run as job1, and all its retry jobs failed too.
    RebalanceConfig jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(1000);
    TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg);
    Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx);
    allJobMetadata.put("job1", jobMetadata);
    // 3 failed retry runs for job1 (attempts 2-4), so job1 exhausts its maxAttempts of 4.
    jobMetadata = createDummyJobMetadata(tableName, "job1", 2, 1100, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 3, 1200, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_3", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 4, 5300, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_4", jobMetadata);

    // Original job run as job2 failed, but its retry run job2_2 completed, so job2 needs no retry.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(2000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", stats, jobCtx);
    allJobMetadata.put("job2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, RebalanceResult.Status.DONE);
    allJobMetadata.put("job2_2", jobMetadata);

    // Original job run as job3 is still IN_PROGRESS but failed to send out a heartbeat in time
    // (its submission time is pinned to a stale 3000ms), so it should be treated as stuck.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
    stats.setStartTimeMs(3000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3", stats, jobCtx);
    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
    allJobMetadata.put("job3", jobMetadata);

    TableConfig tableConfig = mock(TableConfig.class);
    PinotHelixResourceManager helixManager = mock(PinotHelixResourceManager.class);
    when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
    when(helixManager.getAllJobs(any(), any())).thenReturn(allJobMetadata);
    RebalanceChecker checker = new RebalanceChecker(helixManager, leadController, cfg, metrics, exec);
    // Although job1's runs are the most recent (job1_4 started at 5300), job1 has exceeded maxAttempts,
    // so the checker chooses job3 to retry, which got stuck at IN_PROGRESS status.
    checker.retryRebalanceTable(tableName, allJobMetadata);
    // The new retry job is for job3 and attemptId is increased to 2.
    ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
        ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
    verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture());
    ZkBasedTableRebalanceObserver observer = observerCaptor.getValue();
    jobCtx = observer.getTableRebalanceContext();
    assertEquals(jobCtx.getOriginalJobId(), "job3");
    assertEquals(jobCtx.getAttemptId(), 2);
  }
+
+ @Test
+ public void testRetryRebalanceWithBackoff()
+ throws Exception {
+ String tableName = "table01";
+ LeadControllerManager leadController = mock(LeadControllerManager.class);
+ ControllerMetrics metrics = mock(ControllerMetrics.class);
+ ExecutorService exec = MoreExecutors.newDirectExecutorService();
+ ControllerConf cfg = new ControllerConf();
+
+ Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
+ // Original job run as job1, and all its retry jobs failed too.
+ RebalanceConfig jobCfg = new RebalanceConfig();
+ jobCfg.setMaxAttempts(4);
+ long nowMs = System.currentTimeMillis();
+ TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
+ stats.setStatus(RebalanceResult.Status.FAILED);
+ stats.setStartTimeMs(nowMs);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forInitialAttempt("job1", jobCfg);
+ Map<String, String> jobMetadata =
ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats,
jobCtx);
+ allJobMetadata.put("job1", jobMetadata);
+
+ PinotHelixResourceManager helixManager =
mock(PinotHelixResourceManager.class);
+ TableConfig tableConfig = mock(TableConfig.class);
+ when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
+ RebalanceChecker checker = new RebalanceChecker(helixManager,
leadController, cfg, metrics, exec);
+ checker.retryRebalanceTable(tableName, allJobMetadata);
+ // Retry for job1 is delayed with 5min backoff.
+ ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
+ ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
+ verify(helixManager, times(0)).rebalanceTable(eq(tableName), any(),
anyString(), any(), observerCaptor.capture());
+
+ // Set initial delay to 0 to disable retry backoff.
+ jobCfg.setRetryInitialDelayInMs(0);
+ jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
"job1", stats, jobCtx);
+ allJobMetadata.put("job1", jobMetadata);
+ checker.retryRebalanceTable(tableName, allJobMetadata);
+ // Retry for job1 is delayed with 0 backoff.
+ observerCaptor =
ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
+ verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(),
anyString(), any(), observerCaptor.capture());
+ }
+
+ @Test
+ public void testAddUpdateControllerJobsForTable() {
+ ControllerConf cfg = new ControllerConf();
+ cfg.setZkStr("localhost:2181");
+ cfg.setHelixClusterName("cluster01");
+ PinotHelixResourceManager pinotHelixManager = new
PinotHelixResourceManager(cfg);
+ HelixManager helixZkManager = mock(HelixManager.class);
+ ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ String zkPath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE);
+ ZNRecord jobsZnRecord = new ZNRecord("jobs");
+ when(propertyStore.get(eq(zkPath), any(),
eq(AccessOption.PERSISTENT))).thenReturn(jobsZnRecord);
+
when(helixZkManager.getClusterManagmentTool()).thenReturn(mock(HelixAdmin.class));
+ when(helixZkManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
when(helixZkManager.getHelixDataAccessor()).thenReturn(mock(HelixDataAccessor.class));
+ pinotHelixManager.start(helixZkManager, null);
+
+ pinotHelixManager.addControllerJobToZK("job1",
+ ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000",
"tableName", "table01"),
+ ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ pinotHelixManager.addControllerJobToZK("job2",
+ ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000",
"tableName", "table01"),
+ ControllerJobType.TABLE_REBALANCE, jmd -> false);
+ pinotHelixManager.addControllerJobToZK("job3",
+ ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000",
"tableName", "table02"),
+ ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ pinotHelixManager.addControllerJobToZK("job4",
+ ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000",
"tableName", "table02"),
+ ControllerJobType.TABLE_REBALANCE, jmd -> true);
+ Map<String, Map<String, String>> jmds = jobsZnRecord.getMapFields();
+ assertEquals(jmds.size(), 3);
+ assertTrue(jmds.containsKey("job1"));
+ assertTrue(jmds.containsKey("job3"));
+ assertTrue(jmds.containsKey("job4"));
+
+ Set<String> expectedJobs01 = new HashSet<>();
+ pinotHelixManager.updateJobsForTable("table01",
ControllerJobType.TABLE_REBALANCE,
+ jmd -> expectedJobs01.add(jmd.get("jobId")));
+ assertEquals(expectedJobs01.size(), 1);
+ assertTrue(expectedJobs01.contains("job1"));
+
+ Set<String> expectedJobs02 = new HashSet<>();
+ pinotHelixManager.updateJobsForTable("table02",
ControllerJobType.TABLE_REBALANCE,
+ jmd -> expectedJobs02.add(jmd.get("jobId")));
+ assertEquals(expectedJobs02.size(), 2);
+ assertTrue(expectedJobs02.contains("job3"));
+ assertTrue(expectedJobs02.contains("job4"));
+ }
+
+ private static TableRebalanceContext createDummyJobCtx(String originalJobId,
int attemptId) {
+ TableRebalanceContext jobCtx = new TableRebalanceContext();
+ RebalanceConfig cfg = new RebalanceConfig();
+ cfg.setMaxAttempts(4);
+ jobCtx.setJobId(originalJobId);
+ if (attemptId > 1) {
+ jobCtx.setJobId(originalJobId + "_" + attemptId);
+ }
+ jobCtx.setOriginalJobId(originalJobId);
+ jobCtx.setConfig(cfg);
+ jobCtx.setAttemptId(attemptId);
+ return jobCtx;
+ }
+
+ private static Map<String, String> createDummyJobMetadata(String tableName,
String originalJobId, int attemptId,
+ long startTimeMs, RebalanceResult.Status status) {
+ RebalanceConfig cfg = new RebalanceConfig();
+ cfg.setMaxAttempts(4);
+ TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
+ stats.setStatus(status);
+ stats.setStartTimeMs(startTimeMs);
+ TableRebalanceContext jobCtx =
TableRebalanceContext.forRetry(originalJobId, cfg, attemptId);
+ String attemptJobId = originalJobId + "_" + attemptId;
+ return ZkBasedTableRebalanceObserver.createJobMetadata(tableName,
attemptJobId, stats, jobCtx);
+ }
+
+ private static void cancelRebalanceJob(Map<String, String> jobMetadata)
+ throws JsonProcessingException {
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ TableRebalanceProgressStats jobStats =
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+ jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(jobStats));
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 74b8a4367f..143caebf2b 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -35,14 +35,16 @@ import static org.testng.Assert.assertEquals;
public class TestZkBasedTableRebalanceObserver {
+ // This is a test to verify if Zk stats are pushed out correctly
@Test
- // This is a test to verify if Zk stats are pushed out correctly
void testZkObserverTracking() {
PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
// Mocking this. We will verify using numZkUpdate stat
when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
+ TableRebalanceContext retryCtx = new TableRebalanceContext();
+ retryCtx.setConfig(new RebalanceConfig());
ZkBasedTableRebalanceObserver observer =
- new ZkBasedTableRebalanceObserver("dummy", "dummyId",
pinotHelixResourceManager);
+ new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
pinotHelixResourceManager);
Map<String, Map<String, String>> source = new TreeMap<>();
Map<String, Map<String, String>> target = new TreeMap<>();
target.put("segment1",
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index e72d066bc9..db6941bf1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -90,6 +90,7 @@ public class Actions {
// Action names for table
public static class Table {
public static final String BUILD_ROUTING = "BuildRouting";
+ public static final String CANCEL_REBALANCE = "CancelRebalance";
public static final String CREATE_INSTANCE_PARTITIONS =
"CreateInstancePartitions";
public static final String CREATE_SCHEMA = "CreateSchema";
public static final String CREATE_TABLE = "CreateTable";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 0f88a36d06..4a543556f7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -370,9 +370,8 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse =
JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
- String status =
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
- return status.equals(RebalanceResult.Status.DONE.toString()) ||
status.equals(
- RebalanceResult.Status.FAILED.toString());
+ RebalanceResult.Status status =
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
+ return status != RebalanceResult.Status.IN_PROGRESS;
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]