This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cf091a7ef Add support to retry failed table rebalance (#11740)
0cf091a7ef is described below

commit 0cf091a7ef822470fd2c0195a926b35e8cddcf72
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 18 14:33:07 2023 -0700

    Add support to retry failed table rebalance (#11740)
    
    * add support to retry failed table rebalance
    * add exponential retry backoff and more metrics
    * add CANCELLED job status and cancelRebalance RESTful API for users to
cancel rebalance
---
 .../configs/controller.yml                         |   6 +
 .../etc/jmx_prometheus_javaagent/configs/pinot.yml |   6 +
 .../pinot/common/metrics/ControllerMeter.java      |   3 +
 .../pinot/common/metrics/ControllerTimer.java      |   1 +
 .../pinot/controller/BaseControllerStarter.java    |   5 +
 .../apache/pinot/controller/ControllerConf.java    |  22 +-
 .../api/resources/PinotTableRestletResource.java   | 126 +++++--
 .../ServerRebalanceJobStatusResponse.java          |  11 +
 .../helix/core/PinotHelixResourceManager.java      | 147 +++++---
 .../core/rebalance/NoOpTableRebalanceObserver.java |  10 +
 .../helix/core/rebalance/RebalanceChecker.java     | 331 ++++++++++++++++++
 .../helix/core/rebalance/RebalanceConfig.java      |  66 ++++
 .../core/rebalance/RebalanceJobConstants.java      |   2 +
 .../helix/core/rebalance/RebalanceResult.java      |   6 +-
 .../core/rebalance/TableRebalanceContext.java      |  94 +++++
 .../core/rebalance/TableRebalanceObserver.java     |   4 +
 .../rebalance/TableRebalanceProgressStats.java     |   6 +-
 .../helix/core/rebalance/TableRebalancer.java      |  35 ++
 .../rebalance/ZkBasedTableRebalanceObserver.java   | 123 +++++--
 .../tenant/ZkBasedTenantRebalanceObserver.java     |   4 +-
 ...ControllerPeriodicTaskStarterStatelessTest.java |   2 +-
 .../helix/core/rebalance/RebalanceCheckerTest.java | 388 +++++++++++++++++++++
 .../TestZkBasedTableRebalanceObserver.java         |   6 +-
 .../java/org/apache/pinot/core/auth/Actions.java   |   1 +
 ...PartialUpsertTableRebalanceIntegrationTest.java |   5 +-
 25 files changed, 1284 insertions(+), 126 deletions(-)

diff --git 
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index cfd4f9af31..c4071887ed 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -106,6 +106,12 @@ rules:
   labels:
     table: "$1"
     taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot.controller.([^\\.]*?)\\.(\\w+).tableRebalanceExecutionTimeMs\"><>(\\w+)"
+  name: "pinot_controller_tableRebalanceExecutionTimeMs_$3"
+  cache: true
+  labels:
+    table: "$1"
+    result: "$2"
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot.controller.taskStatus.([^\\.]*?)\\.(\\w+)\"><>(\\w+)"
   name: "pinot_controller_taskStatus_$3"
   cache: true
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index 6660ee42d7..1c435be787 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -106,6 +106,12 @@ rules:
   labels:
     table: "$1"
     taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot.controller.([^\\.]*?)\\.(\\w+).tableRebalanceExecutionTimeMs\"><>(\\w+)"
+  name: "pinot_controller_tableRebalanceExecutionTimeMs_$3"
+  cache: true
+  labels:
+    table: "$1"
+    result: "$2"
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot.controller.taskStatus.(\\w+)\\.(\\w+)\"><>(\\w+)"
   name: "pinot_controller_taskStatus_$3"
   cache: true
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index ac933acf6a..2e84f8b510 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -61,6 +61,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError",
 false),
   SEGMENT_MISSING_DEEP_STORE_LINK("RealtimeSegmentMissingDeepStoreLink", 
false),
   DELETED_TMP_SEGMENT_COUNT("DeletedTmpSegmentCount", false),
+  TABLE_REBALANCE_FAILURE_DETECTED("TableRebalanceFailureDetected", false),
+  TABLE_REBALANCE_RETRY("TableRebalanceRetry", false),
+  TABLE_REBALANCE_RETRY_TOO_MANY_TIMES("TableRebalanceRetryTooManyTimes", 
false),
   NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
 
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index 6627382db6..64608d0037 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum ControllerTimer implements AbstractMetrics.Timer {
+  TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false),
   CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs", 
false);
 
   private final String _timerName;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 185b1d506e..2093aab0b4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -93,6 +93,7 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
 import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
@@ -174,6 +175,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected SegmentRelocator _segmentRelocator;
   protected RetentionManager _retentionManager;
   protected SegmentStatusChecker _segmentStatusChecker;
+  protected RebalanceChecker _rebalanceChecker;
   protected RealtimeConsumerMonitor _realtimeConsumerMonitor;
   protected PinotTaskManager _taskManager;
   protected TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
_taskManagerStatusCache;
@@ -818,6 +820,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
             _executorService);
     periodicTasks.add(_segmentStatusChecker);
