This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 17dfafed0d4 Fix flaky TenantRebalancerTest  (#16956)
17dfafed0d4 is described below

commit 17dfafed0d4d169773321b64c81ac172dc459cd2
Author: Jhow <[email protected]>
AuthorDate: Mon Oct 13 14:50:18 2025 -0700

    Fix flaky TenantRebalancerTest  (#16956)
---
 .../tenant/ZkBasedTenantRebalanceObserver.java     |   26 +-
 .../rebalance/tenant/TenantRebalancerTest.java     | 1043 ++++++++++----------
 2 files changed, 550 insertions(+), 519 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index e1237238065..b1a54cbf59b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -46,16 +47,16 @@ import org.slf4j.LoggerFactory;
 
 public class ZkBasedTenantRebalanceObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
-  public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 3;
   private static final int MIN_ZK_UPDATE_RETRY_DELAY_MS = 100;
   private static final int MAX_ZK_UPDATE_RETRY_DELAY_MS = 200;
+  public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 5;
 
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private final String _jobId;
   private final String _tenantName;
   // Keep track of number of updates. Useful during debugging.
   private final AtomicInteger _numUpdatesToZk;
-  private final int _zkUpdateMaxRetries;
+  private final RetryPolicy _retryPolicy;
   private boolean _isDone;
 
   private ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
@@ -64,16 +65,24 @@ public class ZkBasedTenantRebalanceObserver {
     _jobId = jobId;
     _tenantName = tenantName;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _zkUpdateMaxRetries = zkUpdateMaxRetries;
     _numUpdatesToZk = new AtomicInteger(0);
+    _retryPolicy = RetryPolicies.randomDelayRetryPolicy(zkUpdateMaxRetries, 
MIN_ZK_UPDATE_RETRY_DELAY_MS,
+        MAX_ZK_UPDATE_RETRY_DELAY_MS);
   }
 
   private ZkBasedTenantRebalanceObserver(String jobId, String tenantName, 
TenantRebalanceProgressStats progressStats,
       TenantRebalanceContext tenantRebalanceContext, PinotHelixResourceManager 
pinotHelixResourceManager,
       int zkUpdateMaxRetries) {
     this(jobId, tenantName, pinotHelixResourceManager, zkUpdateMaxRetries);
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, 
makeJobMetadata(tenantRebalanceContext, progressStats),
-        ControllerJobTypes.TENANT_REBALANCE);
+    try {
+      _retryPolicy.attempt(() -> 
_pinotHelixResourceManager.addControllerJobToZK(_jobId,
+          makeJobMetadata(tenantRebalanceContext, progressStats),
+          ControllerJobTypes.TENANT_REBALANCE, Objects::isNull)
+      );
+    } catch (AttemptFailureException e) {
+      LOGGER.error("Error creating initial job metadata in ZK for jobId: {} 
for tenant rebalance", _jobId, e);
+      throw new RuntimeException("Error creating initial job metadata in ZK 
for jobId: " + _jobId, e);
+    }
     _numUpdatesToZk.incrementAndGet();
   }
 
@@ -255,12 +264,11 @@ public class ZkBasedTenantRebalanceObserver {
   private void updateTenantRebalanceJobMetadataInZk(
       BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
       throws AttemptFailureException {
-    RetryPolicy retry = 
RetryPolicies.randomDelayRetryPolicy(_zkUpdateMaxRetries, 
MIN_ZK_UPDATE_RETRY_DELAY_MS,
-        MAX_ZK_UPDATE_RETRY_DELAY_MS);
-    retry.attempt(() -> {
+    _retryPolicy.attempt(() -> {
       Map<String, String> jobMetadata =
           _pinotHelixResourceManager.getControllerJobZKMetadata(_jobId, 
ControllerJobTypes.TENANT_REBALANCE);
       if (jobMetadata == null) {
+        LOGGER.warn("Skip updating ZK since job metadata is not present in ZK 
for jobId: {}", _jobId);
         return false;
       }
       TenantRebalanceContext originalContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(jobMetadata);
@@ -297,7 +305,7 @@ public class ZkBasedTenantRebalanceObserver {
       if (updateSuccessful) {
         return true;
       } else {
-        LOGGER.info(
+        LOGGER.warn(
             "Tenant rebalance context or progress stats is out of sync with ZK 
while polling, fetching the latest "
                 + "context and progress stats from ZK and retry. jobId: {}", 
_jobId);
         return false;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 92f8dc729bb..c926014ef7c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -120,44 +120,44 @@ public class TenantRebalancerTest extends ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
(numServers + i), true);
     }
 
-    Map<String, Map<String, String>> oldSegmentAssignment =
-        
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
-
-    // rebalance the tables on test tenant
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    TenantRebalanceResult result = tenantRebalancer.rebalance(config);
-    RebalanceResult rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
-    Map<String, Map<String, String>> rebalancedAssignment = 
rebalanceResult.getSegmentAssignment();
-    // assignment should not change, with a NO_OP status as no now server is 
added to test tenant
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-    assertEquals(oldSegmentAssignment, rebalancedAssignment);
-
-    // rebalance the tables on default tenant
-    config.setTenantName(DEFAULT_TENANT_NAME);
-    result = tenantRebalancer.rebalance(config);
-    // rebalancing default tenant should distribute the segment of table A 
over 6 servers
-    rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
-    InstancePartitions partitions = 
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
-    assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 6);
-
-    // ensure the ideal state and external view converges
-    assertTrue(waitForCompletion(result.getJobId()));
-    TenantRebalanceProgressStats progressStats = 
getProgress(result.getJobId());
-    
assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
-    assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
-        TenantRebalanceProgressStats.TableStatus.DONE.name());
-    Map<String, Map<String, String>> idealState =
-        
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
-    Map<String, Map<String, String>> externalView =
-        
_helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
-    assertEquals(idealState, externalView);
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    try {
+      Map<String, Map<String, String>> oldSegmentAssignment =
+          
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
+
+      // rebalance the tables on test tenant
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      TenantRebalanceResult result = tenantRebalancer.rebalance(config);
+      RebalanceResult rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+      Map<String, Map<String, String>> rebalancedAssignment = 
rebalanceResult.getSegmentAssignment();
+      // assignment should not change, with a NO_OP status as no new server is 
added to test tenant
+      assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+      assertEquals(oldSegmentAssignment, rebalancedAssignment);
+      // rebalance the tables on default tenant
+      config.setTenantName(DEFAULT_TENANT_NAME);
+      result = tenantRebalancer.rebalance(config);
+      // rebalancing default tenant should distribute the segment of table A 
over 6 servers
+      rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+      InstancePartitions partitions = 
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
+      assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 
6);
+      // ensure the ideal state and external view converge
+      assertTrue(waitForCompletion(result.getJobId()));
+      TenantRebalanceProgressStats progressStats = 
getProgress(result.getJobId());
+      
assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
+      assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+          TenantRebalanceProgressStats.TableStatus.DONE.name());
+      Map<String, Map<String, String>> idealState =
+          
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+      Map<String, Map<String, String>> externalView =
+          
_helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+      assertEquals(idealState, externalView);
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -191,90 +191,92 @@ public class TenantRebalancerTest extends ControllerTest {
 
     addTenantTagToInstances(TENANT_NAME);
 
-    // rebalance the tables on test tenant
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-
-    // leave allow and block tables empty
-    config.setIncludeTables(Collections.emptySet());
-    config.setExcludeTables(Collections.emptySet());
+    try {
+      // rebalance the tables on test tenant
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
 
-    TenantRebalanceResult tenantRebalanceResult = 
tenantRebalancer.rebalance(config);
+      // leave allow and block tables empty
+      config.setIncludeTables(Collections.emptySet());
+      config.setExcludeTables(Collections.emptySet());
 
-    RebalanceResult rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
-    RebalanceResult rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+      TenantRebalanceResult tenantRebalanceResult = 
tenantRebalancer.rebalance(config);
 
-    assertNotNull(rebalanceResultA);
-    assertNotNull(rebalanceResultB);
+      RebalanceResult rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+      RebalanceResult rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
 
-    assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
-    assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
+      assertNotNull(rebalanceResultA);
+      assertNotNull(rebalanceResultB);
 
-    // block table B
-    config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+      assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+      assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
 
-    tenantRebalanceResult = tenantRebalancer.rebalance(config);
+      // block table B
+      config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
 
-    rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
-    rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+      tenantRebalanceResult = tenantRebalancer.rebalance(config);
 
-    assertNotNull(rebalanceResultA);
-    assertNull(rebalanceResultB);
-    assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+      rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+      rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
 
-    // allow all tables explicitly, block table B, this should result the same 
as above case
-    Set<String> includeTables = new HashSet<>();
-    includeTables.add(OFFLINE_TABLE_NAME_A);
-    includeTables.add(OFFLINE_TABLE_NAME_B);
-    config.setIncludeTables(includeTables);
-    config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+      assertNotNull(rebalanceResultA);
+      assertNull(rebalanceResultB);
+      assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
 
-    tenantRebalanceResult = tenantRebalancer.rebalance(config);
+      // allow all tables explicitly, block table B, this should give the 
same result as the case above
+      Set<String> includeTables = new HashSet<>();
+      includeTables.add(OFFLINE_TABLE_NAME_A);
+      includeTables.add(OFFLINE_TABLE_NAME_B);
+      config.setIncludeTables(includeTables);
+      config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
 
-    rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
-    rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+      tenantRebalanceResult = tenantRebalancer.rebalance(config);
 
-    assertNotNull(rebalanceResultA);
-    assertNull(rebalanceResultB);
-    assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+      rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+      rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
 
-    // allow table B
-    config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
-    config.setExcludeTables(Collections.emptySet());
+      assertNotNull(rebalanceResultA);
+      assertNull(rebalanceResultB);
+      assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
 
-    tenantRebalanceResult = tenantRebalancer.rebalance(config);
+      // allow table B
+      config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+      config.setExcludeTables(Collections.emptySet());
 
-    rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
-    rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+      tenantRebalanceResult = tenantRebalancer.rebalance(config);
 
-    assertNull(rebalanceResultA);
-    assertNotNull(rebalanceResultB);
+      rebalanceResultA = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+      rebalanceResultB = 
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
 
-    assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
+      assertNull(rebalanceResultA);
+      assertNotNull(rebalanceResultB);
 
-    // allow table B, also block table B
-    config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
-    config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+      assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
 
-    tenantRebalanceResult = tenantRebalancer.rebalance(config);
+      // allow table B, also block table B
+      config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+      config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
 
-    assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+      tenantRebalanceResult = tenantRebalancer.rebalance(config);
 
-    // allow a non-existing table
-    
config.setIncludeTables(Collections.singleton("TableDoesNotExist_OFFLINE"));
-    config.setExcludeTables(Collections.emptySet());
+      assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
 
-    tenantRebalanceResult = tenantRebalancer.rebalance(config);
+      // allow a non-existing table
+      
config.setIncludeTables(Collections.singleton("TableDoesNotExist_OFFLINE"));
+      config.setExcludeTables(Collections.emptySet());
 
-    assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+      tenantRebalanceResult = tenantRebalancer.rebalance(config);
 
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+      assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
 
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -300,62 +302,67 @@ public class TenantRebalancerTest extends ControllerTest {
     TenantRebalancer tenantRebalancer =
         new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
-    // table A set tenantConfig.tenants.server to tenantName
-    // SHOULD be selected as tenant's table
-    TableConfig tableConfigA = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameA)
-        
.setServerTenant(TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).build();
-    // table B set tenantConfig.tagOverrideConfig.realtimeConsuming to 
tenantName
-    // SHOULD be selected as tenant's table
-    TableConfig tableConfigB = new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableNameB)
-        .setServerTenant(DEFAULT_TENANT_NAME)
-        .setBrokerTenant(DEFAULT_TENANT_NAME)
-        .setTagOverrideConfig(new 
TagOverrideConfig(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME), null))
-        
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
-        .build();
-    // table C set instanceAssignmentConfigMap.OFFLINE.tagPoolConfig.tag to 
tenantName
-    // SHOULD be selected as tenant's table
-    TableConfig tableConfigC = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameC)
-        .setServerTenant(DEFAULT_TENANT_NAME)
-        .setBrokerTenant(DEFAULT_TENANT_NAME)
-        .setInstanceAssignmentConfigMap(
-            Collections.singletonMap("OFFLINE", new InstanceAssignmentConfig(
-                new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
-                new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0, 
false, null), null, true
-            ))).build();
-    // table D set tierConfigList[0].serverTag to tenantName
-    // SHOULD be selected as tenant's table
-    TableConfig tableConfigD = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameD)
-        .setServerTenant(DEFAULT_TENANT_NAME)
-        .setBrokerTenant(DEFAULT_TENANT_NAME)
-        .setTierConfigList(Collections.singletonList(
-            new TierConfig("dummyTier", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
-                TierFactory.PINOT_SERVER_STORAGE_TYPE,
-                TagNameUtils.getOfflineTagForTenant(TENANT_NAME), null, 
null))).build();
-    // table E set to default tenant
-    // SHOULD NOT be selected as tenant's table
-    TableConfig tableConfigE = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameE)
-        
.setServerTenant(DEFAULT_TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).setNumReplicas(NUM_REPLICAS).build();
-    // Create the table
-    _helixResourceManager.addTable(tableConfigA);
-    _helixResourceManager.addTable(tableConfigB);
-    _helixResourceManager.addTable(tableConfigC);
-    _helixResourceManager.addTable(tableConfigD);
-    _helixResourceManager.addTable(tableConfigE);
-    Set<String> tenantTables = tenantRebalancer.getTenantTables(TENANT_NAME);
-    assertEquals(tenantTables.size(), 4);
-    
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameA)));
-    
assertTrue(tenantTables.contains(TableNameBuilder.REALTIME.tableNameWithType(tableNameB)));
-    
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameC)));
-    
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameD)));
-    
assertFalse(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameE)));
-
-    _helixResourceManager.deleteOfflineTable(tableNameA);
-    _helixResourceManager.deleteRealtimeTable(tableNameB);
-    _helixResourceManager.deleteOfflineTable(tableNameC);
-    _helixResourceManager.deleteOfflineTable(tableNameD);
-    _helixResourceManager.deleteOfflineTable(tableNameE);
-    for (int i = 0; i < numServers; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    try {
+      // table A set tenantConfig.tenants.server to tenantName
+      // SHOULD be selected as tenant's table
+      TableConfig tableConfigA = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameA)
+          
.setServerTenant(TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).build();
+      // table B set tenantConfig.tagOverrideConfig.realtimeConsuming to 
tenantName
+      // SHOULD be selected as tenant's table
+      TableConfig tableConfigB = new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableNameB)
+          .setServerTenant(DEFAULT_TENANT_NAME)
+          .setBrokerTenant(DEFAULT_TENANT_NAME)
+          .setTagOverrideConfig(new 
TagOverrideConfig(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME), null))
+          
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+          .build();
+      // table C set instanceAssignmentConfigMap.OFFLINE.tagPoolConfig.tag to 
tenantName
+      // SHOULD be selected as tenant's table
+      TableConfig tableConfigC = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameC)
+          .setServerTenant(DEFAULT_TENANT_NAME)
+          .setBrokerTenant(DEFAULT_TENANT_NAME)
+          .setInstanceAssignmentConfigMap(
+              Collections.singletonMap("OFFLINE", new InstanceAssignmentConfig(
+                  new 
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false, 
0, null), null,
+                  new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0, 
false, null), null, true
+              ))).build();
+      // table D set tierConfigList[0].serverTag to tenantName
+      // SHOULD be selected as tenant's table
+      TableConfig tableConfigD = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameD)
+          .setServerTenant(DEFAULT_TENANT_NAME)
+          .setBrokerTenant(DEFAULT_TENANT_NAME)
+          .setTierConfigList(Collections.singletonList(
+              new TierConfig("dummyTier", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+                  TierFactory.PINOT_SERVER_STORAGE_TYPE,
+                  TagNameUtils.getOfflineTagForTenant(TENANT_NAME), null, 
null))).build();
+      // table E set to default tenant
+      // SHOULD NOT be selected as tenant's table
+      TableConfig tableConfigE = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameE)
+          .setServerTenant(DEFAULT_TENANT_NAME)
+          .setBrokerTenant(DEFAULT_TENANT_NAME)
+          .setNumReplicas(NUM_REPLICAS)
+          .build();
+      // Create the table
+      _helixResourceManager.addTable(tableConfigA);
+      _helixResourceManager.addTable(tableConfigB);
+      _helixResourceManager.addTable(tableConfigC);
+      _helixResourceManager.addTable(tableConfigD);
+      _helixResourceManager.addTable(tableConfigE);
+      Set<String> tenantTables = tenantRebalancer.getTenantTables(TENANT_NAME);
+      assertEquals(tenantTables.size(), 4);
+      
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameA)));
+      
assertTrue(tenantTables.contains(TableNameBuilder.REALTIME.tableNameWithType(tableNameB)));
+      
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameC)));
+      
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameD)));
+      
assertFalse(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameE)));
+    } finally {
+      _helixResourceManager.deleteOfflineTable(tableNameA);
+      _helixResourceManager.deleteRealtimeTable(tableNameB);
+      _helixResourceManager.deleteOfflineTable(tableNameC);
+      _helixResourceManager.deleteOfflineTable(tableNameD);
+      _helixResourceManager.deleteOfflineTable(tableNameE);
+      for (int i = 0; i < numServers; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -388,118 +395,120 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    // rebalance the tables on test tenant, this should be a pure scale out
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-    TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-
-    RebalanceSummaryResult.ServerInfo serverInfo =
-        
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
-    assertEquals(serverInfo.getServersAdded().size(), 3);
-    assertEquals(serverInfo.getServersRemoved().size(), 0);
-
-    Queue<TenantRebalancer.TenantTableRebalanceJobContext> tableQueue =
-        tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
-    // Dimension Table B should be rebalance first since it is a dim table, 
and we're doing scale out
-    TenantRebalancer.TenantTableRebalanceJobContext jobContext = 
tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-    jobContext = tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-
-    // untag server 0, now the rebalance is not a pure scale in/out
-    _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + 0, 
"", false);
-    dryRunResult = tenantRebalancer.rebalance(config);
-
-    serverInfo =
-        
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
-    assertEquals(serverInfo.getServersAdded().size(), 3);
-    assertEquals(serverInfo.getServersRemoved().size(), 1);
-
-    tableQueue = tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
-    // Dimension table B should be rebalance first in this case. (it does not 
matter whether dimension tables are
-    // rebalanced first or last in this case, simply because we defaulted it 
to be first while non-pure scale in/out)
-    jobContext = tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-    jobContext = tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-
-    // untag the added servers, now the rebalance is a pure scale in
-    for (int i = numServers; i < numServers + numServersToAdd; i++) {
-      _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + i, 
"", false);
-    }
-    dryRunResult = tenantRebalancer.rebalance(config);
-
-    serverInfo =
-        
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
-    assertEquals(serverInfo.getServersAdded().size(), 0);
-    assertEquals(serverInfo.getServersRemoved().size(), 1);
-
-    tableQueue = tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
-    // Dimension table B should be rebalance last in this case
-    jobContext = tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-    jobContext = tableQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-
-    // set table B in parallel blacklist, so that it ends up in sequential 
queue, and table A in parallel queue
-    
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
-        Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
-        queues =
-        tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(), null,
-            Collections.singleton(OFFLINE_TABLE_NAME_B));
-    Queue<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue = 
queues.getLeft();
-    Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue = 
queues.getRight();
-    jobContext = parallelQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-    assertNull(parallelQueue.poll());
-    jobContext = sequentialQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-    assertNull(sequentialQueue.poll());
-
-    // set table B in parallel whitelist, so that it ends up in parallel 
queue, and table A in sequential queue
-    queues = tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(),
-        Collections.singleton(OFFLINE_TABLE_NAME_B), null);
-    parallelQueue = queues.getLeft();
-    sequentialQueue = queues.getRight();
-    jobContext = parallelQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-    assertNull(parallelQueue.poll());
-    jobContext = sequentialQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-    assertNull(sequentialQueue.poll());
-
-    // set both tables in parallel whitelist, and table B in parallel 
blacklist, so that B ends up in sequential
-    // queue, and table A in parallel queue
-    queues = tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(),
-        Set.of(OFFLINE_TABLE_NAME_A, OFFLINE_TABLE_NAME_B), 
Collections.singleton(OFFLINE_TABLE_NAME_B));
-    parallelQueue = queues.getLeft();
-    sequentialQueue = queues.getRight();
-    jobContext = parallelQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-    assertNull(parallelQueue.poll());
-    jobContext = sequentialQueue.poll();
-    assertNotNull(jobContext);
-    assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-    assertNull(sequentialQueue.poll());
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    try {
+      // rebalance the tables on test tenant, this should be a pure scale out
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
+      TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+
+      RebalanceSummaryResult.ServerInfo serverInfo =
+          
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+      assertEquals(serverInfo.getServersAdded().size(), 3);
+      assertEquals(serverInfo.getServersRemoved().size(), 0);
+
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> tableQueue =
+          tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
+      // Dimension Table B should be rebalanced first since it is a dim table, 
and we're doing scale out
+      TenantRebalancer.TenantTableRebalanceJobContext jobContext = 
tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+      jobContext = tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+
+      // untag server 0, now the rebalance is not a pure scale in/out
+      _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + 0, 
"", false);
+      dryRunResult = tenantRebalancer.rebalance(config);
+
+      serverInfo =
+          
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+      assertEquals(serverInfo.getServersAdded().size(), 3);
+      assertEquals(serverInfo.getServersRemoved().size(), 1);
+
+      tableQueue = tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
+      // Dimension table B should be rebalanced first in this case. (It does 
not matter whether dimension tables are
+      // rebalanced first or last here; we simply default to rebalancing them 
first when the change is not a pure scale in/out.)
+      jobContext = tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+      jobContext = tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+
+      // untag the added servers, now the rebalance is a pure scale in
+      for (int i = numServers; i < numServers + numServersToAdd; i++) {
+        _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + 
i, "", false);
+      }
+      dryRunResult = tenantRebalancer.rebalance(config);
+
+      serverInfo =
+          
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+      assertEquals(serverInfo.getServersAdded().size(), 0);
+      assertEquals(serverInfo.getServersRemoved().size(), 1);
+
+      tableQueue = tenantRebalancer.createTableQueue(config, 
dryRunResult.getRebalanceTableResults());
+      // Dimension table B should be rebalanced last in this case
+      jobContext = tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+      jobContext = tableQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+
+      // set table B in parallel blacklist, so that it ends up in sequential 
queue, and table A in parallel queue
+      
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
+          Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
+          queues =
+          tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(), null,
+              Collections.singleton(OFFLINE_TABLE_NAME_B));
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue = 
queues.getLeft();
+      Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue = 
queues.getRight();
+      jobContext = parallelQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+      assertNull(parallelQueue.poll());
+      jobContext = sequentialQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+      assertNull(sequentialQueue.poll());
+
+      // set table B in parallel whitelist, so that it ends up in parallel 
queue, and table A in sequential queue
+      queues = tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(),
+          Collections.singleton(OFFLINE_TABLE_NAME_B), null);
+      parallelQueue = queues.getLeft();
+      sequentialQueue = queues.getRight();
+      jobContext = parallelQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+      assertNull(parallelQueue.poll());
+      jobContext = sequentialQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+      assertNull(sequentialQueue.poll());
+
+      // set both tables in parallel whitelist, and table B in parallel 
blacklist, so that B ends up in sequential
+      // queue, and table A in parallel queue
+      queues = tenantRebalancer.createParallelAndSequentialQueues(config, 
dryRunResult.getRebalanceTableResults(),
+          Set.of(OFFLINE_TABLE_NAME_A, OFFLINE_TABLE_NAME_B), 
Collections.singleton(OFFLINE_TABLE_NAME_B));
+      parallelQueue = queues.getLeft();
+      sequentialQueue = queues.getRight();
+      jobContext = parallelQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+      assertNull(parallelQueue.poll());
+      jobContext = sequentialQueue.poll();
+      assertNotNull(jobContext);
+      assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+      assertNull(sequentialQueue.poll());
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -1123,57 +1132,60 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    String jobId = "test-poll-job-123";
-
-    // Create tenant rebalance context with tables in queues
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-
-    TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-    TenantRebalanceContext context = new TenantRebalanceContext(
-        jobId, config, 1,
-        dryRunResult.getRebalanceTableResults().keySet().stream()
-            .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job", 
false))
-            .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
-        new LinkedList<>(),
-        new ConcurrentLinkedQueue<>()
-    );
-
-    TenantRebalanceProgressStats progressStats =
-        new 
TenantRebalanceProgressStats(dryRunResult.getRebalanceTableResults().keySet());
-
-    // Test polling from parallel queue
-    ZkBasedTenantRebalanceObserver observer =
-        new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats, 
context, _helixResourceManager);
-    TenantRebalancer.TenantTableRebalanceJobContext polledJob = 
observer.pollParallel();
-    assertNotNull(polledJob);
-    
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
-
-    // Test polling from sequential queue (should be empty)
-    TenantRebalancer.TenantTableRebalanceJobContext sequentialJob = 
observer.pollSequential();
-    assertNull(sequentialJob);
-
-    // Verify the job was moved to ongoing queue and status was updated
-    Map<String, String> updatedMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
-    assertNotNull(updatedMetadata);
-    TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
-    TenantRebalanceProgressStats updatedStats =
-        
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
-
-    assertNotNull(updatedContext);
-    assertNotNull(updatedStats);
-    assertEquals(updatedContext.getOngoingJobsQueue().size(), 1);
-    
assertEquals(updatedStats.getTableStatusMap().get(polledJob.getTableName()),
-        TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+    try {
+      String jobId = "test-poll-job-123";
+
+      // Create tenant rebalance context with tables in queues
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
+
+      TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+      TenantRebalanceContext context = new TenantRebalanceContext(
+          jobId, config, 1,
+          dryRunResult.getRebalanceTableResults().keySet().stream()
+              .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+                  false))
+              .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+          new LinkedList<>(),
+          new ConcurrentLinkedQueue<>()
+      );
+
+      TenantRebalanceProgressStats progressStats =
+          new 
TenantRebalanceProgressStats(dryRunResult.getRebalanceTableResults().keySet());
+
+      // Test polling from parallel queue
+      ZkBasedTenantRebalanceObserver observer =
+          new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, 
progressStats, context, _helixResourceManager);
+      TenantRebalancer.TenantTableRebalanceJobContext polledJob = 
observer.pollParallel();
+      assertNotNull(polledJob);
+      
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+      // Test polling from sequential queue (should be empty)
+      TenantRebalancer.TenantTableRebalanceJobContext sequentialJob = 
observer.pollSequential();
+      assertNull(sequentialJob);
+
+      // Verify the job was moved to ongoing queue and status was updated
+      Map<String, String> updatedMetadata =
+          _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+      assertNotNull(updatedMetadata);
+      TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+      TenantRebalanceProgressStats updatedStats =
+          
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+      assertNotNull(updatedContext);
+      assertNotNull(updatedStats);
+      assertEquals(updatedContext.getOngoingJobsQueue().size(), 1);
+      
assertEquals(updatedStats.getTableStatusMap().get(polledJob.getTableName()),
+          TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
 
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -1206,78 +1218,81 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    // Create observer and test cancellation
-    String jobId = "test-cancel-job-456";
-
-    // Create tenant rebalance context with tables in queues
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-
-    TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-    Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
-    TenantRebalanceContext context = new TenantRebalanceContext(
-        jobId, config, 1,
-        tableNames.stream()
-            .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job", 
false))
-            .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
-        new LinkedList<>(),
-        new ConcurrentLinkedQueue<>()
-    );
-
-    TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
-
-    // Test cancellation by user
-    ZkBasedTenantRebalanceObserver observer =
-        new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats, 
context, _helixResourceManager);
-
-    // move one job to ongoing to test cancellation from that state
-    TenantRebalancer.TenantTableRebalanceJobContext polledJob = 
observer.pollParallel();
-    assertNotNull(polledJob);
-    
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
-
-    Pair<List<String>, Boolean> cancelResult = observer.cancelJob(true);
-    assertTrue(cancelResult.getRight()); // cancellation was successful
-    assertTrue(cancelResult.getLeft()
-        .isEmpty()); // no jobs were cancelled (the polled job hasn't started 
its table rebalance job yet thus won't
-    // show in the cancelled list)
-
-    // Verify that queues are emptied and status is updated
-    Map<String, String> updatedMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
-    assertNotNull(updatedMetadata);
-    TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
-    TenantRebalanceProgressStats updatedStats =
-        
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
-
-    assertNotNull(updatedContext);
-    assertNotNull(updatedStats);
-    assertTrue(updatedContext.getParallelQueue().isEmpty());
-    assertTrue(updatedContext.getSequentialQueue().isEmpty());
-    assertTrue(updatedContext.getOngoingJobsQueue().isEmpty());
-    assertEquals(updatedStats.getRemainingTables(), 0);
-    assertEquals(updatedStats.getCompletionStatusMsg(), "Tenant rebalance job 
has been cancelled.");
-    assertTrue(updatedStats.getTimeToFinishInSeconds() >= 0);
-
-    // Verify all tables are marked as not scheduled
-    for (String tableName : tableNames) {
-      if (tableName.equals(polledJob.getTableName())) {
-        // the polled job was in ongoing, so should be marked as CANCELLED
-        assertEquals(updatedStats.getTableStatusMap().get(tableName),
-            TenantRebalanceProgressStats.TableStatus.CANCELLED.name());
-      } else {
-        assertEquals(updatedStats.getTableStatusMap().get(tableName),
-            TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+    try {
+      // Create observer and test cancellation
+      String jobId = "test-cancel-job-456";
+
+      // Create tenant rebalance context with tables in queues
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
+
+      TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+      Set<String> tableNames = 
dryRunResult.getRebalanceTableResults().keySet();
+
+      TenantRebalanceContext context = new TenantRebalanceContext(
+          jobId, config, 1,
+          tableNames.stream()
+              .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+                  false))
+              .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+          new LinkedList<>(),
+          new ConcurrentLinkedQueue<>()
+      );
+
+      TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
+
+      // Test cancellation by user
+      ZkBasedTenantRebalanceObserver observer =
+          new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, 
progressStats, context, _helixResourceManager);
+
+      // move one job to ongoing to test cancellation from that state
+      TenantRebalancer.TenantTableRebalanceJobContext polledJob = 
observer.pollParallel();
+      assertNotNull(polledJob);
+      
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+      Pair<List<String>, Boolean> cancelResult = observer.cancelJob(true);
+      assertTrue(cancelResult.getRight()); // cancellation was successful
+      assertTrue(cancelResult.getLeft()
+          .isEmpty()); // no jobs were cancelled (the polled job hasn't 
started its table rebalance job yet thus won't
+      // show in the cancelled list)
+
+      // Verify that queues are emptied and status is updated
+      Map<String, String> updatedMetadata =
+          _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+      assertNotNull(updatedMetadata);
+      TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+      TenantRebalanceProgressStats updatedStats =
+          
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+      assertNotNull(updatedContext);
+      assertNotNull(updatedStats);
+      assertTrue(updatedContext.getParallelQueue().isEmpty());
+      assertTrue(updatedContext.getSequentialQueue().isEmpty());
+      assertTrue(updatedContext.getOngoingJobsQueue().isEmpty());
+      assertEquals(updatedStats.getRemainingTables(), 0);
+      assertEquals(updatedStats.getCompletionStatusMsg(), "Tenant rebalance 
job has been cancelled.");
+      assertTrue(updatedStats.getTimeToFinishInSeconds() >= 0);
+
+      // Verify the polled table is marked CANCELLED and all other tables NOT_SCHEDULED
+      for (String tableName : tableNames) {
+        if (tableName.equals(polledJob.getTableName())) {
+          // the polled job was in ongoing, so should be marked as CANCELLED
+          assertEquals(updatedStats.getTableStatusMap().get(tableName),
+              TenantRebalanceProgressStats.TableStatus.CANCELLED.name());
+        } else {
+          assertEquals(updatedStats.getTableStatusMap().get(tableName),
+              TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+        }
       }
-    }
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
 
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -1310,66 +1325,71 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    // Create observer and test table job completion
-    String jobId = "test-table-done-job-789";
-
-    // Create tenant rebalance context with tables in ongoing queue
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-
-    TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-    Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
-    TenantRebalanceContext context = new TenantRebalanceContext(
-        jobId, config, 1,
-        new ConcurrentLinkedDeque<>(),
-        new LinkedList<>(),
-        tableNames.stream()
-            .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job", 
false))
-            .collect(Collectors.toCollection(ConcurrentLinkedQueue::new))
-    );
-
-    TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
-    // Set initial status to REBALANCING for the tables
-    for (String tableName : tableNames) {
-      progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
-    }
-
-    // Test onTableJobDone
-    ZkBasedTenantRebalanceObserver observer =
-        new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats, 
context, _helixResourceManager);
-    TenantRebalancer.TenantTableRebalanceJobContext jobContextA =
-        new 
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_A, 
OFFLINE_TABLE_NAME_A + "_job", false);
-    observer.onTableJobDone(jobContextA);
-    TenantRebalancer.TenantTableRebalanceJobContext jobContextB =
-        new 
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_B, 
OFFLINE_TABLE_NAME_B + "_job", false);
-    String errorMessage = "Test error message";
-    observer.onTableJobError(jobContextB, errorMessage);
-
-    // Verify that the job was removed from ongoing queue and status was 
updated
-    Map<String, String> updatedMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
-    assertNotNull(updatedMetadata);
-    TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
-    TenantRebalanceProgressStats updatedStats =
-        
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+    try {
+      // Create observer and test table job completion
+      String jobId = "test-table-done-job-789";
+
+      // Create tenant rebalance context with tables in ongoing queue
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
+
+      TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+      Set<String> tableNames = 
dryRunResult.getRebalanceTableResults().keySet();
+
+      TenantRebalanceContext context = new TenantRebalanceContext(
+          jobId, config, 1,
+          new ConcurrentLinkedDeque<>(),
+          new LinkedList<>(),
+          tableNames.stream()
+              .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+                  false))
+              .collect(Collectors.toCollection(ConcurrentLinkedQueue::new))
+      );
+
+      TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
+      // Set initial status to REBALANCING for the tables
+      for (String tableName : tableNames) {
+        progressStats.updateTableStatus(tableName, 
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+      }
 
-    assertNotNull(updatedContext);
-    assertNotNull(updatedStats);
-    assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextA));
-    assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextB));
-    assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
-        TenantRebalanceProgressStats.TableStatus.DONE.name());
-    assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_B), 
errorMessage);
-    assertEquals(updatedStats.getRemainingTables(), tableNames.size() - 2);
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      // Test onTableJobDone and onTableJobError
+      ZkBasedTenantRebalanceObserver observer =
+          new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, 
progressStats, context, _helixResourceManager);
+      TenantRebalancer.TenantTableRebalanceJobContext jobContextA =
+          new 
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_A, 
OFFLINE_TABLE_NAME_A + "_job",
+              false);
+      observer.onTableJobDone(jobContextA);
+      TenantRebalancer.TenantTableRebalanceJobContext jobContextB =
+          new 
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_B, 
OFFLINE_TABLE_NAME_B + "_job",
+              false);
+      String errorMessage = "Test error message";
+      observer.onTableJobError(jobContextB, errorMessage);
+
+      // Verify that the job was removed from ongoing queue and status was 
updated
+      Map<String, String> updatedMetadata =
+          _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+      assertNotNull(updatedMetadata);
+      TenantRebalanceContext updatedContext = 
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+      TenantRebalanceProgressStats updatedStats =
+          
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+      assertNotNull(updatedContext);
+      assertNotNull(updatedStats);
+      assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextA));
+      assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextB));
+      assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+          TenantRebalanceProgressStats.TableStatus.DONE.name());
+      assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_B), 
errorMessage);
+      assertEquals(updatedStats.getRemainingTables(), tableNames.size() - 2);
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -1402,59 +1422,62 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    // Create observer and test lifecycle methods
-    String jobId = "test-lifecycle-job-202";
-
-    // Create tenant rebalance context
-    TenantRebalanceConfig config = new TenantRebalanceConfig();
-    config.setTenantName(TENANT_NAME);
-    config.setVerboseResult(true);
-    config.setDryRun(true);
-
-    TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-    Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
-    TenantRebalanceContext context = new TenantRebalanceContext(
-        jobId, config, 1,
-        tableNames.stream()
-            .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job", 
false))
-            .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
-        new LinkedList<>(),
-        new ConcurrentLinkedQueue<>()
-    );
-
-    TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
-
-    // Test onStart
-    ZkBasedTenantRebalanceObserver observer =
-        new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats, 
context, _helixResourceManager);
-    observer.onStart();
-    Map<String, String> startMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
-    assertNotNull(startMetadata);
-    TenantRebalanceProgressStats startStats =
-        
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(startMetadata);
-    assertNotNull(startStats);
-    assertTrue(startStats.getStartTimeMs() > 0);
-
-    // Test onSuccess
-    String successMessage = "Tenant rebalance completed successfully";
-    observer.onSuccess(successMessage);
-    Map<String, String> successMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
-    assertNotNull(successMetadata);
-    TenantRebalanceProgressStats successStats =
-        
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(successMetadata);
-    assertNotNull(successStats);
-    assertEquals(successStats.getCompletionStatusMsg(), successMessage);
-    assertTrue(successStats.getTimeToFinishInSeconds() >= 0);
-    assertTrue(observer.isDone());
-
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
-    _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+    try {
+      // Create observer and test lifecycle methods
+      String jobId = "test-lifecycle-job-202";
+
+      // Create tenant rebalance context
+      TenantRebalanceConfig config = new TenantRebalanceConfig();
+      config.setTenantName(TENANT_NAME);
+      config.setVerboseResult(true);
+      config.setDryRun(true);
+
+      TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+      Set<String> tableNames = 
dryRunResult.getRebalanceTableResults().keySet();
+
+      TenantRebalanceContext context = new TenantRebalanceContext(
+          jobId, config, 1,
+          tableNames.stream()
+              .map(tableName -> new 
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+                  false))
+              .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+          new LinkedList<>(),
+          new ConcurrentLinkedQueue<>()
+      );
+
+      TenantRebalanceProgressStats progressStats = new 
TenantRebalanceProgressStats(tableNames);
+
+      // Test onStart
+      ZkBasedTenantRebalanceObserver observer =
+          new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, 
progressStats, context, _helixResourceManager);
+      observer.onStart();
+      Map<String, String> startMetadata =
+          _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+      assertNotNull(startMetadata);
+      TenantRebalanceProgressStats startStats =
+          
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(startMetadata);
+      assertNotNull(startStats);
+      assertTrue(startStats.getStartTimeMs() > 0);
+
+      // Test onSuccess
+      String successMessage = "Tenant rebalance completed successfully";
+      observer.onSuccess(successMessage);
+      Map<String, String> successMetadata =
+          _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
+      assertNotNull(successMetadata);
+      TenantRebalanceProgressStats successStats =
+          
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(successMetadata);
+      assertNotNull(successStats);
+      assertEquals(successStats.getCompletionStatusMsg(), successMessage);
+      assertTrue(successStats.getTimeToFinishInSeconds() >= 0);
+      assertTrue(observer.isDone());
+    } finally {
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+      _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
@@ -1484,27 +1507,27 @@ public class TenantRebalancerTest extends 
ControllerTest {
     }
     addTenantTagToInstances(TENANT_NAME);
 
-    for (int i = 0; i < 3; i++) {
-      runZkBasedTenantRebalanceObserverConcurrentPoll();
-    }
-
-    // Clean up tables
-    for (String tableName : tableNames) {
-      _helixResourceManager.deleteOfflineTable(tableName);
-    }
+    try {
+      for (int i = 0; i < 3; i++) {
+        runZkBasedTenantRebalanceObserverConcurrentPoll("concurrent-poll-job-" 
+ i);
+      }
+    } finally {
+      // Clean up tables
+      for (String tableName : tableNames) {
+        _helixResourceManager.deleteOfflineTable(tableName);
+      }
 
-    for (int i = 0; i < numServers + numServersToAdd; i++) {
-      stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      for (int i = 0; i < numServers + numServersToAdd; i++) {
+        stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+      }
     }
   }
 
-  private void runZkBasedTenantRebalanceObserverConcurrentPoll()
+  private void runZkBasedTenantRebalanceObserverConcurrentPoll(String jobId)
       throws Exception {
     TenantRebalancer tenantRebalancer =
         new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, 
_executorService);
 
-    String jobId = "test-concurrent-poll-job-303";
-
     // Create tenant rebalance context with multiple tables
     TenantRebalanceConfig config = new TenantRebalanceConfig();
     config.setTenantName(TENANT_NAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to