This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 06d8540378 For table rebalance, check if instances are equal for NO_OP (#11073)
06d8540378 is described below
commit 06d8540378d9df6220c0e0e47d42841b3376bd96
Author: summerhasama-stripe <[email protected]>
AuthorDate: Mon Jul 24 21:17:03 2023 -0400
For table rebalance, check if instances are equal for NO_OP (#11073)
---
.../common/assignment/InstancePartitions.java | 19 +++
.../helix/core/rebalance/TableRebalancer.java | 174 +++++++++++++--------
.../TableRebalancerClusterStatelessTest.java | 16 +-
3 files changed, 134 insertions(+), 75 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
index a296527e84..a67bb93ce0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitions.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -146,4 +147,22 @@ public class InstancePartitions {
public String toString() {
return toJsonString();
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof InstancePartitions)) {
+ return false;
+ }
+ InstancePartitions other = (InstancePartitions) obj;
+ return Objects.equals(_instancePartitionsName,
other._instancePartitionsName)
+ && Objects.equals(_partitionToInstancesMap,
other._partitionToInstancesMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_instancePartitionsName, _partitionToInstancesMap);
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 22c5b87635..95df26ff96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -226,8 +226,12 @@ public class TableRebalancer {
// Calculate instance partitions map
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
+ boolean instancePartitionsUnchanged;
try {
- instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, bootstrap, dryRun);
+ Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean>
instancePartitionsMapAndUnchanged =
+ getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap,
dryRun);
+ instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
+ instancePartitionsUnchanged =
instancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
LOGGER.warn("For rebalanceId: {}, caught exception while
fetching/calculating instance partitions for table: {}, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
@@ -237,9 +241,13 @@ public class TableRebalancer {
// Calculate instance partitions for tiers if configured
List<Tier> sortedTiers = getSortedTiers(tableConfig);
- Map<String, InstancePartitions> tierToInstancePartitionsMap =
+
+ Pair<Map<String, InstancePartitions>, Boolean>
tierToInstancePartitionsMapAndUnchanged =
getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
+ Map<String, InstancePartitions> tierToInstancePartitionsMap =
tierToInstancePartitionsMapAndUnchanged.getLeft();
+ boolean tierInstancePartitionsUnchanged =
tierToInstancePartitionsMapAndUnchanged.getRight();
+
LOGGER.info("For rebalanceId: {}, calculating the target assignment for
table: {}", rebalanceJobId,
tableNameWithType);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -256,21 +264,26 @@ public class TableRebalancer {
tierToInstancePartitionsMap, null);
}
- if (currentAssignment.equals(targetAssignment)) {
+ boolean segmentAssignmentUnchanged =
currentAssignment.equals(targetAssignment);
+ LOGGER.info("For rebalanceId: {}, segmentAssignmentUnchanged: {}, "
+ + "tierInstancePartitionsUnchanged: {},
instancePartitionsUnchanged: {} for table: {}",
+ rebalanceJobId, segmentAssignmentUnchanged,
tierInstancePartitionsUnchanged,
+ instancePartitionsUnchanged, tableNameWithType);
+
+ if (segmentAssignmentUnchanged) {
LOGGER.info("Table: {} is already balanced", tableNameWithType);
- if (reassignInstances) {
+ if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.NO_OP, "Table is already balanced",
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
+ } else {
if (dryRun) {
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already
balanced", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment);
- } else {
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
- "Instance reassigned, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment);
}
- } else {
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.NO_OP, "Table is already balanced",
- instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
+ "Instance reassigned, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
+ targetAssignment);
}
}
@@ -399,9 +412,11 @@ public class TableRebalancer {
if (segmentsToMoveChanged) {
try {
// Re-calculate the instance partitions in case the instance
configs changed during the rebalance
- instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, bootstrap, false);
+ instancePartitionsMap =
+ getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false).getLeft();
tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances,
+ bootstrap, dryRun).getLeft();
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
@@ -480,22 +495,32 @@ public class TableRebalancer {
}
}
- private Map<InstancePartitionsType, InstancePartitions>
getInstancePartitionsMap(TableConfig tableConfig,
- boolean reassignInstances, boolean bootstrap, boolean dryRun) {
+ /**
+ * Gets the instance partitions for instance partition types and also
returns a boolean for whether they are unchanged
+ */
+ private Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean>
getInstancePartitionsMap(
+ TableConfig tableConfig, boolean reassignInstances, boolean bootstrap,
boolean dryRun) {
+ boolean instancePartitionsUnchanged = true;
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
if (tableConfig.getTableType() == TableType.OFFLINE) {
- instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
- getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForOffline =
+ getInstancePartitions(tableConfig, InstancePartitionsType.OFFLINE,
reassignInstances, bootstrap, dryRun);
+ instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
partitionAndUnchangedForOffline.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged &&
partitionAndUnchangedForOffline.getRight();
} else {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
- getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForConsuming =
+ getInstancePartitions(tableConfig, InstancePartitionsType.CONSUMING,
reassignInstances, bootstrap, dryRun);
+ instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
partitionAndUnchangedForConsuming.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged &&
partitionAndUnchangedForConsuming.getRight();
String tableNameWithType = tableConfig.getTableName();
if
(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
+ Pair<InstancePartitions, Boolean> partitionAndUnchangedForCompleted =
+ getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun);
LOGGER.info(
"COMPLETED segments should be relocated, fetching/computing
COMPLETED instance partitions for table: {}",
tableNameWithType);
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
- getInstancePartitions(tableConfig,
InstancePartitionsType.COMPLETED, reassignInstances, bootstrap, dryRun));
+ instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
partitionAndUnchangedForCompleted.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged &&
partitionAndUnchangedForCompleted.getRight();
} else {
LOGGER.info(
"COMPLETED segments should not be relocated, skipping
fetching/computing COMPLETED instance partitions "
@@ -509,12 +534,21 @@ public class TableRebalancer {
}
}
}
- return instancePartitionsMap;
+ return Pair.of(instancePartitionsMap, instancePartitionsUnchanged);
}
- private InstancePartitions getInstancePartitions(TableConfig tableConfig,
+ /**
+ * Fetches/computes the instance partitions and also returns a boolean for
whether they are unchanged
+ */
+ private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig
tableConfig,
InstancePartitionsType instancePartitionsType, boolean
reassignInstances, boolean bootstrap, boolean dryRun) {
String tableNameWithType = tableConfig.getTableName();
+
+ InstancePartitions existingInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
+ instancePartitionsType.toString()));
+
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig,
instancePartitionsType)) {
if (reassignInstances) {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -525,35 +559,33 @@ public class TableRebalancer {
InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
- if (!dryRun) {
+ boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
}
- // Set existing instance partition to null if bootstrap mode is
enabled, so that the instance partition
- // map can be fully recalculated.
- InstancePartitions existingInstancePartitions = bootstrap ? null
- :
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
-
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
- instancePartitionsType.toString()));
LOGGER.info("Reassigning {} instances for table: {}",
instancePartitionsType, tableNameWithType);
+ // Assign instances with existing instance partition to null if
bootstrap mode is enabled,
+ // so that the instance partition map can be fully recalculated.
InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- existingInstancePartitions);
- if (!dryRun) {
+ bootstrap ? null : existingInstancePartitions);
+ boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
} else {
LOGGER.info("Fetching/computing {} instance partitions for table: {}",
instancePartitionsType,
tableNameWithType);
- return
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager,
tableConfig,
- instancePartitionsType);
+ return
Pair.of(InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager,
tableConfig,
+ instancePartitionsType), true);
}
} else {
LOGGER.info("{} instance assignment is not allowed, using default
instance partitions for table: {}",
@@ -564,12 +596,15 @@ public class TableRebalancer {
}
InstancePartitions instancePartitions =
InstancePartitionsUtils.computeDefaultInstancePartitions(_helixManager,
tableConfig, instancePartitionsType);
- if (!dryRun) {
+
+ Boolean noExistingInstancePartitions = existingInstancePartitions ==
null;
+
+ if (!dryRun && !noExistingInstancePartitions) {
String instancePartitionsName =
instancePartitions.getInstancePartitionsName();
LOGGER.info("Removing instance partitions: {} from ZK if it exists",
instancePartitionsName);
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitionsName);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, noExistingInstancePartitions);
}
}
@@ -586,74 +621,79 @@ public class TableRebalancer {
}
}
- @Nullable
- private Map<String, InstancePartitions>
getTierToInstancePartitionsMap(TableConfig tableConfig,
+ /**
+ * Fetches/computes the instance partitions for sorted tiers and also
returns a boolean for whether the
+ * instance partitions are unchanged.
+ */
+ private Pair<Map<String, InstancePartitions>, Boolean>
getTierToInstancePartitionsMap(TableConfig tableConfig,
@Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean
bootstrap, boolean dryRun) {
if (sortedTiers == null) {
- return null;
+ return Pair.of(null, true);
}
+ boolean instancePartitionsUnchanged = true;
Map<String, InstancePartitions> tierToInstancePartitionsMap = new
HashMap<>();
for (Tier tier : sortedTiers) {
LOGGER.info("Fetching/computing instance partitions for tier: {} of
table: {}", tier.getName(),
tableConfig.getTableName());
- tierToInstancePartitionsMap.put(tier.getName(),
- getInstancePartitionsForTier(tableConfig, tier, reassignInstances,
bootstrap, dryRun));
+ Pair<InstancePartitions, Boolean> partitionsAndUnchanged =
getInstancePartitionsForTier(
+ tableConfig, tier, reassignInstances, bootstrap, dryRun);
+ tierToInstancePartitionsMap.put(tier.getName(),
partitionsAndUnchanged.getLeft());
+ instancePartitionsUnchanged = instancePartitionsUnchanged &&
partitionsAndUnchanged.getRight();
}
- return tierToInstancePartitionsMap;
+ return Pair.of(tierToInstancePartitionsMap, instancePartitionsUnchanged);
}
/**
* Computes the instance partitions for the given tier. If table's
instanceAssignmentConfigMap has an entry for the
- * tier, it's used to calculate the instance partitions. Else default
instance partitions are returned
+ * tier, it's used to calculate the instance partitions. Else default
instance partitions are returned. Also returns
+ * a boolean for whether the instance partition is unchanged.
*/
- private InstancePartitions getInstancePartitionsForTier(TableConfig
tableConfig, Tier tier, boolean reassignInstances,
- boolean bootstrap, boolean dryRun) {
+ private Pair<InstancePartitions, Boolean>
getInstancePartitionsForTier(TableConfig tableConfig, Tier tier,
+ boolean reassignInstances, boolean bootstrap, boolean dryRun) {
PinotServerTierStorage storage = (PinotServerTierStorage)
tier.getStorage();
InstancePartitions defaultInstancePartitions =
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager,
tableConfig.getTableName(),
tier.getName(), storage.getServerTag());
+ String instancePartitionsName =
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tier.getName());
+ InstancePartitions existingInstancePartitions = InstancePartitionsUtils.
+ fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitionsName);
if (tableConfig.getInstanceAssignmentConfigMap() == null ||
!tableConfig.getInstanceAssignmentConfigMap()
.containsKey(tier.getName())) {
LOGGER.info("Skipping fetching/computing instance partitions for tier {}
for table: {}", tier.getName(),
tableConfig.getTableName());
- if (!dryRun) {
- String instancePartitionsName =
-
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tier.getName());
+ Boolean noExistingInstancePartitions = existingInstancePartitions ==
null;
+
+ if (!dryRun && !noExistingInstancePartitions) {
LOGGER.info("Removing instance partitions: {} from ZK if it exists",
instancePartitionsName);
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitionsName);
}
- return defaultInstancePartitions;
+ return Pair.of(defaultInstancePartitions, noExistingInstancePartitions);
}
String tableNameWithType = tableConfig.getTableName();
- String instancePartitionsName =
-
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tier.getName());
if (reassignInstances) {
- // Set existing instance partition to null if bootstrap mode is enabled,
so that the instance partition
- // map can be fully recalculated.
- InstancePartitions existingInstancePartitions = bootstrap ? null
- :
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
- instancePartitionsName);
+ // Assign instances with existing instance partition to null if
bootstrap mode is enabled,
+ // so that the instance partition map can be fully recalculated.
InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(tier.getName(),
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- existingInstancePartitions,
tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
- if (!dryRun) {
+ bootstrap ? null : existingInstancePartitions,
+ tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+ boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
- return instancePartitions;
+ return Pair.of(instancePartitions, instancePartitionsUnchanged);
}
- InstancePartitions instancePartitions =
-
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
-
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType,
tier.getName()));
- if (instancePartitions != null) {
- return instancePartitions;
+ if (existingInstancePartitions != null) {
+ return Pair.of(existingInstancePartitions, true);
}
- return defaultInstancePartitions;
+ return Pair.of(defaultInstancePartitions, true);
}
private IdealState waitForExternalViewToConverge(String tableNameWithType,
boolean bestEfforts,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 91f27160c0..f2eed4ccab 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -246,19 +246,19 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
tableConfig.setInstanceAssignmentConfigMap(null);
_helixResourceManager.updateTableConfig(tableConfig);
- // Without instances reassignment, the rebalance should return status
NO_OP as instance partitions are already
- // generated
+ // Without instances reassignment, the rebalance should return status DONE,
+ // and the instance partitions should be removed
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
- // With instances reassignment, the instance partitions should be removed,
and the default instance partitions
- // should be used for segment assignment
+ // With instances reassignment, the default instance partitions
+ // should be used for segment assignment and should return NO_OP
rebalanceConfig = new BaseConfiguration();
rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES,
true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
- assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
-
InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME)));
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// All servers should be assigned to the table
instanceAssignment = rebalanceResult.getInstanceAssignment();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]