This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 164cd819cf report rebalance job status for the early returns like noops (#13281)
164cd819cf is described below
commit 164cd819cfaa350167ed0b285875acb0e714b79d
Author: Xiaobing <[email protected]>
AuthorDate: Tue Jun 4 11:35:35 2024 -0700
report rebalance job status for the early returns like noops (#13281)
---
.../core/rebalance/NoOpTableRebalanceObserver.java | 4 ++
.../core/rebalance/TableRebalanceObserver.java | 2 +
.../helix/core/rebalance/TableRebalancer.java | 79 +++++++++++++---------
.../rebalance/ZkBasedTableRebalanceObserver.java | 12 +++-
4 files changed, 64 insertions(+), 33 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
index a435e35ead..2e5e7e2c96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
@@ -29,6 +29,10 @@ public class NoOpTableRebalanceObserver implements TableRebalanceObserver {
Map<String, Map<String, String>> targetState) {
}
+ @Override
+ public void onNoop(String msg) {
+ }
+
@Override
public void onSuccess(String msg) {
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
index e9c5c299cf..6139a26d65 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
@@ -40,6 +40,8 @@ public interface TableRebalanceObserver {
void onTrigger(Trigger trigger, Map<String, Map<String, String>>
currentState,
Map<String, Map<String, String>> targetState);
+ void onNoop(String msg);
+
void onSuccess(String msg);
void onError(String errorMsg);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 5c3514fa7e..765d576549 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -194,21 +194,23 @@ public class TableRebalancer {
try {
currentIdealState =
_helixDataAccessor.getProperty(idealStatePropertyKey);
} catch (Exception e) {
- LOGGER.warn(
- "For rebalanceId: {}, caught exception while fetching IdealState for
table: {}, aborting the rebalance",
- rebalanceJobId, tableNameWithType, e);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, caught exception while fetching IdealState for
table: %s, aborting the rebalance",
+ rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching IdealState: " + e, null, null,
null);
}
if (currentIdealState == null) {
- LOGGER.warn("For rebalanceId: {}, cannot find the IdealState for table:
{}, aborting the rebalance",
- rebalanceJobId, tableNameWithType);
+ onReturnFailure(
+ String.format("For rebalanceId: %s, cannot find the IdealState for
table: %s, aborting the rebalance",
+ rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
null, null, null);
}
if (!currentIdealState.isEnabled() && !downtime) {
- LOGGER.warn("For rebalanceId: {}, cannot rebalance disabled table: {}
without downtime, aborting the rebalance",
- rebalanceJobId, tableNameWithType);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, cannot rebalance disabled table: %s without
downtime, aborting the rebalance",
+ rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Cannot rebalance disabled table without downtime", null, null,
null);
}
@@ -224,8 +226,9 @@ public class TableRebalancer {
instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
instancePartitionsUnchanged =
instancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
- LOGGER.warn("For rebalanceId: {}, caught exception while
fetching/calculating instance partitions for table: {}, "
- + "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, caught exception while fetching/calculating
instance partitions for table: %s, "
+ + "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null);
}
@@ -241,9 +244,9 @@ public class TableRebalancer {
tierToInstancePartitionsMap =
tierToInstancePartitionsMapAndUnchanged.getLeft();
tierInstancePartitionsUnchanged =
tierToInstancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
- LOGGER.warn(
- "For rebalanceId: {}, caught exception while fetching/calculating
tier instance partitions for table: {}, "
- + "aborting the rebalance", rebalanceJobId, tableNameWithType,
e);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, caught exception while fetching/calculating
tier instance partitions for table: %s, "
+ + "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching/calculating tier instance
partitions: " + e, null, null, null);
}
@@ -258,8 +261,9 @@ public class TableRebalancer {
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment,
instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
- LOGGER.warn("For rebalanceId: {}, caught exception while calculating
target assignment for table: {}, "
- + "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, caught exception while calculating target
assignment for table: %s, aborting the "
+ + "rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
tierToInstancePartitionsMap, null);
@@ -273,6 +277,9 @@ public class TableRebalancer {
if (segmentAssignmentUnchanged) {
LOGGER.info("Table: {} is already balanced", tableNameWithType);
if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
+ _tableRebalanceObserver.onNoop(
+ String.format("For rebalanceId: %s, instance unchanged and table:
%s is already balanced", rebalanceJobId,
+ tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.NO_OP, "Table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
} else {
@@ -281,6 +288,9 @@ public class TableRebalancer {
"Instance reassigned in dry-run mode, table is already
balanced", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment);
} else {
+ _tableRebalanceObserver.onSuccess(
+ String.format("For rebalanceId: %s, instance reassigned but
table: %s is already balanced",
+ rebalanceJobId, tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
"Instance reassigned, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
@@ -309,16 +319,19 @@ public class TableRebalancer {
Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
.set(idealStatePropertyKey.getPath(), idealStateRecord,
idealStateRecord.getVersion(),
AccessOption.PERSISTENT), "Failed to update IdealState");
- LOGGER.info("For rebalanceId: {}, finished rebalancing table: {} with
downtime in {}ms.", rebalanceJobId,
- tableNameWithType, System.currentTimeMillis() - startTimeMs);
+ String msg =
+ String.format("For rebalanceId: %s, finished rebalancing table: %s
with downtime in %d ms.", rebalanceJobId,
+ tableNameWithType, System.currentTimeMillis() - startTimeMs);
+ LOGGER.info(msg);
+ _tableRebalanceObserver.onSuccess(msg);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target
segment assignment, ExternalView might not "
+ "reach the target segment assignment yet)",
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
} catch (Exception e) {
- LOGGER.warn(
- "For rebalanceId: {}, caught exception while updating IdealState
for table: {}, aborting the rebalance",
- rebalanceJobId, tableNameWithType, e);
+ onReturnFailure(String.format(
+ "For rebalanceId: %s, caught exception while updating IdealState
for table: %s, aborting the rebalance",
+ rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
@@ -347,12 +360,10 @@ public class TableRebalancer {
if (minReplicasToKeepUpForNoDowntime >= 0) {
// For non-negative value, use it as min available replicas
if (minReplicasToKeepUpForNoDowntime >= numReplicas) {
- String errorMsg = String.format(
+ onReturnFailure(String.format(
"For rebalanceId: %s, Illegal config for
minReplicasToKeepUpForNoDowntime: %d for table: %s, "
+ "must be less than number of replicas: %d, aborting the
rebalance", rebalanceJobId,
- minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas);
- LOGGER.warn(errorMsg);
- _tableRebalanceObserver.onError(errorMsg);
+ minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas),
null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Illegal min available replicas config", instancePartitionsMap,
tierToInstancePartitionsMap,
targetAssignment);
@@ -446,11 +457,9 @@ public class TableRebalancer {
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
- String errorMsg = String.format(
+ onReturnFailure(String.format(
"For rebalanceId: %s, caught exception while re-calculating
the target assignment for table: %s, "
- + "aborting the rebalance", rebalanceJobId,
tableNameWithType);
- LOGGER.warn(errorMsg, e);
- _tableRebalanceObserver.onError(errorMsg);
+ + "aborting the rebalance", rebalanceJobId,
tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment);
@@ -513,11 +522,8 @@ public class TableRebalancer {
LOGGER.info("For rebalanceId: {}, version changed while updating
IdealState for table: {}", rebalanceJobId,
tableNameWithType);
} catch (Exception e) {
- String errorMsg = String.format(
- "For rebalanceId: %s, caught exception while updating IdealState
for table: %s, "
- + "aborting the rebalance", rebalanceJobId, tableNameWithType);
- LOGGER.warn(errorMsg, e);
- _tableRebalanceObserver.onError(errorMsg);
+ onReturnFailure(String.format("For rebalanceId: %s, caught exception
while updating IdealState for table: %s, "
+ + "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
@@ -525,6 +531,15 @@ public class TableRebalancer {
}
}
+ private void onReturnFailure(String errorMsg, Exception e) {
+ if (e != null) {
+ LOGGER.warn(errorMsg, e);
+ } else {
+ LOGGER.warn(errorMsg);
+ }
+ _tableRebalanceObserver.onError(errorMsg);
+ }
+
/**
* Gets the instance partitions for instance partition types and also
returns a boolean for whether they are unchanged
*/
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 8386544f3c..b2ea9f8e69 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -121,6 +121,16 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
_tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
}
+ @Override
+ public void onNoop(String msg) {
+ _controllerMetrics.setValueOfTableGauge(_tableNameWithType,
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
+ long timeToFinishInSeconds = (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
+ _tableRebalanceProgressStats.setCompletionStatusMsg(msg);
+
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
+ _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.NO_OP);
+ trackStatsInZk();
+ }
+
@Override
public void onSuccess(String msg) {
Preconditions.checkState(RebalanceResult.Status.DONE !=
_tableRebalanceProgressStats.getStatus(),
@@ -140,7 +150,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
@Override
public void onError(String errorMsg) {
_controllerMetrics.setValueOfTableGauge(_tableNameWithType,
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
- long timeToFinishInSeconds = (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+ long timeToFinishInSeconds = (System.currentTimeMillis() -
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
_tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]