+    _rebalanceChecker = new RebalanceChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
+        _executorService);
+    periodicTasks.add(_rebalanceChecker);
     _realtimeConsumerMonitor =
         new RealtimeConsumerMonitor(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics,
             _executorService);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index cb6ee99c4f..b8e2247b08 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -178,9 +178,9 @@ public class ControllerConf extends PinotConfiguration {
     public static final String 
DEPRECATED_SEGMENT_RELOCATOR_FREQUENCY_IN_SECONDS =
         "controller.segment.relocator.frequencyInSeconds";
     public static final String SEGMENT_RELOCATOR_FREQUENCY_PERIOD = 
"controller.segment.relocator.frequencyPeriod";
+    public static final String REBALANCE_CHECKER_FREQUENCY_PERIOD = 
"controller.rebalance.checker.frequencyPeriod";
     // Because segment level validation is expensive and requires heavy ZK 
access, we run segment level validation
-    // with a
-    // separate interval
+    // with a separate interval
     // Deprecated as of 0.8.0
     @Deprecated
     public static final String 
DEPRECATED_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
@@ -199,6 +199,8 @@ public class ControllerConf extends PinotConfiguration {
     // RealtimeSegmentRelocator has been rebranded as SegmentRelocator
     public static final String 
DEPRECATED_REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS =
         "controller.realtimeSegmentRelocation.initialDelayInSeconds";
+    public static final String REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS =
+        "controller.rebalanceChecker.initialDelayInSeconds";
     public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
         "controller.segmentRelocator.initialDelayInSeconds";
     public static final String SEGMENT_RELOCATOR_ENABLE_LOCAL_TIER_MIGRATION =
@@ -237,7 +239,8 @@ public class ControllerConf extends PinotConfiguration {
     private static final int 
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; 
// 24 Hours.
     private static final int 
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int 
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-    private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 
5 * 60; // 5 minutes
+    private static final int DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS = 5 * 
60; // 5 minutes
+    private static final int DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS = 
5 * 60; // 5 minutes
     private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS 
= 5 * 60; // 5 minutes
     private static final int 
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
     private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; 
// Disabled
@@ -604,7 +607,7 @@ public class ControllerConf extends PinotConfiguration {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
             () -> 
getProperty(ControllerPeriodicTasksConf.DEPRECATED_STATUS_CHECKER_FREQUENCY_IN_SECONDS,
-                
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS));
+                
ControllerPeriodicTasksConf.DEFAULT_STATUS_CHECKER_FREQUENCY_IN_SECONDS));
   }
 
   public void setStatusCheckerFrequencyInSeconds(int 
statusCheckerFrequencyInSeconds) {
@@ -612,6 +615,17 @@ public class ControllerConf extends PinotConfiguration {
         Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
+  public int getRebalanceCheckerFrequencyInSeconds() {
+    return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_FREQUENCY_PERIOD))
+        .map(period -> (int) convertPeriodToSeconds(period)).orElse(
+            
ControllerPeriodicTasksConf.DEFAULT_REBALANCE_CHECKER_FREQUENCY_IN_SECONDS);
+  }
+
+  public long getRebalanceCheckerInitialDelayInSeconds() {
+    return 
getProperty(ControllerPeriodicTasksConf.REBALANCE_CHECKER_INITIAL_DELAY_IN_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
   public int getRealtimeConsumerMonitorRunFrequency() {
     return 
Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.RT_CONSUMER_MONITOR_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period)).orElse(
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 58abb6c366..f0216da013 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -95,6 +95,7 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.recommender.RecommenderDriver;
@@ -615,11 +616,16 @@ public class PinotTableRestletResource {
   @QueryParam("externalViewCheckIntervalInMs") long 
externalViewCheckIntervalInMs,
       @ApiParam(value = "How long to wait till external view converges with 
ideal states") @DefaultValue("3600000")
       @QueryParam("externalViewStabilizationTimeoutInMs") long 
externalViewStabilizationTimeoutInMs,
+      @ApiParam(value = "How often to make a status update (i.e. heartbeat)") 
@DefaultValue("300000")
+      @QueryParam("heartbeatIntervalInMs") long heartbeatIntervalInMs,
+      @ApiParam(value = "How long to wait for next status update (i.e. 
heartbeat) before the job is considered failed")
+      @DefaultValue("3600000") @QueryParam("heartbeatTimeoutInMs") long 
heartbeatTimeoutInMs,
+      @ApiParam(value = "Max number of attempts to rebalance") 
@DefaultValue("3") @QueryParam("maxAttempts")
+      int maxAttempts, @ApiParam(value = "Initial delay to exponentially 
backoff retry") @DefaultValue("300000")
+  @QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
       @ApiParam(value = "Whether to update segment target tier as part of the 
rebalance") @DefaultValue("false")
       @QueryParam("updateTargetTier") boolean updateTargetTier) {
-
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
-
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(dryRun);
     rebalanceConfig.setReassignInstances(reassignInstances);
@@ -630,6 +636,12 @@ public class PinotTableRestletResource {
     rebalanceConfig.setBestEfforts(bestEfforts);
     
rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
     
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+    heartbeatIntervalInMs = Math.max(externalViewCheckIntervalInMs, 
heartbeatIntervalInMs);
+    rebalanceConfig.setHeartbeatIntervalInMs(heartbeatIntervalInMs);
+    heartbeatTimeoutInMs = Math.max(heartbeatTimeoutInMs, 3 * 
heartbeatIntervalInMs);
+    rebalanceConfig.setHeartbeatTimeoutInMs(heartbeatTimeoutInMs);
+    rebalanceConfig.setMaxAttempts(maxAttempts);
+    rebalanceConfig.setRetryInitialDelayInMs(retryInitialDelayInMs);
     rebalanceConfig.setUpdateTargetTier(updateTargetTier);
     String rebalanceJobId = 
TableRebalancer.createUniqueRebalanceJobIdentifier();
 
@@ -666,6 +678,79 @@ public class PinotTableRestletResource {
     }
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Path("/tables/{tableName}/rebalance")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.CANCEL_REBALANCE)
+  @ApiOperation(value = "Cancel all rebalance jobs for the given table, and 
noop if no rebalance is running", notes =
+      "Cancel all rebalance jobs for the given table, and noop if no rebalance 
is running")
+  public List<String> cancelRebalance(
+      @ApiParam(value = "Name of the table to rebalance", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr) {
+    String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
+    List<String> cancelledJobIds = new ArrayList<>();
+    boolean updated =
+        _pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+            jobMetadata -> {
+              String jobId = 
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+              try {
+                String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+                TableRebalanceProgressStats jobStats =
+                    JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+                if (jobStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS) {
+                  return;
+                }
+                cancelledJobIds.add(jobId);
+                LOGGER.info("Cancel rebalance job: {} for table: {}", jobId, 
tableNameWithType);
+                jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+                
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                    JsonUtils.objectToString(jobStats));
+              } catch (Exception e) {
+                LOGGER.error("Failed to cancel rebalance job: {} for table: 
{}", jobId, tableNameWithType, e);
+              }
+            });
+    LOGGER.info("Tried to cancel existing jobs at best effort and done: {}", 
updated);
+    return cancelledJobIds;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Path("/rebalanceStatus/{jobId}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_REBALANCE_STATUS)
+  @ApiOperation(value = "Gets detailed stats of a rebalance operation",
+      notes = "Gets detailed stats of a rebalance operation")
+  public ServerRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TABLE_REBALANCE);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new 
ServerRebalanceJobStatusResponse();
+    TableRebalanceProgressStats tableRebalanceProgressStats = 
JsonUtils.stringToObject(
+        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+        TableRebalanceProgressStats.class);
+    
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
+
+    long timeSinceStartInSecs = 0L;
+    if (RebalanceResult.Status.DONE != 
tableRebalanceProgressStats.getStatus()) {
+      timeSinceStartInSecs = (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+    }
+    
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+
+    String jobCtxInStr = 
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+    if (StringUtils.isNotEmpty(jobCtxInStr)) {
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
+      serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
+    }
+    return serverRebalanceJobStatusResponse;
+  }
+
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/{tableName}/state")
@@ -724,37 +809,6 @@ public class PinotTableRestletResource {
     }
   }
 
-  @GET
-  @Produces(MediaType.APPLICATION_JSON)
-  @Authenticate(AccessType.UPDATE)
-  @Path("/rebalanceStatus/{jobId}")
-  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_REBALANCE_STATUS)
-  @ApiOperation(value = "Gets detailed stats of a rebalance operation",
-      notes = "Gets detailed stats of a rebalance operation")
-  public ServerRebalanceJobStatusResponse rebalanceStatus(
-      @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
-      throws JsonProcessingException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TABLE_REBALANCE);
-
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
-          Response.Status.NOT_FOUND);
-    }
-    TableRebalanceProgressStats tableRebalanceProgressStats = 
JsonUtils.stringToObject(
-        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
-        TableRebalanceProgressStats.class);
-    long timeSinceStartInSecs = 0L;
-    if 
(!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus()))
 {
-      timeSinceStartInSecs = (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
-    }
-
-    ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new 
ServerRebalanceJobStatusResponse();
-    
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
-    
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
-    return serverRebalanceJobStatusResponse;
-  }
-
   @GET
   @Path("/tables/{tableName}/stats")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
@@ -987,10 +1041,10 @@ public class PinotTableRestletResource {
     }
     Map<String, Map<String, String>> result = new HashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      
result.putAll(_pinotHelixResourceManager.getAllJobsForTable(tableNameWithType,
-          jobTypesToFilter == null ? validJobTypes : jobTypesToFilter));
+      result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == 
null ? validJobTypes : jobTypesToFilter,
+          jobMetadata -> 
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)));
     }
-
     return result;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
