This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 28b3d3e396 Discard existing instancePartitions if bootstrap rebalance mode is enabled (#10167)
28b3d3e396 is described below
commit 28b3d3e3964c905ef2c4f25b7f25db514e4d7f0b
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Jan 24 16:13:57 2023 -0800
Discard existing instancePartitions if bootstrap rebalance mode is enabled (#10167)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../helix/core/rebalance/TableRebalancer.java | 37 +++++++++++-----------
1 file changed, 19 insertions(+), 18 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 4a1bc13b52..b8257ed450 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -152,10 +152,9 @@ public class TableRebalancer {
LOGGER.info(
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {},
includeConsuming: {}, bootstrap: {}, "
+ "downtime: {}, minReplicasToKeepUpForNoDowntime: {},
enableStrictReplicaGroup: {}, bestEfforts: {}, "
- + "externalViewCheckIntervalInMs: {},
externalViewStabilizationTimeoutInMs: {}",
- tableNameWithType, dryRun, reassignInstances, includeConsuming,
bootstrap, downtime,
- minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup,
bestEfforts, externalViewCheckIntervalInMs,
- externalViewStabilizationTimeoutInMs);
+ + "externalViewCheckIntervalInMs: {},
externalViewStabilizationTimeoutInMs: {}", tableNameWithType, dryRun,
+ reassignInstances, includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime,
+ enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs,
externalViewStabilizationTimeoutInMs);
// Validate table config
try {
@@ -200,7 +199,7 @@ public class TableRebalancer {
// Calculate instance partitions map
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
try {
- instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, dryRun);
+ instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, bootstrap, dryRun);
} catch (Exception e) {
LOGGER.warn(
"Caught exception while fetching/calculating instance partitions for
table: {}, aborting the rebalance",
@@ -353,7 +352,7 @@ public class TableRebalancer {
if (segmentsToMoveChanged) {
try {
// Re-calculate the instance partitions in case the instance
configs changed during the rebalance
- instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, false);
+ instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, bootstrap, false);
tierToInstancePartitionsMap =
getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
@@ -419,21 +418,21 @@ public class TableRebalancer {
}
private Map<InstancePartitionsType, InstancePartitions>
getInstancePartitionsMap(TableConfig tableConfig,
- boolean reassignInstances, boolean dryRun) {
+ boolean reassignInstances, boolean bootstrap, boolean dryRun) {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
if (tableConfig.getTableType() == TableType.OFFLINE) {
instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
- getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, dryRun));
+ getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, bootstrap, dryRun));
} else {
instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
- getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, dryRun));
+ getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, bootstrap, dryRun));
String tableNameWithType = tableConfig.getTableName();
if
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
LOGGER.info(
"COMPLETED segments should be relocated, fetching/computing
COMPLETED instance partitions for table: {}",
tableNameWithType);
instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
- getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, dryRun));
+ getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun));
} else {
LOGGER.info(
"COMPLETED segments should not be relocated, skipping
fetching/computing COMPLETED instance partitions "
@@ -451,18 +450,18 @@ public class TableRebalancer {
}
private InstancePartitions getInstancePartitions(TableConfig tableConfig,
- InstancePartitionsType instancePartitionsType, boolean
reassignInstances, boolean dryRun) {
+ InstancePartitionsType instancePartitionsType, boolean
reassignInstances, boolean bootstrap, boolean dryRun) {
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig,
instancePartitionsType)) {
if (reassignInstances) {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
- instancePartitionsType);
+ boolean hasPreConfiguredInstancePartitions =
+ TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
if (hasPreConfiguredInstancePartitions) {
String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
- InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(
- _helixManager.getHelixPropertyStore(),
referenceInstancePartitionsName,
- instancePartitionsType.getInstancePartitionsName(rawTableName));
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+ referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
if (!dryRun) {
LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
referenceInstancePartitionsName);
@@ -471,8 +470,10 @@ public class TableRebalancer {
}
return instancePartitions;
}
- InstancePartitions existingInstancePartitions =
-
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+ // Set existing instance partition to null if bootstrap mode is enabled, so that the instance partition
+ // map can be fully recalculated.
+ InstancePartitions existingInstancePartitions = bootstrap ? null
+ :
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString()));
LOGGER.info("Reassigning {} instances for table: {}",
instancePartitionsType, tableNameWithType);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]