This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 17dfafed0d4 Fix flaky TenantRebalancerTest (#16956)
17dfafed0d4 is described below
commit 17dfafed0d4d169773321b64c81ac172dc459cd2
Author: Jhow <[email protected]>
AuthorDate: Mon Oct 13 14:50:18 2025 -0700
Fix flaky TenantRebalancerTest (#16956)
---
.../tenant/ZkBasedTenantRebalanceObserver.java | 26 +-
.../rebalance/tenant/TenantRebalancerTest.java | 1043 ++++++++++----------
2 files changed, 550 insertions(+), 519 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index e1237238065..b1a54cbf59b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -46,16 +47,16 @@ import org.slf4j.LoggerFactory;
public class ZkBasedTenantRebalanceObserver {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
- public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 3;
private static final int MIN_ZK_UPDATE_RETRY_DELAY_MS = 100;
private static final int MAX_ZK_UPDATE_RETRY_DELAY_MS = 200;
+ public static final int DEFAULT_ZK_UPDATE_MAX_RETRIES = 5;
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final String _jobId;
private final String _tenantName;
// Keep track of number of updates. Useful during debugging.
private final AtomicInteger _numUpdatesToZk;
- private final int _zkUpdateMaxRetries;
+ private final RetryPolicy _retryPolicy;
private boolean _isDone;
private ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
@@ -64,16 +65,24 @@ public class ZkBasedTenantRebalanceObserver {
_jobId = jobId;
_tenantName = tenantName;
_pinotHelixResourceManager = pinotHelixResourceManager;
- _zkUpdateMaxRetries = zkUpdateMaxRetries;
_numUpdatesToZk = new AtomicInteger(0);
+ _retryPolicy = RetryPolicies.randomDelayRetryPolicy(zkUpdateMaxRetries,
MIN_ZK_UPDATE_RETRY_DELAY_MS,
+ MAX_ZK_UPDATE_RETRY_DELAY_MS);
}
private ZkBasedTenantRebalanceObserver(String jobId, String tenantName,
TenantRebalanceProgressStats progressStats,
TenantRebalanceContext tenantRebalanceContext, PinotHelixResourceManager
pinotHelixResourceManager,
int zkUpdateMaxRetries) {
this(jobId, tenantName, pinotHelixResourceManager, zkUpdateMaxRetries);
- _pinotHelixResourceManager.addControllerJobToZK(_jobId,
makeJobMetadata(tenantRebalanceContext, progressStats),
- ControllerJobTypes.TENANT_REBALANCE);
+ try {
+ _retryPolicy.attempt(() ->
_pinotHelixResourceManager.addControllerJobToZK(_jobId,
+ makeJobMetadata(tenantRebalanceContext, progressStats),
+ ControllerJobTypes.TENANT_REBALANCE, Objects::isNull)
+ );
+ } catch (AttemptFailureException e) {
+ LOGGER.error("Error creating initial job metadata in ZK for jobId: {}
for tenant rebalance", _jobId, e);
+ throw new RuntimeException("Error creating initial job metadata in ZK
for jobId: " + _jobId, e);
+ }
_numUpdatesToZk.incrementAndGet();
}
@@ -255,12 +264,11 @@ public class ZkBasedTenantRebalanceObserver {
private void updateTenantRebalanceJobMetadataInZk(
BiConsumer<TenantRebalanceContext, TenantRebalanceProgressStats> updater)
throws AttemptFailureException {
- RetryPolicy retry =
RetryPolicies.randomDelayRetryPolicy(_zkUpdateMaxRetries,
MIN_ZK_UPDATE_RETRY_DELAY_MS,
- MAX_ZK_UPDATE_RETRY_DELAY_MS);
- retry.attempt(() -> {
+ _retryPolicy.attempt(() -> {
Map<String, String> jobMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(_jobId,
ControllerJobTypes.TENANT_REBALANCE);
if (jobMetadata == null) {
+ LOGGER.warn("Skip updating ZK since job metadata is not present in ZK
for jobId: {}", _jobId);
return false;
}
TenantRebalanceContext originalContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(jobMetadata);
@@ -297,7 +305,7 @@ public class ZkBasedTenantRebalanceObserver {
if (updateSuccessful) {
return true;
} else {
- LOGGER.info(
+ LOGGER.warn(
"Tenant rebalance context or progress stats is out of sync with ZK
while polling, fetching the latest "
+ "context and progress stats from ZK and retry. jobId: {}",
_jobId);
return false;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 92f8dc729bb..c926014ef7c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -120,44 +120,44 @@ public class TenantRebalancerTest extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
(numServers + i), true);
}
- Map<String, Map<String, String>> oldSegmentAssignment =
-
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
-
- // rebalance the tables on test tenant
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- TenantRebalanceResult result = tenantRebalancer.rebalance(config);
- RebalanceResult rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
- Map<String, Map<String, String>> rebalancedAssignment =
rebalanceResult.getSegmentAssignment();
- // assignment should not change, with a NO_OP status as no now server is
added to test tenant
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
- assertEquals(oldSegmentAssignment, rebalancedAssignment);
-
- // rebalance the tables on default tenant
- config.setTenantName(DEFAULT_TENANT_NAME);
- result = tenantRebalancer.rebalance(config);
- // rebalancing default tenant should distribute the segment of table A
over 6 servers
- rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
- InstancePartitions partitions =
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
- assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 6);
-
- // ensure the ideal state and external view converges
- assertTrue(waitForCompletion(result.getJobId()));
- TenantRebalanceProgressStats progressStats =
getProgress(result.getJobId());
-
assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
- assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
- TenantRebalanceProgressStats.TableStatus.DONE.name());
- Map<String, Map<String, String>> idealState =
-
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
- Map<String, Map<String, String>> externalView =
-
_helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
- assertEquals(idealState, externalView);
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ try {
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
+
+ // rebalance the tables on test tenant
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ TenantRebalanceResult result = tenantRebalancer.rebalance(config);
+ RebalanceResult rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ Map<String, Map<String, String>> rebalancedAssignment =
rebalanceResult.getSegmentAssignment();
+ // assignment should not change, with a NO_OP status as no new server is
added to test tenant
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertEquals(oldSegmentAssignment, rebalancedAssignment);
+ // rebalance the tables on default tenant
+ config.setTenantName(DEFAULT_TENANT_NAME);
+ result = tenantRebalancer.rebalance(config);
+ // rebalancing default tenant should distribute the segment of table A
over 6 servers
+ rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ InstancePartitions partitions =
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
+ assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(),
6);
+ // ensure the ideal state and external view converges
+ assertTrue(waitForCompletion(result.getJobId()));
+ TenantRebalanceProgressStats progressStats =
getProgress(result.getJobId());
+
assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
+ assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+ TenantRebalanceProgressStats.TableStatus.DONE.name());
+ Map<String, Map<String, String>> idealState =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+ Map<String, Map<String, String>> externalView =
+
_helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+ assertEquals(idealState, externalView);
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -191,90 +191,92 @@ public class TenantRebalancerTest extends ControllerTest {
addTenantTagToInstances(TENANT_NAME);
- // rebalance the tables on test tenant
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
-
- // leave allow and block tables empty
- config.setIncludeTables(Collections.emptySet());
- config.setExcludeTables(Collections.emptySet());
+ try {
+ // rebalance the tables on test tenant
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
- TenantRebalanceResult tenantRebalanceResult =
tenantRebalancer.rebalance(config);
+ // leave allow and block tables empty
+ config.setIncludeTables(Collections.emptySet());
+ config.setExcludeTables(Collections.emptySet());
- RebalanceResult rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
- RebalanceResult rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ TenantRebalanceResult tenantRebalanceResult =
tenantRebalancer.rebalance(config);
- assertNotNull(rebalanceResultA);
- assertNotNull(rebalanceResultB);
+ RebalanceResult rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ RebalanceResult rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
- assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
- assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
+ assertNotNull(rebalanceResultA);
+ assertNotNull(rebalanceResultB);
- // block table B
- config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+ assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+ assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
- tenantRebalanceResult = tenantRebalancer.rebalance(config);
+ // block table B
+ config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
- rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
- rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ tenantRebalanceResult = tenantRebalancer.rebalance(config);
- assertNotNull(rebalanceResultA);
- assertNull(rebalanceResultB);
- assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
- // allow all tables explicitly, block table B, this should result the same
as above case
- Set<String> includeTables = new HashSet<>();
- includeTables.add(OFFLINE_TABLE_NAME_A);
- includeTables.add(OFFLINE_TABLE_NAME_B);
- config.setIncludeTables(includeTables);
- config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+ assertNotNull(rebalanceResultA);
+ assertNull(rebalanceResultB);
+ assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
- tenantRebalanceResult = tenantRebalancer.rebalance(config);
+ // allow all tables explicitly, block table B, this should produce the
same result as the above case
+ Set<String> includeTables = new HashSet<>();
+ includeTables.add(OFFLINE_TABLE_NAME_A);
+ includeTables.add(OFFLINE_TABLE_NAME_B);
+ config.setIncludeTables(includeTables);
+ config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
- rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
- rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ tenantRebalanceResult = tenantRebalancer.rebalance(config);
- assertNotNull(rebalanceResultA);
- assertNull(rebalanceResultB);
- assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
- // allow table B
- config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
- config.setExcludeTables(Collections.emptySet());
+ assertNotNull(rebalanceResultA);
+ assertNull(rebalanceResultB);
+ assertEquals(rebalanceResultA.getStatus(), RebalanceResult.Status.DONE);
- tenantRebalanceResult = tenantRebalancer.rebalance(config);
+ // allow table B
+ config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+ config.setExcludeTables(Collections.emptySet());
- rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
- rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+ tenantRebalanceResult = tenantRebalancer.rebalance(config);
- assertNull(rebalanceResultA);
- assertNotNull(rebalanceResultB);
+ rebalanceResultA =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+ rebalanceResultB =
tenantRebalanceResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
- assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
+ assertNull(rebalanceResultA);
+ assertNotNull(rebalanceResultB);
- // allow table B, also block table B
- config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
- config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+ assertEquals(rebalanceResultB.getStatus(), RebalanceResult.Status.DONE);
- tenantRebalanceResult = tenantRebalancer.rebalance(config);
+ // allow table B, also block table B
+ config.setIncludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
+ config.setExcludeTables(Collections.singleton(OFFLINE_TABLE_NAME_B));
- assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+ tenantRebalanceResult = tenantRebalancer.rebalance(config);
- // allow a non-existing table
-
config.setIncludeTables(Collections.singleton("TableDoesNotExist_OFFLINE"));
- config.setExcludeTables(Collections.emptySet());
+ assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
- tenantRebalanceResult = tenantRebalancer.rebalance(config);
+ // allow a non-existing table
+
config.setIncludeTables(Collections.singleton("TableDoesNotExist_OFFLINE"));
+ config.setExcludeTables(Collections.emptySet());
- assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+ tenantRebalanceResult = tenantRebalancer.rebalance(config);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+ assertTrue(tenantRebalanceResult.getRebalanceTableResults().isEmpty());
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -300,62 +302,67 @@ public class TenantRebalancerTest extends ControllerTest {
TenantRebalancer tenantRebalancer =
new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
- // table A set tenantConfig.tenants.server to tenantName
- // SHOULD be selected as tenant's table
- TableConfig tableConfigA = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameA)
-
.setServerTenant(TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).build();
- // table B set tenantConfig.tagOverrideConfig.realtimeConsuming to
tenantName
- // SHOULD be selected as tenant's table
- TableConfig tableConfigB = new
TableConfigBuilder(TableType.REALTIME).setTableName(tableNameB)
- .setServerTenant(DEFAULT_TENANT_NAME)
- .setBrokerTenant(DEFAULT_TENANT_NAME)
- .setTagOverrideConfig(new
TagOverrideConfig(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME), null))
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .build();
- // table C set instanceAssignmentConfigMap.OFFLINE.tagPoolConfig.tag to
tenantName
- // SHOULD be selected as tenant's table
- TableConfig tableConfigC = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameC)
- .setServerTenant(DEFAULT_TENANT_NAME)
- .setBrokerTenant(DEFAULT_TENANT_NAME)
- .setInstanceAssignmentConfigMap(
- Collections.singletonMap("OFFLINE", new InstanceAssignmentConfig(
- new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
- new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0,
false, null), null, true
- ))).build();
- // table D set tierConfigList[0].serverTag to tenantName
- // SHOULD be selected as tenant's table
- TableConfig tableConfigD = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameD)
- .setServerTenant(DEFAULT_TENANT_NAME)
- .setBrokerTenant(DEFAULT_TENANT_NAME)
- .setTierConfigList(Collections.singletonList(
- new TierConfig("dummyTier",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
- TierFactory.PINOT_SERVER_STORAGE_TYPE,
- TagNameUtils.getOfflineTagForTenant(TENANT_NAME), null,
null))).build();
- // table E set to default tenant
- // SHOULD NOT be selected as tenant's table
- TableConfig tableConfigE = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameE)
-
.setServerTenant(DEFAULT_TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).setNumReplicas(NUM_REPLICAS).build();
- // Create the table
- _helixResourceManager.addTable(tableConfigA);
- _helixResourceManager.addTable(tableConfigB);
- _helixResourceManager.addTable(tableConfigC);
- _helixResourceManager.addTable(tableConfigD);
- _helixResourceManager.addTable(tableConfigE);
- Set<String> tenantTables = tenantRebalancer.getTenantTables(TENANT_NAME);
- assertEquals(tenantTables.size(), 4);
-
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameA)));
-
assertTrue(tenantTables.contains(TableNameBuilder.REALTIME.tableNameWithType(tableNameB)));
-
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameC)));
-
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameD)));
-
assertFalse(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameE)));
-
- _helixResourceManager.deleteOfflineTable(tableNameA);
- _helixResourceManager.deleteRealtimeTable(tableNameB);
- _helixResourceManager.deleteOfflineTable(tableNameC);
- _helixResourceManager.deleteOfflineTable(tableNameD);
- _helixResourceManager.deleteOfflineTable(tableNameE);
- for (int i = 0; i < numServers; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ try {
+ // table A set tenantConfig.tenants.server to tenantName
+ // SHOULD be selected as tenant's table
+ TableConfig tableConfigA = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameA)
+
.setServerTenant(TENANT_NAME).setBrokerTenant(DEFAULT_TENANT_NAME).build();
+ // table B set tenantConfig.tagOverrideConfig.realtimeConsuming to
tenantName
+ // SHOULD be selected as tenant's table
+ TableConfig tableConfigB = new
TableConfigBuilder(TableType.REALTIME).setTableName(tableNameB)
+ .setServerTenant(DEFAULT_TENANT_NAME)
+ .setBrokerTenant(DEFAULT_TENANT_NAME)
+ .setTagOverrideConfig(new
TagOverrideConfig(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME), null))
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
+ // table C set instanceAssignmentConfigMap.OFFLINE.tagPoolConfig.tag to
tenantName
+ // SHOULD be selected as tenant's table
+ TableConfig tableConfigC = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameC)
+ .setServerTenant(DEFAULT_TENANT_NAME)
+ .setBrokerTenant(DEFAULT_TENANT_NAME)
+ .setInstanceAssignmentConfigMap(
+ Collections.singletonMap("OFFLINE", new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(TENANT_NAME), false,
0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0,
false, null), null, true
+ ))).build();
+ // table D set tierConfigList[0].serverTag to tenantName
+ // SHOULD be selected as tenant's table
+ TableConfig tableConfigD = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameD)
+ .setServerTenant(DEFAULT_TENANT_NAME)
+ .setBrokerTenant(DEFAULT_TENANT_NAME)
+ .setTierConfigList(Collections.singletonList(
+ new TierConfig("dummyTier",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE,
+ TagNameUtils.getOfflineTagForTenant(TENANT_NAME), null,
null))).build();
+ // table E set to default tenant
+ // SHOULD NOT be selected as tenant's table
+ TableConfig tableConfigE = new
TableConfigBuilder(TableType.OFFLINE).setTableName(tableNameE)
+ .setServerTenant(DEFAULT_TENANT_NAME)
+ .setBrokerTenant(DEFAULT_TENANT_NAME)
+ .setNumReplicas(NUM_REPLICAS)
+ .build();
+ // Create the table
+ _helixResourceManager.addTable(tableConfigA);
+ _helixResourceManager.addTable(tableConfigB);
+ _helixResourceManager.addTable(tableConfigC);
+ _helixResourceManager.addTable(tableConfigD);
+ _helixResourceManager.addTable(tableConfigE);
+ Set<String> tenantTables = tenantRebalancer.getTenantTables(TENANT_NAME);
+ assertEquals(tenantTables.size(), 4);
+
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameA)));
+
assertTrue(tenantTables.contains(TableNameBuilder.REALTIME.tableNameWithType(tableNameB)));
+
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameC)));
+
assertTrue(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameD)));
+
assertFalse(tenantTables.contains(TableNameBuilder.OFFLINE.tableNameWithType(tableNameE)));
+ } finally {
+ _helixResourceManager.deleteOfflineTable(tableNameA);
+ _helixResourceManager.deleteRealtimeTable(tableNameB);
+ _helixResourceManager.deleteOfflineTable(tableNameC);
+ _helixResourceManager.deleteOfflineTable(tableNameD);
+ _helixResourceManager.deleteOfflineTable(tableNameE);
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -388,118 +395,120 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- // rebalance the tables on test tenant, this should be a pure scale out
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
- TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
-
- RebalanceSummaryResult.ServerInfo serverInfo =
-
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
- assertEquals(serverInfo.getServersAdded().size(), 3);
- assertEquals(serverInfo.getServersRemoved().size(), 0);
-
- Queue<TenantRebalancer.TenantTableRebalanceJobContext> tableQueue =
- tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
- // Dimension Table B should be rebalance first since it is a dim table,
and we're doing scale out
- TenantRebalancer.TenantTableRebalanceJobContext jobContext =
tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
- jobContext = tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-
- // untag server 0, now the rebalance is not a pure scale in/out
- _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + 0,
"", false);
- dryRunResult = tenantRebalancer.rebalance(config);
-
- serverInfo =
-
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
- assertEquals(serverInfo.getServersAdded().size(), 3);
- assertEquals(serverInfo.getServersRemoved().size(), 1);
-
- tableQueue = tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
- // Dimension table B should be rebalance first in this case. (it does not
matter whether dimension tables are
- // rebalanced first or last in this case, simply because we defaulted it
to be first while non-pure scale in/out)
- jobContext = tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
- jobContext = tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
-
- // untag the added servers, now the rebalance is a pure scale in
- for (int i = numServers; i < numServers + numServersToAdd; i++) {
- _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + i,
"", false);
- }
- dryRunResult = tenantRebalancer.rebalance(config);
-
- serverInfo =
-
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
- assertEquals(serverInfo.getServersAdded().size(), 0);
- assertEquals(serverInfo.getServersRemoved().size(), 1);
-
- tableQueue = tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
- // Dimension table B should be rebalance last in this case
- jobContext = tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
- jobContext = tableQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
-
- // set table B in parallel blacklist, so that it ends up in sequential
queue, and table A in parallel queue
-
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
- Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
- queues =
- tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(), null,
- Collections.singleton(OFFLINE_TABLE_NAME_B));
- Queue<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue =
queues.getLeft();
- Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue =
queues.getRight();
- jobContext = parallelQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
- assertNull(parallelQueue.poll());
- jobContext = sequentialQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
- assertNull(sequentialQueue.poll());
-
- // set table B in parallel whitelist, so that it ends up in parallel
queue, and table A in sequential queue
- queues = tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(),
- Collections.singleton(OFFLINE_TABLE_NAME_B), null);
- parallelQueue = queues.getLeft();
- sequentialQueue = queues.getRight();
- jobContext = parallelQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
- assertNull(parallelQueue.poll());
- jobContext = sequentialQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
- assertNull(sequentialQueue.poll());
-
- // set both tables in parallel whitelist, and table B in parallel
blacklist, so that B ends up in sequential
- // queue, and table A in parallel queue
- queues = tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(),
- Set.of(OFFLINE_TABLE_NAME_A, OFFLINE_TABLE_NAME_B),
Collections.singleton(OFFLINE_TABLE_NAME_B));
- parallelQueue = queues.getLeft();
- sequentialQueue = queues.getRight();
- jobContext = parallelQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
- assertNull(parallelQueue.poll());
- jobContext = sequentialQueue.poll();
- assertNotNull(jobContext);
- assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
- assertNull(sequentialQueue.poll());
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ try {
+ // rebalance the tables on test tenant, this should be a pure scale out
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+
+ RebalanceSummaryResult.ServerInfo serverInfo =
+
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+ assertEquals(serverInfo.getServersAdded().size(), 3);
+ assertEquals(serverInfo.getServersRemoved().size(), 0);
+
+ Queue<TenantRebalancer.TenantTableRebalanceJobContext> tableQueue =
+ tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
+ // Dimension Table B should be rebalanced first since it is a dim table,
and we're doing scale out
+ TenantRebalancer.TenantTableRebalanceJobContext jobContext =
tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+ jobContext = tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+
+ // untag server 0, now the rebalance is not a pure scale in/out
+ _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX + 0,
"", false);
+ dryRunResult = tenantRebalancer.rebalance(config);
+
+ serverInfo =
+
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+ assertEquals(serverInfo.getServersAdded().size(), 3);
+ assertEquals(serverInfo.getServersRemoved().size(), 1);
+
+ tableQueue = tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
+ // Dimension table B should be rebalance first in this case. (it does
not matter whether dimension tables are
+ // rebalanced first or last in this case, simply because we defaulted it
to be first while non-pure scale in/out)
+ jobContext = tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+ jobContext = tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+
+ // untag the added servers, now the rebalance is a pure scale in
+ for (int i = numServers; i < numServers + numServersToAdd; i++) {
+ _helixResourceManager.updateInstanceTags(SERVER_INSTANCE_ID_PREFIX +
i, "", false);
+ }
+ dryRunResult = tenantRebalancer.rebalance(config);
+
+ serverInfo =
+
dryRunResult.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B).getRebalanceSummaryResult().getServerInfo();
+ assertEquals(serverInfo.getServersAdded().size(), 0);
+ assertEquals(serverInfo.getServersRemoved().size(), 1);
+
+ tableQueue = tenantRebalancer.createTableQueue(config,
dryRunResult.getRebalanceTableResults());
+ // Dimension table B should be rebalance last in this case
+ jobContext = tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+ jobContext = tableQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+
+ // set table B in parallel blacklist, so that it ends up in sequential
queue, and table A in parallel queue
+
Pair<ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext>,
+ Queue<TenantRebalancer.TenantTableRebalanceJobContext>>
+ queues =
+ tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(), null,
+ Collections.singleton(OFFLINE_TABLE_NAME_B));
+ Queue<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue =
queues.getLeft();
+ Queue<TenantRebalancer.TenantTableRebalanceJobContext> sequentialQueue =
queues.getRight();
+ jobContext = parallelQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+ assertNull(parallelQueue.poll());
+ jobContext = sequentialQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+ assertNull(sequentialQueue.poll());
+
+ // set table B in parallel whitelist, so that it ends up in parallel
queue, and table A in sequential queue
+ queues = tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(),
+ Collections.singleton(OFFLINE_TABLE_NAME_B), null);
+ parallelQueue = queues.getLeft();
+ sequentialQueue = queues.getRight();
+ jobContext = parallelQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+ assertNull(parallelQueue.poll());
+ jobContext = sequentialQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+ assertNull(sequentialQueue.poll());
+
+ // set both tables in parallel whitelist, and table B in parallel
blacklist, so that B ends up in sequential
+ // queue, and table A in parallel queue
+ queues = tenantRebalancer.createParallelAndSequentialQueues(config,
dryRunResult.getRebalanceTableResults(),
+ Set.of(OFFLINE_TABLE_NAME_A, OFFLINE_TABLE_NAME_B),
Collections.singleton(OFFLINE_TABLE_NAME_B));
+ parallelQueue = queues.getLeft();
+ sequentialQueue = queues.getRight();
+ jobContext = parallelQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_A);
+ assertNull(parallelQueue.poll());
+ jobContext = sequentialQueue.poll();
+ assertNotNull(jobContext);
+ assertEquals(jobContext.getTableName(), OFFLINE_TABLE_NAME_B);
+ assertNull(sequentialQueue.poll());
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -1123,57 +1132,60 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- String jobId = "test-poll-job-123";
-
- // Create tenant rebalance context with tables in queues
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
-
- TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
- TenantRebalanceContext context = new TenantRebalanceContext(
- jobId, config, 1,
- dryRunResult.getRebalanceTableResults().keySet().stream()
- .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
- .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
- new LinkedList<>(),
- new ConcurrentLinkedQueue<>()
- );
-
- TenantRebalanceProgressStats progressStats =
- new
TenantRebalanceProgressStats(dryRunResult.getRebalanceTableResults().keySet());
-
- // Test polling from parallel queue
- ZkBasedTenantRebalanceObserver observer =
- new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
- TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
- assertNotNull(polledJob);
-
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
-
- // Test polling from sequential queue (should be empty)
- TenantRebalancer.TenantTableRebalanceJobContext sequentialJob =
observer.pollSequential();
- assertNull(sequentialJob);
-
- // Verify the job was moved to ongoing queue and status was updated
- Map<String, String> updatedMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
- assertNotNull(updatedMetadata);
- TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
- TenantRebalanceProgressStats updatedStats =
-
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
-
- assertNotNull(updatedContext);
- assertNotNull(updatedStats);
- assertEquals(updatedContext.getOngoingJobsQueue().size(), 1);
-
assertEquals(updatedStats.getTableStatusMap().get(polledJob.getTableName()),
- TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+ try {
+ String jobId = "test-poll-job-123";
+
+ // Create tenant rebalance context with tables in queues
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ dryRunResult.getRebalanceTableResults().keySet().stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+ false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats =
+ new
TenantRebalanceProgressStats(dryRunResult.getRebalanceTableResults().keySet());
+
+ // Test polling from parallel queue
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME,
progressStats, context, _helixResourceManager);
+ TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
+ assertNotNull(polledJob);
+
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+ // Test polling from sequential queue (should be empty)
+ TenantRebalancer.TenantTableRebalanceJobContext sequentialJob =
observer.pollSequential();
+ assertNull(sequentialJob);
+
+ // Verify the job was moved to ongoing queue and status was updated
+ Map<String, String> updatedMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(updatedMetadata);
+ TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+ TenantRebalanceProgressStats updatedStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+ assertNotNull(updatedContext);
+ assertNotNull(updatedStats);
+ assertEquals(updatedContext.getOngoingJobsQueue().size(), 1);
+
assertEquals(updatedStats.getTableStatusMap().get(polledJob.getTableName()),
+ TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -1206,78 +1218,81 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- // Create observer and test cancellation
- String jobId = "test-cancel-job-456";
-
- // Create tenant rebalance context with tables in queues
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
-
- TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
- Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
- TenantRebalanceContext context = new TenantRebalanceContext(
- jobId, config, 1,
- tableNames.stream()
- .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
- .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
- new LinkedList<>(),
- new ConcurrentLinkedQueue<>()
- );
-
- TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
-
- // Test cancellation by user
- ZkBasedTenantRebalanceObserver observer =
- new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
-
- // move one job to ongoing to test cancellation from that state
- TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
- assertNotNull(polledJob);
-
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
-
- Pair<List<String>, Boolean> cancelResult = observer.cancelJob(true);
- assertTrue(cancelResult.getRight()); // cancellation was successful
- assertTrue(cancelResult.getLeft()
- .isEmpty()); // no jobs were cancelled (the polled job hasn't started
its table rebalance job yet thus won't
- // show in the cancelled list)
-
- // Verify that queues are emptied and status is updated
- Map<String, String> updatedMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
- assertNotNull(updatedMetadata);
- TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
- TenantRebalanceProgressStats updatedStats =
-
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
-
- assertNotNull(updatedContext);
- assertNotNull(updatedStats);
- assertTrue(updatedContext.getParallelQueue().isEmpty());
- assertTrue(updatedContext.getSequentialQueue().isEmpty());
- assertTrue(updatedContext.getOngoingJobsQueue().isEmpty());
- assertEquals(updatedStats.getRemainingTables(), 0);
- assertEquals(updatedStats.getCompletionStatusMsg(), "Tenant rebalance job
has been cancelled.");
- assertTrue(updatedStats.getTimeToFinishInSeconds() >= 0);
-
- // Verify all tables are marked as not scheduled
- for (String tableName : tableNames) {
- if (tableName.equals(polledJob.getTableName())) {
- // the polled job was in ongoing, so should be marked as CANCELLED
- assertEquals(updatedStats.getTableStatusMap().get(tableName),
- TenantRebalanceProgressStats.TableStatus.CANCELLED.name());
- } else {
- assertEquals(updatedStats.getTableStatusMap().get(tableName),
- TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ try {
+ // Create observer and test cancellation
+ String jobId = "test-cancel-job-456";
+
+ // Create tenant rebalance context with tables in queues
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ Set<String> tableNames =
dryRunResult.getRebalanceTableResults().keySet();
+
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ tableNames.stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+ false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
+
+ // Test cancellation by user
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME,
progressStats, context, _helixResourceManager);
+
+ // move one job to ongoing to test cancellation from that state
+ TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
+ assertNotNull(polledJob);
+
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+ Pair<List<String>, Boolean> cancelResult = observer.cancelJob(true);
+ assertTrue(cancelResult.getRight()); // cancellation was successful
+ assertTrue(cancelResult.getLeft()
+ .isEmpty()); // no jobs were cancelled (the polled job hasn't
started its table rebalance job yet thus won't
+ // show in the cancelled list)
+
+ // Verify that queues are emptied and status is updated
+ Map<String, String> updatedMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(updatedMetadata);
+ TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+ TenantRebalanceProgressStats updatedStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+ assertNotNull(updatedContext);
+ assertNotNull(updatedStats);
+ assertTrue(updatedContext.getParallelQueue().isEmpty());
+ assertTrue(updatedContext.getSequentialQueue().isEmpty());
+ assertTrue(updatedContext.getOngoingJobsQueue().isEmpty());
+ assertEquals(updatedStats.getRemainingTables(), 0);
+ assertEquals(updatedStats.getCompletionStatusMsg(), "Tenant rebalance
job has been cancelled.");
+ assertTrue(updatedStats.getTimeToFinishInSeconds() >= 0);
+
+ // Verify all tables are marked as not scheduled
+ for (String tableName : tableNames) {
+ if (tableName.equals(polledJob.getTableName())) {
+ // the polled job was in ongoing, so should be marked as CANCELLED
+ assertEquals(updatedStats.getTableStatusMap().get(tableName),
+ TenantRebalanceProgressStats.TableStatus.CANCELLED.name());
+ } else {
+ assertEquals(updatedStats.getTableStatusMap().get(tableName),
+ TenantRebalanceProgressStats.TableStatus.NOT_SCHEDULED.name());
+ }
}
- }
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -1310,66 +1325,71 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- // Create observer and test table job completion
- String jobId = "test-table-done-job-789";
-
- // Create tenant rebalance context with tables in ongoing queue
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
-
- TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
- Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
- TenantRebalanceContext context = new TenantRebalanceContext(
- jobId, config, 1,
- new ConcurrentLinkedDeque<>(),
- new LinkedList<>(),
- tableNames.stream()
- .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
- .collect(Collectors.toCollection(ConcurrentLinkedQueue::new))
- );
-
- TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
- // Set initial status to REBALANCING for the tables
- for (String tableName : tableNames) {
- progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
- }
-
- // Test onTableJobDone
- ZkBasedTenantRebalanceObserver observer =
- new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
- TenantRebalancer.TenantTableRebalanceJobContext jobContextA =
- new
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_A,
OFFLINE_TABLE_NAME_A + "_job", false);
- observer.onTableJobDone(jobContextA);
- TenantRebalancer.TenantTableRebalanceJobContext jobContextB =
- new
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_B,
OFFLINE_TABLE_NAME_B + "_job", false);
- String errorMessage = "Test error message";
- observer.onTableJobError(jobContextB, errorMessage);
-
- // Verify that the job was removed from ongoing queue and status was
updated
- Map<String, String> updatedMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
- assertNotNull(updatedMetadata);
- TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
- TenantRebalanceProgressStats updatedStats =
-
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+ try {
+ // Create observer and test table job completion
+ String jobId = "test-table-done-job-789";
+
+ // Create tenant rebalance context with tables in ongoing queue
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ Set<String> tableNames =
dryRunResult.getRebalanceTableResults().keySet();
+
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ new ConcurrentLinkedDeque<>(),
+ new LinkedList<>(),
+ tableNames.stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+ false))
+ .collect(Collectors.toCollection(ConcurrentLinkedQueue::new))
+ );
+
+ TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
+ // Set initial status to REBALANCING for the tables
+ for (String tableName : tableNames) {
+ progressStats.updateTableStatus(tableName,
TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+ }
- assertNotNull(updatedContext);
- assertNotNull(updatedStats);
- assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextA));
- assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextB));
- assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
- TenantRebalanceProgressStats.TableStatus.DONE.name());
- assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_B),
errorMessage);
- assertEquals(updatedStats.getRemainingTables(), tableNames.size() - 2);
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ // Test onTableJobDone
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME,
progressStats, context, _helixResourceManager);
+ TenantRebalancer.TenantTableRebalanceJobContext jobContextA =
+ new
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_A,
OFFLINE_TABLE_NAME_A + "_job",
+ false);
+ observer.onTableJobDone(jobContextA);
+ TenantRebalancer.TenantTableRebalanceJobContext jobContextB =
+ new
TenantRebalancer.TenantTableRebalanceJobContext(OFFLINE_TABLE_NAME_B,
OFFLINE_TABLE_NAME_B + "_job",
+ false);
+ String errorMessage = "Test error message";
+ observer.onTableJobError(jobContextB, errorMessage);
+
+ // Verify that the job was removed from ongoing queue and status was
updated
+ Map<String, String> updatedMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(updatedMetadata);
+ TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+ TenantRebalanceProgressStats updatedStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+ assertNotNull(updatedContext);
+ assertNotNull(updatedStats);
+ assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextA));
+ assertFalse(updatedContext.getOngoingJobsQueue().contains(jobContextB));
+ assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+ TenantRebalanceProgressStats.TableStatus.DONE.name());
+ assertEquals(updatedStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_B),
errorMessage);
+ assertEquals(updatedStats.getRemainingTables(), tableNames.size() - 2);
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -1402,59 +1422,62 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- // Create observer and test lifecycle methods
- String jobId = "test-lifecycle-job-202";
-
- // Create tenant rebalance context
- TenantRebalanceConfig config = new TenantRebalanceConfig();
- config.setTenantName(TENANT_NAME);
- config.setVerboseResult(true);
- config.setDryRun(true);
-
- TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
- Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
-
- TenantRebalanceContext context = new TenantRebalanceContext(
- jobId, config, 1,
- tableNames.stream()
- .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
- .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
- new LinkedList<>(),
- new ConcurrentLinkedQueue<>()
- );
-
- TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
-
- // Test onStart
- ZkBasedTenantRebalanceObserver observer =
- new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
- observer.onStart();
- Map<String, String> startMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
- assertNotNull(startMetadata);
- TenantRebalanceProgressStats startStats =
-
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(startMetadata);
- assertNotNull(startStats);
- assertTrue(startStats.getStartTimeMs() > 0);
-
- // Test onSuccess
- String successMessage = "Tenant rebalance completed successfully";
- observer.onSuccess(successMessage);
- Map<String, String> successMetadata =
- _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
- assertNotNull(successMetadata);
- TenantRebalanceProgressStats successStats =
-
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(successMetadata);
- assertNotNull(successStats);
- assertEquals(successStats.getCompletionStatusMsg(), successMessage);
- assertTrue(successStats.getTimeToFinishInSeconds() >= 0);
- assertTrue(observer.isDone());
-
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
- _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
-
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ try {
+ // Create observer and test lifecycle methods
+ String jobId = "test-lifecycle-job-202";
+
+ // Create tenant rebalance context
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ Set<String> tableNames =
dryRunResult.getRebalanceTableResults().keySet();
+
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ tableNames.stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
+ false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
+
+ // Test onStart
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME,
progressStats, context, _helixResourceManager);
+ observer.onStart();
+ Map<String, String> startMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(startMetadata);
+ TenantRebalanceProgressStats startStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(startMetadata);
+ assertNotNull(startStats);
+ assertTrue(startStats.getStartTimeMs() > 0);
+
+ // Test onSuccess
+ String successMessage = "Tenant rebalance completed successfully";
+ observer.onSuccess(successMessage);
+ Map<String, String> successMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(successMetadata);
+ TenantRebalanceProgressStats successStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(successMetadata);
+ assertNotNull(successStats);
+ assertEquals(successStats.getCompletionStatusMsg(), successMessage);
+ assertTrue(successStats.getTimeToFinishInSeconds() >= 0);
+ assertTrue(observer.isDone());
+ } finally {
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
@@ -1484,27 +1507,27 @@ public class TenantRebalancerTest extends
ControllerTest {
}
addTenantTagToInstances(TENANT_NAME);
- for (int i = 0; i < 3; i++) {
- runZkBasedTenantRebalanceObserverConcurrentPoll();
- }
-
- // Clean up tables
- for (String tableName : tableNames) {
- _helixResourceManager.deleteOfflineTable(tableName);
- }
+ try {
+ for (int i = 0; i < 3; i++) {
+ runZkBasedTenantRebalanceObserverConcurrentPoll("concurrent-poll-job-"
+ i);
+ }
+ } finally {
+ // Clean up tables
+ for (String tableName : tableNames) {
+ _helixResourceManager.deleteOfflineTable(tableName);
+ }
- for (int i = 0; i < numServers + numServersToAdd; i++) {
- stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
}
}
- private void runZkBasedTenantRebalanceObserverConcurrentPoll()
+ private void runZkBasedTenantRebalanceObserverConcurrentPoll(String jobId)
throws Exception {
TenantRebalancer tenantRebalancer =
new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
- String jobId = "test-concurrent-poll-job-303";
-
// Create tenant rebalance context with multiple tables
TenantRebalanceConfig config = new TenantRebalanceConfig();
config.setTenantName(TENANT_NAME);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]