index 3018b4877e..3551c9c811 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.api.resources;
 
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
 
 
@@ -26,6 +27,8 @@ public class ServerRebalanceJobStatusResponse {
 
   private TableRebalanceProgressStats _tableRebalanceProgressStats;
 
+  private TableRebalanceContext _tableRebalanceContext;
+
   public void setTimeElapsedSinceStartInSeconds(Long timeElapsedSinceStart) {
     _timeElapsedSinceStartInSeconds = timeElapsedSinceStart;
   }
@@ -41,4 +44,12 @@ public class ServerRebalanceJobStatusResponse {
   public long getTimeElapsedSinceStartInSeconds() {
     return _timeElapsedSinceStartInSeconds;
   }
+
+  public TableRebalanceContext getTableRebalanceContext() {
+    return _tableRebalanceContext;
+  }
+
+  public void setTableRebalanceContext(TableRebalanceContext 
tableRebalanceContext) {
+    _tableRebalanceContext = tableRebalanceContext;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index bfb4918bf7..a85401d706 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,7 +48,9 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.ws.rs.BadRequestException;
@@ -86,7 +88,6 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -146,6 +147,7 @@ import 
org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
@@ -1981,43 +1983,38 @@ public class PinotHelixResourceManager {
   /**
   * Returns the ZK metadata for the given jobId and jobType
    * @param jobId the id of the job
-   * @param jobType Job Path
+   * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
    * @return Map representing the job's ZK properties
    */
   @Nullable
   public Map<String, String> getControllerJobZKMetadata(String jobId, 
ControllerJobType jobType) {
-    String controllerJobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
-    ZNRecord taskResourceZnRecord = 
_propertyStore.get(controllerJobResourcePath, null, AccessOption.PERSISTENT);
-    if (taskResourceZnRecord != null) {
-      return taskResourceZnRecord.getMapFields().get(jobId);
-    }
-    return null;
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+    ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, 
AccessOption.PERSISTENT);
+    return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) : 
null;
   }
 
   /**
-   * Returns a Map of jobId to job's ZK metadata for the given table
-   * @param tableNameWithType the table for which jobs are to be fetched
+   * Returns a Map of jobId to job's ZK metadata that passes the checker, like 
for specific tables.
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> getAllJobsForTable(String 
tableNameWithType,
-      Set<ControllerJobType> jobTypes) {
+  public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> 
jobTypes,
+      Predicate<Map<String, String>> jobMetadataChecker) {
     Map<String, Map<String, String>> controllerJobs = new HashMap<>();
     for (ControllerJobType jobType : jobTypes) {
-      String jobsResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
-      try {
-        ZNRecord znRecord = _propertyStore.get(jobsResourcePath, null, -1);
-        if (znRecord != null) {
-          Map<String, Map<String, String>> tableJobsRecord = 
znRecord.getMapFields();
-          for (Map.Entry<String, Map<String, String>> tableEntry : 
tableJobsRecord.entrySet()) {
-            if 
(tableEntry.getValue().get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name())
-                && 
tableEntry.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
-                .equals(tableNameWithType)) {
-              controllerJobs.put(tableEntry.getKey(), tableEntry.getValue());
-            }
-          }
+      String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+      ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, 
AccessOption.PERSISTENT);
+      if (jobsZnRecord == null) {
+        continue;
+      }
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      for (Map.Entry<String, Map<String, String>> jobMetadataEntry : 
jobMetadataMap.entrySet()) {
+        String jobId = jobMetadataEntry.getKey();
+        Map<String, String> jobMetadata = jobMetadataEntry.getValue();
+        
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()),
+            "Got unexpected jobType: %s at jobResourcePath: %s with jobId: 
%s", jobType.name(), jobResourcePath, jobId);
+        if (jobMetadataChecker.test(jobMetadata)) {
+          controllerJobs.put(jobId, jobMetadata);
         }
-      } catch (ZkNoNodeException e) {
-        LOGGER.warn("Could not find controller job node for table : {} 
jobType: {}", tableNameWithType, jobType, e);
       }
     }
     return controllerJobs;
@@ -2041,8 +2038,7 @@ public class PinotHelixResourceManager {
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numMessagesSent));
     
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, 
segmentName);
-    return addControllerJobToZK(jobId, jobMetadata,
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.RELOAD_SEGMENT);
   }
 
   public boolean addNewForceCommitJob(String tableNameWithType, String jobId, 
long jobSubmissionTimeMs,
@@ -2055,8 +2051,7 @@ public class PinotHelixResourceManager {
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
-    return addControllerJobToZK(jobId, jobMetadata,
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.FORCE_COMMIT));
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.FORCE_COMMIT);
   }
 
   /**
@@ -2075,32 +2070,81 @@ public class PinotHelixResourceManager {
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
-    return addControllerJobToZK(jobId, jobMetadata,
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.RELOAD_SEGMENT);
   }
 
-  public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, String jobResourcePath) {
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload 
into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, ControllerJobType jobType) {
+    return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true);
+  }
+
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload 
into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
+   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, ControllerJobType jobType,
+      Predicate<Map<String, String>> prevJobMetadataChecker) {
     
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
         "Submission Time in JobMetadata record not set. Cannot expire these 
records");
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
     Stat stat = new Stat();
-    ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
-    if (tableJobsZnRecord != null) {
-      Map<String, Map<String, String>> tasks = 
tableJobsZnRecord.getMapFields();
-      tasks.put(jobId, jobMetadata);
-      if (tasks.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
-        tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
+    ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord != null) {
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+        return false;
+      }
+      jobMetadataMap.put(jobId, jobMetadata);
+      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
                 
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
                 
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
             .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
             .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
       }
-      tableJobsZnRecord.setMapFields(tasks);
-      return _propertyStore.set(jobResourcePath, tableJobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+      jobsZnRecord.setMapFields(jobMetadataMap);
+      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
     } else {
-      tableJobsZnRecord = new ZNRecord(jobResourcePath);
-      tableJobsZnRecord.setMapField(jobId, jobMetadata);
-      return _propertyStore.set(jobResourcePath, tableJobsZnRecord, 
AccessOption.PERSISTENT);
+      jobsZnRecord = new ZNRecord(jobResourcePath);
+      jobsZnRecord.setMapField(jobId, jobMetadata);
+      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
AccessOption.PERSISTENT);
+    }
+  }
+
+  /**
+   * Update existing job metadata belonging to the table
+   * @param tableNameWithType whose job metadata to be updated
+   * @param jobType the type of the job to figure out where job metadata is 
kept in ZK
+   * @param updater to modify the job metadata in place
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean updateJobsForTable(String tableNameWithType, 
ControllerJobType jobType,
+      Consumer<Map<String, String>> updater) {
+    String jobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+    Stat stat = new Stat();
+    ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord == null) {
+      return true;
+    }
+    Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+    for (Map<String, String> jobMetadata : jobMetadataMap.values()) {
+      if 
(jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType))
 {
+        updater.accept(jobMetadata);
+      }
     }
+    jobsZnRecord.setMapFields(jobMetadataMap);
+    return _propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
   }
 
   @VisibleForTesting
@@ -3112,12 +3156,19 @@ public class PinotHelixResourceManager {
       throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
     }
     Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
-    if (rebalanceConfig.isUpdateTargetTier()) {
-      updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
-    }
     ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
     if (trackRebalanceProgress) {
-      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId, this);
+      zkBasedTableRebalanceObserver = new 
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+          TableRebalanceContext.forInitialAttempt(rebalanceJobId, 
rebalanceConfig), this);
+    }
+    return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId, 
rebalanceConfig,
+        zkBasedTableRebalanceObserver);
+  }
+
+  public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
+      RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
+    if (rebalanceConfig.isUpdateTargetTier()) {
+      updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
     }
     TableRebalancer tableRebalancer =
         new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
index 9bb9179c90..a435e35ead 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
@@ -36,4 +36,14 @@ public class NoOpTableRebalanceObserver implements 
TableRebalanceObserver {
   @Override
   public void onError(String errorMsg) {
   }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public RebalanceResult.Status getStopStatus() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
new file mode 100644
index 0000000000..93e4b04dcb
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check whether a user triggered rebalance job is completed 
or not, and retry if failed. The retry
+ * job is started with the same rebalance configs provided by the user and 
does best effort to stop the other jobs
+ * for the same table. This task can be configured to just check failures and 
report metrics, and not to do retry.
+ */
+public class RebalanceChecker extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RebalanceChecker.class);
+  private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
+  private final ExecutorService _executorService;
+
+  public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
+    super(RebalanceChecker.class.getSimpleName(), 
config.getRebalanceCheckerFrequencyInSeconds(),
+        config.getRebalanceCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
+    _executorService = executorService;
+  }
+
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
+    int numTables = tableNamesWithType.size();
+    LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+    int numTablesProcessed = retryRebalanceTables(new 
HashSet<>(tableNamesWithType));
+    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 _taskName,
+        numTablesProcessed);
+    LOGGER.info("Finish processing {}/{} tables in task: {}", 
numTablesProcessed, numTables, _taskName);
+  }
+
+  /**
+   * Rare but the task may be executed by more than one 
thread because a user can trigger the periodic task to run
+   * immediately, in addition to the one scheduled to run periodically. So 
make this method synchronized to be simple.
+   */
+  private synchronized int retryRebalanceTables(Set<String> 
tableNamesWithType) {
+    // Get all jobMetadata for all the given tables with a single ZK read.
+    Map<String, Map<String, String>> allJobMetadataByJobId =
+        
_pinotHelixResourceManager.getAllJobs(Collections.singleton(ControllerJobType.TABLE_REBALANCE),
+            jobMetadata -> tableNamesWithType.contains(
+                
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)));
+    Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new 
HashMap<>();
+    allJobMetadataByJobId.forEach((jobId, jobMetadata) -> {
+      String tableNameWithType = 
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+      tableJobMetadataMap.computeIfAbsent(tableNameWithType, k -> new 
HashMap<>()).put(jobId, jobMetadata);
+    });
+    int numTablesProcessed = 0;
+    for (String tableNameWithType : tableNamesWithType) {
+      try {
+        LOGGER.info("Start to retry rebalance for table: {}", 
tableNameWithType);
+        // Get all rebalance jobs as tracked in ZK for this table to check if 
there is any failure to retry.
+        Map<String, Map<String, String>> allJobMetadata = 
tableJobMetadataMap.get(tableNameWithType);
+        if (allJobMetadata.isEmpty()) {
+          LOGGER.info("No rebalance job has been triggered for table: {}. Skip 
retry", tableNameWithType);
+          continue;
+        }
+        retryRebalanceTable(tableNameWithType, allJobMetadata);
+        numTablesProcessed++;
+      } catch (Exception e) {
+        LOGGER.error("Failed to retry rebalance for table: {}", 
tableNameWithType, e);
+        _controllerMetrics.addMeteredTableValue(tableNameWithType + "." + 
_taskName,
+            ControllerMeter.PERIODIC_TASK_ERROR, 1L);
+      }
+    }
+    return numTablesProcessed;
+  }
+
+  @VisibleForTesting
+  void retryRebalanceTable(String tableNameWithType, Map<String, Map<String, 
String>> allJobMetadata)
+      throws Exception {
+    // Skip retry for the table if rebalance job is still running or has 
completed, in specific:
+    // 1) Skip retry if any rebalance job is actively running. Being actively 
running means the job is at IN_PROGRESS
+    // status, and has updated its status kept in ZK within the heartbeat 
timeout. It's possible that more than one
+    // rebalance jobs are running for the table, but that's fine with 
idempotent rebalance algorithm.
+    // 2) Skip retry if the most recently started rebalance job has completed 
with DONE or NO_OP. It's possible that
+    // jobs started earlier may be still running, but they are ignored here.
+    //
+    // Otherwise, we can get a list of failed rebalance jobs, i.e. those at 
FAILED status; or IN_PROGRESS status but
+    // haven't updated their status kept in ZK within the heartbeat timeout. 
For those candidate jobs to retry:
+    // 1) Firstly, group them by the original jobIds they retry for so that we 
can skip those exceeded maxRetry.
+    // 2) For the remaining jobs, we take the one started most recently and 
retry it with its original configs.
+    // 3) If configured, we can abort the other rebalance jobs for the table 
by setting their status to FAILED.
+    Map<String/*original jobId*/, Set<Pair<TableRebalanceContext/*job 
attempts*/, Long
+        /*startTime*/>>> candidateJobs = getCandidateJobs(tableNameWithType, 
allJobMetadata);
+    if (candidateJobs.isEmpty()) {
+      LOGGER.info("Found no failed rebalance jobs for table: {}. Skip retry", 
tableNameWithType);
+      return;
+    }
+    _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.TABLE_REBALANCE_FAILURE_DETECTED, 1L);
+    Pair<TableRebalanceContext, Long> jobContextAndStartTime = 
getLatestJob(candidateJobs);
+    if (jobContextAndStartTime == null) {
+      LOGGER.info("Rebalance has been retried too many times for table: {}. 
Skip retry", tableNameWithType);
+      _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.TABLE_REBALANCE_RETRY_TOO_MANY_TIMES,
+          1L);
+      return;
+    }
+    TableRebalanceContext jobCtx = jobContextAndStartTime.getLeft();
+    String prevJobId = jobCtx.getJobId();
+    RebalanceConfig rebalanceConfig = jobCtx.getConfig();
+    long jobStartTimeMs = jobContextAndStartTime.getRight();
+    long retryDelayMs = 
getRetryDelayInMs(rebalanceConfig.getRetryInitialDelayInMs(), 
jobCtx.getAttemptId());
+    if (jobStartTimeMs + retryDelayMs > System.currentTimeMillis()) {
+      LOGGER.info("Delay retry for failed rebalance job: {} that started at: 
{}, by: {}ms", prevJobId, jobStartTimeMs,
+          retryDelayMs);
+      return;
+    }
+    abortExistingJobs(tableNameWithType, _pinotHelixResourceManager);
+    // Get tableConfig only when the table needs to retry rebalance, and get 
it before submitting rebalance to another
+    // thread, in order to avoid unnecessary ZK reads and making too many ZK 
reads in a short time.
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: {}", tableNameWithType);
+    _executorService.submit(() -> {
+      // Retry rebalance in another thread as rebalance can take time.
+      try {
+        retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
+      } catch (Throwable t) {
+        LOGGER.error("Failed to retry rebalance for table: {} asynchronously", 
tableNameWithType, t);
+      }
+    });
+  }
+
+  private void retryRebalanceTableWithContext(String tableNameWithType, 
TableConfig tableConfig,
+      TableRebalanceContext jobCtx) {
+    String prevJobId = jobCtx.getJobId();
+    RebalanceConfig rebalanceConfig = jobCtx.getConfig();
+    TableRebalanceContext retryCtx =
+        TableRebalanceContext.forRetry(jobCtx.getOriginalJobId(), 
rebalanceConfig, jobCtx.getAttemptId() + 1);
+    String attemptJobId = retryCtx.getJobId();
+    LOGGER.info("Retry rebalance job: {} for table: {} with attempt job: {}", 
prevJobId, tableNameWithType,
+        attemptJobId);
+    _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.TABLE_REBALANCE_RETRY, 1L);
+    ZkBasedTableRebalanceObserver observer =
+        new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId, 
retryCtx, _pinotHelixResourceManager);
+    RebalanceResult result =
+        _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
tableConfig, attemptJobId, rebalanceConfig,
+            observer);
+    LOGGER.info("New attempt: {} for table: {} is done with result status: 
{}", attemptJobId, tableNameWithType,
+        result.getStatus());
+  }
+
+  @VisibleForTesting
+  static long getRetryDelayInMs(long initDelayMs, int attemptId) {
+    // TODO: Just exponential backoff by factor 2 for now. Can add other retry 
polices as needed.
+    // The attemptId starts from 1, so minus one as the exponent.
+    double minDelayMs = initDelayMs * Math.pow(RETRY_DELAY_SCALE_FACTOR, 
attemptId - 1);
+    double maxDelayMs = minDelayMs * RETRY_DELAY_SCALE_FACTOR;
+    return RandomUtils.nextLong((long) minDelayMs, (long) maxDelayMs);
+  }
+
+  private static void abortExistingJobs(String tableNameWithType, 
PinotHelixResourceManager pinotHelixResourceManager) {
+    boolean updated = 
pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+        jobMetadata -> {
+          String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+          try {
+            String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+            TableRebalanceProgressStats jobStats =
+                JsonUtils.stringToObject(jobStatsInStr, 
TableRebalanceProgressStats.class);
+            if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+              return;
+            }
+            LOGGER.info("Abort rebalance job: {} for table: {}", jobId, 
tableNameWithType);
+            jobStats.setStatus(RebalanceResult.Status.ABORTED);
+            
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+                JsonUtils.objectToString(jobStats));
+          } catch (Exception e) {
+            LOGGER.error("Failed to abort rebalance job: {} for table: {}", 
jobId, tableNameWithType, e);
+          }
+        });
+    LOGGER.info("Tried to abort existing jobs at best effort and done: {}", 
updated);
+  }
+
+  @VisibleForTesting
+  static Pair<TableRebalanceContext, Long> getLatestJob(
+      Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs) {
+    Pair<TableRebalanceContext, Long> candidateJobRun = null;
+    for (Map.Entry<String, Set<Pair<TableRebalanceContext, Long>>> entry : 
candidateJobs.entrySet()) {
+      // The job configs from all retry jobs are same, as the same set of job 
configs is used to do retry.
+      // The job metadata kept in ZK is cleaned by submission time order 
gradually, so we can't compare Set.size()
+      // against maxAttempts, but check retryNum of each run to see if retries 
have exceeded limit.
+      Set<Pair<TableRebalanceContext, Long>> jobRuns = entry.getValue();
+      int maxAttempts = 
jobRuns.iterator().next().getLeft().getConfig().getMaxAttempts();
+      Pair<TableRebalanceContext, Long> latestJobRun = null;
+      for (Pair<TableRebalanceContext, Long> jobRun : jobRuns) {
+        if (jobRun.getLeft().getAttemptId() >= maxAttempts) {
+          latestJobRun = null;
+          break;
+        }
+        if (latestJobRun == null || latestJobRun.getRight() < 
jobRun.getRight()) {
+          latestJobRun = jobRun;
+        }
+      }
+      if (latestJobRun == null) {
+        LOGGER.info("Rebalance job: {} had exceeded maxAttempts: {}. Skip 
retry", entry.getKey(), maxAttempts);
+        continue;
+      }
+      if (candidateJobRun == null || candidateJobRun.getRight() < 
latestJobRun.getRight()) {
+        candidateJobRun = latestJobRun;
+      }
+    }
+    return candidateJobRun;
+  }
+
+  @VisibleForTesting
+  static Map<String, Set<Pair<TableRebalanceContext, Long>>> 
getCandidateJobs(String tableNameWithType,
+      Map<String, Map<String, String>> allJobMetadata)
+      throws Exception {
+    long nowMs = System.currentTimeMillis();
+    Map<String, Set<Pair<TableRebalanceContext, Long>>> candidates = new 
HashMap<>();
+    // If the job started most recently has already completed, then skip retry 
for the table.
+    Pair<String, Long> latestStartedJob = null;
+    Pair<String, Long> latestCompletedJob = null;
+    // The processing order of job metadata from the given Map is not 
deterministic. Track the completed original
+    // jobs so that we can simply skip the retry jobs belonging to the 
completed original jobs.
+    Map<String, String> completedOriginalJobs = new HashMap<>();
+    Set<String> cancelledOriginalJobs = new HashSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
allJobMetadata.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobMetadata = entry.getValue();
+      long statsUpdatedAt = 
Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+      String jobStatsInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+      if (StringUtils.isEmpty(jobStatsInStr)) {
+        LOGGER.info("Skip rebalance job: {} as it has no job progress stats", 
jobId);
+        continue;
+      }
+      String jobCtxInStr = 
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+      if (StringUtils.isEmpty(jobCtxInStr)) {
+        LOGGER.info("Skip rebalance job: {} as it has no job context", jobId);
+        continue;
+      }
+      TableRebalanceProgressStats jobStats = 
JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, 
TableRebalanceContext.class);
+      long jobStartTimeMs = jobStats.getStartTimeMs();
+      if (latestStartedJob == null || latestStartedJob.getRight() < 
jobStartTimeMs) {
+        latestStartedJob = Pair.of(jobId, jobStartTimeMs);
+      }
+      String originalJobId = jobCtx.getOriginalJobId();
+      RebalanceResult.Status jobStatus = jobStats.getStatus();
+      if (jobStatus == RebalanceResult.Status.DONE || jobStatus == 
RebalanceResult.Status.NO_OP) {
+        LOGGER.info("Skip rebalance job: {} as it has completed with status: 
{}", jobId, jobStatus);
+        completedOriginalJobs.put(originalJobId, jobId);
+        if (latestCompletedJob == null || latestCompletedJob.getRight() < 
jobStartTimeMs) {
+          latestCompletedJob = Pair.of(jobId, jobStartTimeMs);
+        }
+        continue;
+      }
+      if (jobStatus == RebalanceResult.Status.FAILED || jobStatus == 
RebalanceResult.Status.ABORTED) {
+        LOGGER.info("Found rebalance job: {} for original job: {} has been 
stopped with status: {}", jobId,
+            originalJobId, jobStatus);
+        candidates.computeIfAbsent(originalJobId, (k) -> new 
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
+        continue;
+      }
+      if (jobStatus == RebalanceResult.Status.CANCELLED) {
+        LOGGER.info("Found cancelled rebalance job: {} for original job: {}", 
jobId, originalJobId);
+        cancelledOriginalJobs.add(originalJobId);
+        continue;
+      }
+      // Check if an IN_PROGRESS job is still actively running.
+      long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
+      if (nowMs - statsUpdatedAt < heartbeatTimeoutMs) {
+        LOGGER.info("Rebalance job: {} is actively running with status updated 
at: {} within timeout: {}. Skip "
+            + "retry for table: {}", jobId, statsUpdatedAt, 
heartbeatTimeoutMs, tableNameWithType);
+        return Collections.emptyMap();
+      }
+      // The job is considered failed, but it's possible it is still running, 
then we might end up with more than one
+      // rebalance jobs running in parallel for a table. The rebalance 
algorithm is idempotent, so this should be fine
+      // for the correctness.
+      LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, 
originalJobId);
+      candidates.computeIfAbsent(originalJobId, (k) -> new 
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
+    }
+    if (latestCompletedJob != null && 
latestCompletedJob.getLeft().equals(latestStartedJob.getLeft())) {
+      LOGGER.info("Rebalance job: {} started most recently has already done. 
Skip retry for table: {}",
+          latestCompletedJob.getLeft(), tableNameWithType);
+      return Collections.emptyMap();
+    }
+    for (String jobId : cancelledOriginalJobs) {
+      LOGGER.info("Skip original job: {} as it's cancelled", jobId);
+      candidates.remove(jobId);
+    }
+    for (Map.Entry<String, String> entry : completedOriginalJobs.entrySet()) {
+      LOGGER.info("Skip original job: {} as it's completed by attempt: {}", 
entry.getKey(), entry.getValue());
+      candidates.remove(entry.getKey());
+    }
+    return candidates;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index 78e3b19bca..94709f065c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -84,6 +84,24 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _updateTargetTier = false;
 
+  // Update job status every this interval as heartbeat, to indicate the job 
is still actively running.
+  @JsonProperty("heartbeatIntervalInMs")
+  @ApiModelProperty(example = "300000")
+  private long _heartbeatIntervalInMs = 300000L;
+
+  // The job is considered as failed if not updating its status by this 
timeout, even though it's IN_PROGRESS status.
+  @JsonProperty("heartbeatTimeoutInMs")
+  @ApiModelProperty(example = "3600000")
+  private long _heartbeatTimeoutInMs = 3600000L;
+
+  @JsonProperty("maxAttempts")
+  @ApiModelProperty(example = "3")
+  private int _maxAttempts = 3;
+
+  @JsonProperty("retryInitialDelayInMs")
+  @ApiModelProperty(example = "300000")
+  private long _retryInitialDelayInMs = 300000L;
+
   public boolean isDryRun() {
     return _dryRun;
   }
@@ -164,6 +182,50 @@ public class RebalanceConfig {
     _updateTargetTier = updateTargetTier;
   }
 
+  public long getHeartbeatIntervalInMs() {
+    return _heartbeatIntervalInMs;
+  }
+
+  public void setHeartbeatIntervalInMs(long heartbeatIntervalInMs) {
+    _heartbeatIntervalInMs = heartbeatIntervalInMs;
+  }
+
+  public long getHeartbeatTimeoutInMs() {
+    return _heartbeatTimeoutInMs;
+  }
+
+  public void setHeartbeatTimeoutInMs(long heartbeatTimeoutInMs) {
+    _heartbeatTimeoutInMs = heartbeatTimeoutInMs;
+  }
+
+  public int getMaxAttempts() {
+    return _maxAttempts;
+  }
+
+  public void setMaxAttempts(int maxAttempts) {
+    _maxAttempts = maxAttempts;
+  }
+
+  public long getRetryInitialDelayInMs() {
+    return _retryInitialDelayInMs;
+  }
+
+  public void setRetryInitialDelayInMs(long retryInitialDelayInMs) {
+    _retryInitialDelayInMs = retryInitialDelayInMs;
+  }
+
+  @Override
+  public String toString() {
+    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _reassignInstances=" 
+ _reassignInstances
+        + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + 
_bootstrap + ", _downtime=" + _downtime
+        + ", _minAvailableReplicas=" + _minAvailableReplicas + ", 
_bestEfforts=" + _bestEfforts
+        + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+        + ", _externalViewStabilizationTimeoutInMs=" + 
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
+        + _updateTargetTier + ", _heartbeatIntervalInMs=" + 
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
+        + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", 
_retryInitialDelayInMs="
+        + _retryInitialDelayInMs + '}';
+  }
+
   public static RebalanceConfig copy(RebalanceConfig cfg) {
     RebalanceConfig rc = new RebalanceConfig();
     rc._dryRun = cfg._dryRun;
@@ -176,6 +238,10 @@ public class RebalanceConfig {
     rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs;
     rc._externalViewStabilizationTimeoutInMs = 
cfg._externalViewStabilizationTimeoutInMs;
     rc._updateTargetTier = cfg._updateTargetTier;
+    rc._heartbeatIntervalInMs = cfg._heartbeatIntervalInMs;
+    rc._heartbeatTimeoutInMs = cfg._heartbeatTimeoutInMs;
+    rc._maxAttempts = cfg._maxAttempts;
+    rc._retryInitialDelayInMs = cfg._retryInitialDelayInMs;
     return rc;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
index 8070d44972..68314bbba2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
@@ -24,4 +24,6 @@ public class RebalanceJobConstants {
 
   // Progress status of the rebalance operartion
   public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS = 
"REBALANCE_PROGRESS_STATS";
+  // Configs to retry the rebalance operation
+  public static final String JOB_METADATA_KEY_REBALANCE_CONTEXT = 
"REBALANCE_CONTEXT";
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index ed3ad624d8..2be7fc7753 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -87,6 +87,10 @@ public class RebalanceResult {
   }
 
   public enum Status {
-    NO_OP, DONE, FAILED, IN_PROGRESS
+    // FAILED if the job has ended with known exceptions;
+    // ABORTED if the job is stopped by others but retry is still allowed;
+    // CANCELLED if the job is stopped by user, and retry is cancelled too;
+    // UNKNOWN_ERROR if the job hits on an unexpected exception.
+    NO_OP, DONE, FAILED, IN_PROGRESS, ABORTED, CANCELLED, UNKNOWN_ERROR
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
new file mode 100644
index 0000000000..48e43558cc
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceContext.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+/**
+ * Track the job configs and attempt numbers as part of the job ZK metadata to 
retry failed rebalance.
+ */
+public class TableRebalanceContext {
+  // Attempt numbering is 1-based; the initial attempt reuses the original job id unchanged.
+  private static final int INITIAL_ATTEMPT_ID = 1;
+  // Job id of this particular attempt: originalJobId for attempt 1, originalJobId_<attemptId> for retries.
+  private String _jobId;
+  // Job id of the very first attempt; shared by all retry attempts of the same rebalance.
+  private String _originalJobId;
+  // Rebalance configs to reuse when this job is retried.
+  private RebalanceConfig _config;
+  // 1-based attempt number of this rebalance run.
+  private int _attemptId;
+
+  // Creates the context for the first attempt of a rebalance job; jobId equals originalJobId.
+  public static TableRebalanceContext forInitialAttempt(String originalJobId, 
RebalanceConfig config) {
+    return new TableRebalanceContext(originalJobId, config, 
INITIAL_ATTEMPT_ID);
+  }
+
+  // Creates the context for a retry attempt; jobId is derived from originalJobId and attemptId.
+  public static TableRebalanceContext forRetry(String originalJobId, 
RebalanceConfig config, int attemptId) {
+    return new TableRebalanceContext(originalJobId, config, attemptId);
+  }
+
+  // No-arg constructor required by Jackson to deserialize this context from job ZK metadata.
+  public TableRebalanceContext() {
+    // For JSON deserialization.
+  }
+
+  private TableRebalanceContext(String originalJobId, RebalanceConfig config, 
int attemptId) {
+    _jobId = createAttemptJobId(originalJobId, attemptId);
+    _originalJobId = originalJobId;
+    _config = config;
+    _attemptId = attemptId;
+  }
+
+  public int getAttemptId() {
+    return _attemptId;
+  }
+
+  public void setAttemptId(int attemptId) {
+    _attemptId = attemptId;
+  }
+
+  public String getOriginalJobId() {
+    return _originalJobId;
+  }
+
+  public void setOriginalJobId(String originalJobId) {
+    _originalJobId = originalJobId;
+  }
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public void setJobId(String jobId) {
+    _jobId = jobId;
+  }
+
+  public RebalanceConfig getConfig() {
+    return _config;
+  }
+
+  public void setConfig(RebalanceConfig config) {
+    _config = config;
+  }
+
+  @Override
+  public String toString() {
+    return "TableRebalanceContext{" + "_jobId='" + _jobId + '\'' + ", 
_originalJobId='" + _originalJobId + '\''
+        + ", _config=" + _config + ", _attemptId=" + _attemptId + '}';
+  }
+
+  // Attempt 1 keeps the original job id; later attempts append "_<attemptId>" so each
+  // retry gets a distinct job id in ZK while remaining traceable to the original job.
+  private static String createAttemptJobId(String originalJobId, int 
attemptId) {
+    if (attemptId == INITIAL_ATTEMPT_ID) {
+      return originalJobId;
+    }
+    return originalJobId + "_" + attemptId;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
index d52dce1f6e..e9c5c299cf 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
@@ -43,4 +43,8 @@ public interface TableRebalanceObserver {
   void onSuccess(String msg);
 
   void onError(String errorMsg);
+
+  boolean isStopped();
+
+  RebalanceResult.Status getStopStatus();
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
index 6d6f70b5c7..aad369ae33 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
@@ -42,7 +42,7 @@ public class TableRebalanceProgressStats {
   }
 
   // Done/In_progress/Failed
-  private String _status;
+  private RebalanceResult.Status _status;
   // When did Rebalance start
   private long _startTimeMs;
   // How long did rebalance take
@@ -62,7 +62,7 @@ public class TableRebalanceProgressStats {
     _initialToTargetStateConvergence = new RebalanceStateStats();
   }
 
-  public void setStatus(String status) {
+  public void setStatus(RebalanceResult.Status status) {
     _status = status;
   }
 
@@ -90,7 +90,7 @@ public class TableRebalanceProgressStats {
     _completionStatusMsg = completionStatusMsg;
   }
 
-  public String getStatus() {
+  public RebalanceResult.Status getStatus() {
     return _status;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a86c3e27a1..a27c4ffbfb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.ToIntFunction;
 import javax.annotation.Nullable;
@@ -47,6 +48,7 @@ import 
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
@@ -138,6 +140,24 @@ public class TableRebalancer {
 
   public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig,
       @Nullable String rebalanceJobId) {
+    long startTime = System.currentTimeMillis();
+    String tableNameWithType = tableConfig.getTableName();
+    RebalanceResult.Status status = RebalanceResult.Status.UNKNOWN_ERROR;
+    try {
+      RebalanceResult result = doRebalance(tableConfig, rebalanceConfig, 
rebalanceJobId);
+      status = result.getStatus();
+      return result;
+    } finally {
+      if (_controllerMetrics != null) {
+        _controllerMetrics.addTimedTableValue(String.format("%s.%s", 
tableNameWithType, status.toString()),
+            ControllerTimer.TABLE_REBALANCE_EXECUTION_TIME_MS, 
System.currentTimeMillis() - startTime,
+            TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig,
+      @Nullable String rebalanceJobId) {
     long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = tableConfig.getTableName();
     if (rebalanceJobId == null) {
@@ -355,6 +375,11 @@ public class TableRebalancer {
             "For rebalanceId: %s, caught exception while waiting for 
ExternalView to converge for table: %s, "
                 + "aborting the rebalance", rebalanceJobId, tableNameWithType);
         LOGGER.warn(errorMsg, e);
+        if (_tableRebalanceObserver.isStopped()) {
+          return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
+              "Caught exception while waiting for ExternalView to converge: " 
+ e, instancePartitionsMap,
+              tierToInstancePartitionsMap, targetAssignment);
+        }
         _tableRebalanceObserver.onError(errorMsg);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while waiting for ExternalView to converge: " + 
e, instancePartitionsMap,
@@ -434,6 +459,11 @@ public class TableRebalancer {
       // Record change of current ideal state and the new target
       
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
 currentAssignment,
           targetAssignment);
+      if (_tableRebalanceObserver.isStopped()) {
+        return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
+            "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
+            tierToInstancePartitionsMap, targetAssignment);
+      }
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup);
       LOGGER.info("For rebalanceId: {}, got the next assignment for table: {} 
with number of segments to be moved to "
@@ -691,6 +721,11 @@ public class TableRebalancer {
         _tableRebalanceObserver.onTrigger(
             
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
             externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields());
+        if (_tableRebalanceObserver.isStopped()) {
+          throw new RuntimeException(
+              String.format("Rebalance for table: %s has already stopped with 
status: %s", tableNameWithType,
+                  _tableRebalanceObserver.getStopStatus()));
+        }
         if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
             idealState.getRecord().getMapFields(), bestEfforts, 
segmentsToMonitor)) {
           LOGGER.info("ExternalView converged for table: {}", 
tableNameWithType);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 1978df35f3..7b57147ec0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.controller.helix.core.rebalance;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -30,6 +30,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * <code>ZkBasedTableRebalanceObserver</code> observes rebalance progress and 
tracks rebalance status,
  * stats in Zookeeper. This will be used to show the progress of rebalance to 
users via rebalanceStatus API.
@@ -39,12 +40,16 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
   private final String _tableNameWithType;
   private final String _rebalanceJobId;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private TableRebalanceProgressStats _tableRebalanceProgressStats;
+  private final TableRebalanceProgressStats _tableRebalanceProgressStats;
+  private final TableRebalanceContext _tableRebalanceContext;
+  private long _lastUpdateTimeMs;
   // Keep track of number of updates. Useful during debugging.
   private int _numUpdatesToZk;
+  private boolean _isStopped = false;
+  private RebalanceResult.Status _stopStatus;
 
   public ZkBasedTableRebalanceObserver(String tableNameWithType, String 
rebalanceJobId,
-      PinotHelixResourceManager pinotHelixResourceManager) {
+      TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager 
pinotHelixResourceManager) {
     Preconditions.checkState(tableNameWithType != null, "Table name cannot be 
null");
     Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be 
null");
     Preconditions.checkState(pinotHelixResourceManager != null, 
"PinotHelixManager cannot be null");
@@ -52,16 +57,19 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     _rebalanceJobId = rebalanceJobId;
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _tableRebalanceProgressStats = new TableRebalanceProgressStats();
+    _tableRebalanceContext = tableRebalanceContext;
     _numUpdatesToZk = 0;
   }
 
   @Override
   public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
       Map<String, Map<String, String>> targetState) {
+    boolean updatedStatsInZk = false;
     switch (trigger) {
       case START_TRIGGER:
         updateOnStart(currentState, targetState);
         trackStatsInZk();
+        updatedStatsInZk = true;
         break;
       // Write to Zk if there's change since previous stats computation
       case IDEAL_STATE_CHANGE_TRIGGER:
@@ -71,6 +79,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
             latest)) {
           _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
           trackStatsInZk();
+          updatedStatsInZk = true;
         }
         break;
       case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
@@ -79,18 +88,28 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
             
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
           
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
           trackStatsInZk();
+          updatedStatsInZk = true;
         }
         break;
       default:
         throw new IllegalArgumentException("Unimplemented trigger: " + 
trigger);
     }
+    // The onTrigger method is mainly driven by the while loop of waiting for 
external view to converge to ideal
+    // state. That while loop waits for at least externalViewCheckIntervalInMs. 
So the real interval to send out
+    // heartbeat is the max(heartbeat_interval, externalViewCheckIntervalInMs);
+    long heartbeatIntervalInMs = 
_tableRebalanceContext.getConfig().getHeartbeatIntervalInMs();
+    if (!updatedStatsInZk && System.currentTimeMillis() - _lastUpdateTimeMs > 
heartbeatIntervalInMs) {
+      LOGGER.debug("Update status of rebalance job: {} for table: {} after 
{}ms as heartbeat", _rebalanceJobId,
+          _tableNameWithType, heartbeatIntervalInMs);
+      trackStatsInZk();
+    }
   }
 
   private void updateOnStart(Map<String, Map<String, String>> currentState,
       Map<String, Map<String, String>> targetState) {
-    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS.toString(),
+    Preconditions.checkState(RebalanceResult.Status.IN_PROGRESS != 
_tableRebalanceProgressStats.getStatus(),
         "Rebalance Observer onStart called multiple times");
-    
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
+    _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
     _tableRebalanceProgressStats.setInitialToTargetStateConvergence(
         getDifferenceBetweenTableRebalanceStates(targetState, currentState));
     _tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
@@ -98,13 +117,12 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
 
   @Override
   public void onSuccess(String msg) {
-    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.DONE.toString(),
+    Preconditions.checkState(RebalanceResult.Status.DONE != 
_tableRebalanceProgressStats.getStatus(),
         "Table Rebalance already completed");
-    long timeToFinishInSeconds =
-         (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
+    long timeToFinishInSeconds = (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
     _tableRebalanceProgressStats.setCompletionStatusMsg(msg);
     
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
-    
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE.toString());
+    _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE);
     // Zero out the in_progress convergence stats
     TableRebalanceProgressStats.RebalanceStateStats stats = new 
TableRebalanceProgressStats.RebalanceStateStats();
     _tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(stats);
@@ -114,42 +132,97 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
 
   @Override
   public void onError(String errorMsg) {
-    long timeToFinishInSeconds =
-        (long) (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+    long timeToFinishInSeconds = (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
     
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
-    
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED.toString());
+    _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
     _tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);
     trackStatsInZk();
   }
 
+  @Override
+  public boolean isStopped() {
+    return _isStopped;
+  }
+
+  @Override
+  public RebalanceResult.Status getStopStatus() {
+    return _stopStatus;
+  }
+
   public int getNumUpdatesToZk() {
     return _numUpdatesToZk;
   }
 
+  @VisibleForTesting
+  TableRebalanceContext getTableRebalanceContext() {
+    return _tableRebalanceContext;
+  }
+
   private void trackStatsInZk() {
+    Map<String, String> jobMetadata =
+        createJobMetadata(_tableNameWithType, _rebalanceJobId, 
_tableRebalanceProgressStats, _tableRebalanceContext);
+    _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+        prevJobMetadata -> {
+          // In addition to updating job progress status, the observer also 
checks if the job status is IN_PROGRESS.
+          // If not, then no need to update the job status, and we keep this 
status to end the job promptly.
+          if (prevJobMetadata == null) {
+            return true;
+          }
+          String prevStatsInStr = 
prevJobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+          TableRebalanceProgressStats prevStats;
+          try {
+            prevStats = JsonUtils.stringToObject(prevStatsInStr, 
TableRebalanceProgressStats.class);
+          } catch (JsonProcessingException ignore) {
+            return true;
+          }
+          if (prevStats == null || RebalanceResult.Status.IN_PROGRESS == 
prevStats.getStatus()) {
+            return true;
+          }
+          _isStopped = true;
+          _stopStatus = prevStats.getStatus();
+          LOGGER.warn("Rebalance job: {} for table: {} has already stopped 
with status: {}", _rebalanceJobId,
+              _tableNameWithType, _stopStatus);
+          // No need to update job status if job has ended. This also keeps 
the last status from being overwritten.
+          return false;
+        });
+    _numUpdatesToZk++;
+    _lastUpdateTimeMs = System.currentTimeMillis();
+    LOGGER.debug("Made {} ZK updates for rebalance job: {} of table: {}", 
_numUpdatesToZk, _rebalanceJobId,
+        _tableNameWithType);
+  }
+
+  @VisibleForTesting
+  static Map<String, String> createJobMetadata(String tableNameWithType, 
String jobId,
+      TableRebalanceProgressStats tableRebalanceProgressStats, 
TableRebalanceContext tableRebalanceContext) {
     Map<String, String> jobMetadata = new HashMap<>();
-    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
_tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _rebalanceJobId);
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
-          JsonUtils.objectToString(_tableRebalanceProgressStats));
+          JsonUtils.objectToString(tableRebalanceProgressStats));
     } catch (JsonProcessingException e) {
-      LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _rebalanceJobId, e);
+      LOGGER.error("Error serialising stats for rebalance job: {} of table: {} 
to keep in ZK", jobId, tableNameWithType,
+          e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId, 
jobMetadata,
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE));
-    _numUpdatesToZk++;
-    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _rebalanceJobId);
+    try {
+      jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT,
+          JsonUtils.objectToString(tableRebalanceContext));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising retry configs for rebalance job: {} of 
table: {} to keep in ZK", jobId,
+          tableNameWithType, e);
+    }
+    return jobMetadata;
   }
 
   /**
-   * Takes in targetState and sourceState and computes stats based on the 
comparison
-   * between sourceState and targetState.This captures how far the source 
state is from
-   * the target state. Example - if there are 4 segments and 16 replicas in 
the source state
-   * not matching the target state, _segmentsToRebalance is 4 and 
_replicasToRebalance is 16.
-   * @param targetState- The state that we want to get to
+   * Takes in targetState and sourceState and computes stats based on the 
comparison between sourceState and
+   * targetState.This captures how far the source state is from the target 
state. Example - if there are 4 segments and
+   * 16 replicas in the source state not matching the target state, 
_segmentsToRebalance is 4 and _replicasToRebalance
+   * is 16.
+   *
+   * @param targetState - The state that we want to get to
    * @param sourceState - A given state that needs to converge to targetState
    * @return RebalanceStats
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 6c26985539..9fabc64d9d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -108,8 +107,7 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TENANT_REBALANCE));
+    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobType.TENANT_REBALANCE);
     _numUpdatesToZk++;
     LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index 6172c67def..a1b086c8fa 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest 
extends ControllerTest {
   }
 
   private class MockControllerStarter extends ControllerStarter {
-    private static final int NUM_PERIODIC_TASKS = 10;
+    private static final int NUM_PERIODIC_TASKS = 11;
 
     public MockControllerStarter() {
       super();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
new file mode 100644
index 0000000000..22663fa333
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
/**
 * Unit tests for {@link RebalanceChecker}, the controller periodic task (added by this commit)
 * that retries failed or stuck table rebalance jobs based on job metadata persisted in ZK.
 */
public class RebalanceCheckerTest {

  @Test
  public void testGetRetryDelayInMs() {
    // An initial delay of 0 disables retry backoff entirely: delay is 0 regardless of attempt.
    assertEquals(RebalanceChecker.getRetryDelayInMs(0, 1), 0);
    assertEquals(RebalanceChecker.getRetryDelayInMs(0, 2), 0);
    assertEquals(RebalanceChecker.getRetryDelayInMs(0, 3), 0);

    // For a positive initial delay, the delay doubles per attempt with random jitter:
    // initDelayMs * 2^(attempt-1) <= delay < initDelayMs * 2^attempt.
    for (long initDelayMs : new long[]{1, 30000, 3600000}) {
      long delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 1);
      assertTrue(delayMs >= initDelayMs && delayMs < initDelayMs * 2);
      delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 2);
      assertTrue(delayMs >= initDelayMs * 2 && delayMs < initDelayMs * 4);
      delayMs = RebalanceChecker.getRetryDelayInMs(initDelayMs, 3);
      assertTrue(delayMs >= initDelayMs * 4 && delayMs < initDelayMs * 8);
    }
  }

  @Test
  public void testGetCandidateJobs()
      throws Exception {
    String tableName = "table01";
    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();

    // Original job run as job1, and all its retry jobs failed too.
    RebalanceConfig jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(1000);
    TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg);
    Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx);
    allJobMetadata.put("job1", jobMetadata);
    // 3 failed retry runs for job1 (attempts 2-4; the ABORTED run counts as failed too).
    jobMetadata = createDummyJobMetadata(tableName, "job1", 2, 1100, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 3, 1200, RebalanceResult.Status.ABORTED);
    allJobMetadata.put("job1_3", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 4, 1300, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_4", jobMetadata);

    // Original job run as job2, and its retry job job2_2 completed.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(2000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", stats, jobCtx);
    allJobMetadata.put("job2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, RebalanceResult.Status.DONE);
    allJobMetadata.put("job2_2", jobMetadata);

    // Original job run as job3, still IN_PROGRESS but it failed to send out heartbeat in time
    // (its submission time is pinned to a stale 3000ms), so it looks stuck to the checker.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
    stats.setStartTimeMs(3000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3", stats, jobCtx);
    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
    allJobMetadata.put("job3", jobMetadata);

    // Original job run as job4, which didn't have retryJobCfg as from old version of the code
    // (its rebalance context entry is removed below), so the checker should skip it.
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(4000);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job4", stats, null);
    jobMetadata.remove(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
    allJobMetadata.put("job4", jobMetadata);

    // Only need to retry job1 and job3, as job2 is completed and job4 is from old version of code.
    Map<String, Set<Pair<TableRebalanceContext, Long>>> jobs =
        RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
    assertEquals(jobs.size(), 2);
    assertTrue(jobs.containsKey("job1"));
    assertTrue(jobs.containsKey("job3"));
    assertEquals(jobs.get("job1").size(), 4); // four runs: job1, job1_2, job1_3, job1_4
    assertEquals(jobs.get("job3").size(), 1); // just a single run job3

    // Cancel job1's latest attempt (job1_4); then only job3 is a retry candidate.
    jobMetadata = allJobMetadata.get("job1_4");
    cancelRebalanceJob(jobMetadata);
    jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
    assertEquals(jobs.size(), 1);
    assertTrue(jobs.containsKey("job3"));
    assertEquals(jobs.get("job3").size(), 1); // just a single run job3

    // Add latest job5 that's already done, thus no need to retry for table.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.DONE);
    stats.setStartTimeMs(5000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job5", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job5", stats, jobCtx);
    allJobMetadata.put("job5", jobMetadata);
    jobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
    assertEquals(jobs.size(), 0);
  }

  @Test
  public void testGetLatestJob() {
    Map<String, Set<Pair<TableRebalanceContext, Long>>> jobs = new HashMap<>();
    // The most recent job run overall is job1_3 (start time 1020), and job1 has only 3 attempts,
    // still below the maxAttempts of 4 set by createDummyJobCtx, so job1_3 is picked.
    jobs.put("job1",
        ImmutableSet.of(Pair.of(createDummyJobCtx("job1", 1), 10L), Pair.of(createDummyJobCtx("job1", 2), 20L),
            Pair.of(createDummyJobCtx("job1", 3), 1020L)));
    jobs.put("job2", ImmutableSet.of(Pair.of(createDummyJobCtx("job2", 1), 1000L)));
    Pair<TableRebalanceContext, Long> jobTime = RebalanceChecker.getLatestJob(jobs);
    assertNotNull(jobTime);
    assertEquals(jobTime.getLeft().getJobId(), "job1_3");

    // The most recent job run is now job1_4, but job1 has reached maxAttempts (4 attempts),
    // so the next most recent eligible job, job2, is picked instead.
    jobs.put("job1",
        ImmutableSet.of(Pair.of(createDummyJobCtx("job1", 1), 10L), Pair.of(createDummyJobCtx("job1", 2), 20L),
            Pair.of(createDummyJobCtx("job1", 3), 1020L), Pair.of(createDummyJobCtx("job1", 4), 2020L)));
    jobTime = RebalanceChecker.getLatestJob(jobs);
    assertNotNull(jobTime);
    assertEquals(jobTime.getLeft().getJobId(), "job2");

    // Add job3 that's started more recently.
    jobs.put("job3", ImmutableSet.of(Pair.of(createDummyJobCtx("job3", 1), 3000L)));
    jobTime = RebalanceChecker.getLatestJob(jobs);
    assertNotNull(jobTime);
    assertEquals(jobTime.getLeft().getJobId(), "job3");

    // Remove job2 and job3; job1 alone is over maxAttempts, so no job to retry then.
    jobs.remove("job2");
    jobs.remove("job3");
    jobTime = RebalanceChecker.getLatestJob(jobs);
    assertNull(jobTime);
  }

  @Test
  public void testRetryRebalance()
      throws Exception {
    String tableName = "table01";
    LeadControllerManager leadController = mock(LeadControllerManager.class);
    ControllerMetrics metrics = mock(ControllerMetrics.class);
    // Direct executor so the retry runs synchronously inside retryRebalanceTable().
    ExecutorService exec = MoreExecutors.newDirectExecutorService();
    ControllerConf cfg = new ControllerConf();

    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
    // Original job run as job1, and all its retry jobs failed too.
    RebalanceConfig jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(1000);
    TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg);
    Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx);
    allJobMetadata.put("job1", jobMetadata);
    // 3 failed retry runs for job1; job1_4 starts at 5300, the most recent run of all jobs here.
    jobMetadata = createDummyJobMetadata(tableName, "job1", 2, 1100, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 3, 1200, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_3", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job1", 4, 5300, RebalanceResult.Status.FAILED);
    allJobMetadata.put("job1_4", jobMetadata);

    // Original job run as job2, and its retry job job2_2 completed.
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(2000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job2", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", stats, jobCtx);
    allJobMetadata.put("job2", jobMetadata);
    jobMetadata = createDummyJobMetadata(tableName, "job2", 2, 2100, RebalanceResult.Status.DONE);
    allJobMetadata.put("job2_2", jobMetadata);

    // Original job run as job3, and failed to send out heartbeat in time (stale submission time).
    jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
    stats.setStartTimeMs(3000);
    jobCtx = TableRebalanceContext.forInitialAttempt("job3", jobCfg);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3", stats, jobCtx);
    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000");
    allJobMetadata.put("job3", jobMetadata);

    TableConfig tableConfig = mock(TableConfig.class);
    PinotHelixResourceManager helixManager = mock(PinotHelixResourceManager.class);
    when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
    when(helixManager.getAllJobs(any(), any())).thenReturn(allJobMetadata);
    RebalanceChecker checker = new RebalanceChecker(helixManager, leadController, cfg, metrics, exec);
    // Although job1_4 was submitted most recently, job1 had reached maxAttempts. Chose job3 to retry, which got
    // stuck at in progress status.
    checker.retryRebalanceTable(tableName, allJobMetadata);
    // The new retry job is for job3 and attemptId is increased to 2.
    ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
        ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
    verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture());
    ZkBasedTableRebalanceObserver observer = observerCaptor.getValue();
    jobCtx = observer.getTableRebalanceContext();
    assertEquals(jobCtx.getOriginalJobId(), "job3");
    assertEquals(jobCtx.getAttemptId(), 2);
  }

  @Test
  public void testRetryRebalanceWithBackoff()
      throws Exception {
    String tableName = "table01";
    LeadControllerManager leadController = mock(LeadControllerManager.class);
    ControllerMetrics metrics = mock(ControllerMetrics.class);
    ExecutorService exec = MoreExecutors.newDirectExecutorService();
    ControllerConf cfg = new ControllerConf();

    Map<String, Map<String, String>> allJobMetadata = new HashMap<>();
    // Original job run as job1; it failed "just now" (startTimeMs=nowMs), so the default retry
    // backoff window has not elapsed yet.
    RebalanceConfig jobCfg = new RebalanceConfig();
    jobCfg.setMaxAttempts(4);
    long nowMs = System.currentTimeMillis();
    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
    stats.setStatus(RebalanceResult.Status.FAILED);
    stats.setStartTimeMs(nowMs);
    TableRebalanceContext jobCtx = TableRebalanceContext.forInitialAttempt("job1", jobCfg);
    Map<String, String> jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx);
    allJobMetadata.put("job1", jobMetadata);

    PinotHelixResourceManager helixManager = mock(PinotHelixResourceManager.class);
    TableConfig tableConfig = mock(TableConfig.class);
    when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
    RebalanceChecker checker = new RebalanceChecker(helixManager, leadController, cfg, metrics, exec);
    checker.retryRebalanceTable(tableName, allJobMetadata);
    // Retry for job1 is delayed with 5min backoff, so no rebalanceTable() call is made.
    ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
        ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
    verify(helixManager, times(0)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture());

    // Set initial delay to 0 to disable retry backoff; re-create the job metadata so the
    // updated config carried by jobCtx is re-serialized into it.
    jobCfg.setRetryInitialDelayInMs(0);
    jobMetadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", stats, jobCtx);
    allJobMetadata.put("job1", jobMetadata);
    checker.retryRebalanceTable(tableName, allJobMetadata);
    // Retry for job1 is delayed with 0 backoff, i.e. it fires immediately.
    observerCaptor = ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
    verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(), anyString(), any(), observerCaptor.capture());
  }

  @Test
  public void testAddUpdateControllerJobsForTable() {
    ControllerConf cfg = new ControllerConf();
    cfg.setZkStr("localhost:2181");
    cfg.setHelixClusterName("cluster01");
    // Real resource manager backed by a mocked HelixManager/property store, so all job writes
    // land in the in-memory jobsZnRecord below.
    PinotHelixResourceManager pinotHelixManager = new PinotHelixResourceManager(cfg);
    HelixManager helixZkManager = mock(HelixManager.class);
    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
    String zkPath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE);
    ZNRecord jobsZnRecord = new ZNRecord("jobs");
    when(propertyStore.get(eq(zkPath), any(), eq(AccessOption.PERSISTENT))).thenReturn(jobsZnRecord);
    when(helixZkManager.getClusterManagmentTool()).thenReturn(mock(HelixAdmin.class));
    when(helixZkManager.getHelixPropertyStore()).thenReturn(propertyStore);
    when(helixZkManager.getHelixDataAccessor()).thenReturn(mock(HelixDataAccessor.class));
    pinotHelixManager.start(helixZkManager, null);

    // The trailing predicate decides whether the job metadata is persisted: job2's predicate
    // returns false, so only job1, job3 and job4 end up in the ZNRecord.
    pinotHelixManager.addControllerJobToZK("job1",
        ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000", "tableName", "table01"),
        ControllerJobType.TABLE_REBALANCE, jmd -> true);
    pinotHelixManager.addControllerJobToZK("job2",
        ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000", "tableName", "table01"),
        ControllerJobType.TABLE_REBALANCE, jmd -> false);
    pinotHelixManager.addControllerJobToZK("job3",
        ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000", "tableName", "table02"),
        ControllerJobType.TABLE_REBALANCE, jmd -> true);
    pinotHelixManager.addControllerJobToZK("job4",
        ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000", "tableName", "table02"),
        ControllerJobType.TABLE_REBALANCE, jmd -> true);
    Map<String, Map<String, String>> jmds = jobsZnRecord.getMapFields();
    assertEquals(jmds.size(), 3);
    assertTrue(jmds.containsKey("job1"));
    assertTrue(jmds.containsKey("job3"));
    assertTrue(jmds.containsKey("job4"));

    // updateJobsForTable visits only the jobs belonging to the given table.
    Set<String> expectedJobs01 = new HashSet<>();
    pinotHelixManager.updateJobsForTable("table01", ControllerJobType.TABLE_REBALANCE,
        jmd -> expectedJobs01.add(jmd.get("jobId")));
    assertEquals(expectedJobs01.size(), 1);
    assertTrue(expectedJobs01.contains("job1"));

    Set<String> expectedJobs02 = new HashSet<>();
    pinotHelixManager.updateJobsForTable("table02", ControllerJobType.TABLE_REBALANCE,
        jmd -> expectedJobs02.add(jmd.get("jobId")));
    assertEquals(expectedJobs02.size(), 2);
    assertTrue(expectedJobs02.contains("job3"));
    assertTrue(expectedJobs02.contains("job4"));
  }

  /**
   * Builds a retry context for the given attempt. The first attempt keeps the original job id;
   * later attempts get the id "&lt;originalJobId&gt;_&lt;attemptId&gt;". maxAttempts is fixed at 4.
   */
  private static TableRebalanceContext createDummyJobCtx(String originalJobId, int attemptId) {
    TableRebalanceContext jobCtx = new TableRebalanceContext();
    RebalanceConfig cfg = new RebalanceConfig();
    cfg.setMaxAttempts(4);
    jobCtx.setJobId(originalJobId);
    if (attemptId > 1) {
      jobCtx.setJobId(originalJobId + "_" + attemptId);
    }
    jobCtx.setOriginalJobId(originalJobId);
    jobCtx.setConfig(cfg);
    jobCtx.setAttemptId(attemptId);
    return jobCtx;
  }

  /**
   * Builds ZK job metadata for a retry attempt ("&lt;originalJobId&gt;_&lt;attemptId&gt;") with the
   * given start time and final status. maxAttempts is fixed at 4.
   */
  private static Map<String, String> createDummyJobMetadata(String tableName, String originalJobId, int attemptId,
      long startTimeMs, RebalanceResult.Status status) {
    RebalanceConfig cfg = new RebalanceConfig();
    cfg.setMaxAttempts(4);
    TableRebalanceProgressStats stats = new TableRebalanceProgressStats();
    stats.setStatus(status);
    stats.setStartTimeMs(startTimeMs);
    TableRebalanceContext jobCtx = TableRebalanceContext.forRetry(originalJobId, cfg, attemptId);
    String attemptJobId = originalJobId + "_" + attemptId;
    return ZkBasedTableRebalanceObserver.createJobMetadata(tableName, attemptJobId, stats, jobCtx);
  }

  /** Marks the given job run CANCELLED by rewriting its serialized progress stats in place. */
  private static void cancelRebalanceJob(Map<String, String> jobMetadata)
      throws JsonProcessingException {
    String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
    TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
    jobStats.setStatus(RebalanceResult.Status.CANCELLED);
    jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
        JsonUtils.objectToString(jobStats));
  }
}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 74b8a4367f..143caebf2b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -35,14 +35,16 @@ import static org.testng.Assert.assertEquals;
 
 public class TestZkBasedTableRebalanceObserver {
 
+  // This is a test to verify if Zk stats are pushed out correctly
   @Test
-    // This is a test to verify if Zk stats are pushed out correctly
   void testZkObserverTracking() {
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     // Mocking this. We will verify using numZkUpdate stat
     when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
+    TableRebalanceContext retryCtx = new TableRebalanceContext();
+    retryCtx.setConfig(new RebalanceConfig());
     ZkBasedTableRebalanceObserver observer =
-        new ZkBasedTableRebalanceObserver("dummy", "dummyId", 
pinotHelixResourceManager);
+        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
pinotHelixResourceManager);
     Map<String, Map<String, String>> source = new TreeMap<>();
     Map<String, Map<String, String>> target = new TreeMap<>();
     target.put("segment1",
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index e72d066bc9..db6941bf1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -90,6 +90,7 @@ public class Actions {
   // Action names for table
   public static class Table {
     public static final String BUILD_ROUTING = "BuildRouting";
+    public static final String CANCEL_REBALANCE = "CancelRebalance";
     public static final String CREATE_INSTANCE_PARTITIONS = 
"CreateInstancePartitions";
     public static final String CREATE_SCHEMA = "CreateSchema";
     public static final String CREATE_TABLE = "CreateTable";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 0f88a36d06..4a543556f7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -370,9 +370,8 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
 
           ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse =
               JsonUtils.stringToObject(httpResponse.getResponse(), 
ServerRebalanceJobStatusResponse.class);
-          String status = 
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
-          return status.equals(RebalanceResult.Status.DONE.toString()) || 
status.equals(
-              RebalanceResult.Status.FAILED.toString());
+          RebalanceResult.Status status = 
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
+          return status != RebalanceResult.Status.IN_PROGRESS;
         } catch (HttpErrorStatusException | URISyntaxException e) {
           throw new IOException(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to