J-HowHuang commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2211202414
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
+ if (downtime && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
Review Comment:
I think `SegmentCommitter` checks whether this is `null` to decide whether to
fall back to a peer URL. We should use the same check here; otherwise we fail to
catch the case where `peerSegmentDownloadScheme=""` (though I'm not sure whether
an empty string is allowed).
https://github.com/apache/pinot/blob/d953d7cffcd2626cbe78648c7e599544e22bab96/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java#L99
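A minimal sketch of why the two checks disagree, assuming the scheme is a plain `String` that may be `null`, `""`, or a real scheme. `PeerDownloadChecks` and its methods are hypothetical names; the first method reproduces the semantics of `!StringUtils.isEmpty(scheme)` (Commons Lang treats both `null` and `""` as empty), the second a plain null check like the one linked above.

```java
// Sketch only: contrasts the two ways of deciding "is peer download enabled?".
// Assumption: the scheme is a plain String that may be null, "" or a real scheme such as "http".
final class PeerDownloadChecks {
  private PeerDownloadChecks() {
  }

  // Same semantics as the PR's !StringUtils.isEmpty(scheme): null and "" both mean "disabled".
  static boolean enabledByIsEmptyCheck(String scheme) {
    return scheme != null && !scheme.isEmpty();
  }

  // Same semantics as a plain null check: only null means "disabled", so "" counts as enabled.
  static boolean enabledByNullCheck(String scheme) {
    return scheme != null;
  }
}
```

The two only diverge for `""`: a committer using the null check would treat the table as peer-download enabled, while the rebalancer's `isEmpty` guard would skip its data-loss checks.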
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1865,63 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ boolean hasDataLossRisk(String segmentName);
+ }
+
+ private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final HelixManager _helixManager;
+ private final boolean _isPeerDownloadEnabled;
+ private final boolean _isUpsertOrDedupTable;
+ private final boolean _isPauselessEnabled;
+
+ private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, HelixManager helixManager) {
+ _tableNameWithType = tableNameWithType;
+ _helixManager = helixManager;
+ _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+ _isUpsertOrDedupTable = tableConfig.isUpsertEnabled() || tableConfig.isDedupEnabled();
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public boolean hasDataLossRisk(String segmentName) {
+ // If peer-download is disabled, no data loss risk exists
+ if (!_isPeerDownloadEnabled) {
+ return false;
+ }
+
+ SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ return false;
+ }
+
+ // If the segment state is COMPLETED and the peer download URL is empty, there is a data loss risk
+ if (!segmentZKMetadata.getStatus().isCompleted()) {
Review Comment:
Agree with @somandal: unless the table is upsert/dedup, the worst outcome of
moving a `COMMITTING` segment is that the segment ends up in ERROR state, if its
new host cannot wait for the commit to finish during its `OFFLINE -> ONLINE`
transition:
https://github.com/apache/pinot/blob/d488bd1e0881ca93f4ed50d7be63e8424590050f/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java#L574-L587
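One reading of the comment above, as a hypothetical sketch rather than the PR's implementation: a segment that has not finished committing is flagged only for upsert/dedup tables, while a completed segment is flagged when it has no deep-store download URL (the condition in the PR's own code comment). The `Status` enum is a local stand-in whose constant names are assumed to mirror Pinot's realtime segment statuses.

```java
import java.util.Objects;

// Hypothetical sketch of the per-segment decision discussed above; not Pinot's implementation.
final class CommittingSegmentRisk {
  // Local stand-in for the segment status stored in ZK metadata (names assumed).
  enum Status {
    IN_PROGRESS, COMMITTING, DONE, UPLOADED;

    boolean isCompleted() {
      return this == DONE || this == UPLOADED;
    }
  }

  private CommittingSegmentRisk() {
  }

  static boolean hasDataLossRisk(Status status, String downloadUrl, boolean isUpsertOrDedupTable) {
    Objects.requireNonNull(status, "status");
    if (!status.isCompleted()) {
      // Not yet committed: for plain tables the worst case is an ERROR-state replica on the
      // new host, so only upsert/dedup tables are flagged.
      return isUpsertOrDedupTable;
    }
    // Completed but without a deep-store copy: moving every replica away could lose the only copy.
    return downloadUrl == null || downloadUrl.isEmpty();
  }
}
```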
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1889,70 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ boolean hasDataLossRisk(String segmentName);
+ }
+
+ private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final TableConfig _tableConfig;
+ private final int _minAvailableReplicas;
+ private final HelixManager _helixManager;
+ private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _isPeerDownloadEnabled;
+ private final boolean _isPauselessEnabled;
+
+ private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas,
+ HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ _tableNameWithType = tableNameWithType;
+ _tableConfig = tableConfig;
+ _minAvailableReplicas = minAvailableReplicas;
+ _helixManager = helixManager;
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public boolean hasDataLossRisk(String segmentName) {
+ // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists
+ if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) {
+ return false;
+ }
Review Comment:
nit: since you already have the `DataLossRiskAssessor` interface, could we have a
`PeerDownloadTableDataLossRiskAssessor` and a `NoopDataLossRiskAssessor`, for
example? That would be easier to read and maintain if we add more assessments
like this in the future.
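A minimal sketch of that split. The interface matches the PR and the two class names come from the comment; the constructor shape, the `DataLossRiskAssessors` factory, and the `Function` used as a stand-in for the segment ZK metadata lookup are hypothetical. The peer-download check is deliberately simplified to "empty download URL means risk".

```java
import java.util.function.Function;

@FunctionalInterface
interface DataLossRiskAssessor {
  boolean hasDataLossRisk(String segmentName);
}

// Used whenever no assessment is needed (e.g. peer download disabled): never reports a risk.
final class NoopDataLossRiskAssessor implements DataLossRiskAssessor {
  static final NoopDataLossRiskAssessor INSTANCE = new NoopDataLossRiskAssessor();

  private NoopDataLossRiskAssessor() {
  }

  @Override
  public boolean hasDataLossRisk(String segmentName) {
    return false;
  }
}

final class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
  // Hypothetical lookup: segment name -> deep-store download URL, or null if the segment is unknown.
  private final Function<String, String> _downloadUrlLookup;

  PeerDownloadTableDataLossRiskAssessor(Function<String, String> downloadUrlLookup) {
    _downloadUrlLookup = downloadUrlLookup;
  }

  @Override
  public boolean hasDataLossRisk(String segmentName) {
    String downloadUrl = _downloadUrlLookup.apply(segmentName);
    // Unknown segments are not flagged (as in the PR); an empty URL means no deep-store copy.
    return downloadUrl != null && downloadUrl.isEmpty();
  }
}

final class DataLossRiskAssessors {
  private DataLossRiskAssessors() {
  }

  // Picks the implementation once, so callers never re-check the table config per segment.
  static DataLossRiskAssessor create(boolean isPeerDownloadEnabled, Function<String, String> downloadUrlLookup) {
    return isPeerDownloadEnabled
        ? new PeerDownloadTableDataLossRiskAssessor(downloadUrlLookup)
        : NoopDataLossRiskAssessor.INSTANCE;
  }
}
```

A new kind of assessment would then be another implementation plus one branch in the factory, instead of more early returns inside a single `hasDataLossRisk`.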
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +433,30 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, rebalanceConfig);
}
}
+
+ // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+ // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+ // for a table
+ if (!StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
Review Comment:
Check `getPeerSegmentDownloadScheme() != null` instead, as in the previous
comment.
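The hunk above checks, for every segment whose assignment changes, whether moving it is safe. A minimal sketch of that loop, assuming assignments are simplified to segment -> set of instances (the real code works with segment -> instance -> state maps) and using a `Predicate<String>` as a stand-in for `DataLossRiskAssessor.hasDataLossRisk`; `RebalanceRiskCheck` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Sketch: collect the segments that would move and are reported as at risk.
final class RebalanceRiskCheck {
  private RebalanceRiskCheck() {
  }

  static List<String> segmentsAtRisk(Map<String, Set<String>> currentAssignment,
      Map<String, Set<String>> targetAssignment, Predicate<String> hasDataLossRisk) {
    List<String> atRisk = new ArrayList<>();
    for (Map.Entry<String, Set<String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      // Segments whose instance set is unchanged are not moved, so they are skipped.
      if (entry.getValue().equals(targetAssignment.get(segmentName))) {
        continue;
      }
      if (hasDataLossRisk.test(segmentName)) {
        atRisk.add(segmentName);
      }
    }
    return atRisk;
  }
}
```

Assessing only the moved segments keeps the (potentially ZK-backed) per-segment lookup off the common path where most segments stay in place.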
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
+ if (downtime && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
+ if (!forceDowntime) {
+ // Don't allow downtime rebalance if peer-download is enabled as it can result in data loss
+ // The best way to rebalance peer-download enabled tables is to:
+ // - Ensure that all segments have their deep-store copy available
+ // - Pause ingestion to prevent the creation of new segments during rebalance
+ // - set forceDowntime=true and re-try running the rebalance
+ String errorMsg = "Peer-download enabled tables cannot undergo downtime rebalance due to the potential for "
+ + "data loss, validate all segments exist in deep store and pause ingestion prior to setting "
+ + "forceDowntime=true to override the downtime flag";
+ tableRebalanceLogger.error(errorMsg);
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap,
Review Comment:
Should we add an `onReturnFailure` call here?
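The exact `onReturnFailure` signature is not shown in these hunks, so this is a hypothetical sketch of the pattern being asked for: every early `FAILED` return goes through one helper that first notifies the observer tracking the rebalance job. `RebalanceObserver`, `onError`, `Result`, and `checkDowntimeAllowed` are illustrative names, not Pinot's API.

```java
import java.util.Optional;

// Hypothetical sketch: route every early FAILED return through one helper so the observer
// tracking the rebalance job always hears about the failure.
final class EarlyReturnSketch {
  interface RebalanceObserver {
    void onError(String errorMsg);
  }

  record Result(String jobId, String status, String description) {
  }

  private EarlyReturnSketch() {
  }

  static Result onReturnFailure(String jobId, String errorMsg, RebalanceObserver observer) {
    // Notify before returning so the job is not left looking "in progress".
    observer.onError(errorMsg);
    return new Result(jobId, "FAILED", errorMsg);
  }

  // Returns a FAILED result when the guard trips, or empty to let the rebalance continue.
  static Optional<Result> checkDowntimeAllowed(String jobId, boolean downtime, boolean forceDowntime,
      boolean isPeerDownloadEnabled, RebalanceObserver observer) {
    if (downtime && isPeerDownloadEnabled && !forceDowntime) {
      return Optional.of(onReturnFailure(jobId,
          "Peer-download enabled tables cannot undergo downtime rebalance", observer));
    }
    return Optional.empty();
  }
}
```

The same helper would serve the `minAvailableReplicas=0` guard, so neither early return can forget the notification.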
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -489,6 +535,27 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
minAvailableReplicas = numCurrentAssignmentReplicas;
}
+ // Don't allow rebalance if peer-download is enabled but minAvailableReplicas = 0 (which is similar to downtime
+ // rebalance where we can drop to 0 replicas during rebalance)
+ if (minAvailableReplicas == 0
+ && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
+ if (!forceDowntime) {
+ // Don't allow minAvailableReplicas=0 rebalance if peer-download is enabled, as it can result in data loss.
+ // The best way to rebalance peer-download enabled tables is to:
+ // - Ensure that all segments have their deep-store copy available
+ // - Pause ingestion to prevent the creation of new segments during rebalance
+ // - set forceDowntime=true and re-try running the rebalance
+ String errorMsg = "Peer-download enabled tables with cannot set minAvailableReplicas=0 for rebalance due to "
+ + "the potential for data loss, validate all segments exist in deep store and pause ingestion prior to "
+ + "setting forceDowntime=true to override the downtime flag";
+ tableRebalanceLogger.error(errorMsg);
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap,
Review Comment:
Should we add an `onReturnFailure` call here too?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1889,70 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ boolean hasDataLossRisk(String segmentName);
+ }
+
+ private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final TableConfig _tableConfig;
+ private final int _minAvailableReplicas;
+ private final HelixManager _helixManager;
+ private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _isPeerDownloadEnabled;
+ private final boolean _isPauselessEnabled;
+
+ private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas,
+ HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ _tableNameWithType = tableNameWithType;
+ _tableConfig = tableConfig;
+ _minAvailableReplicas = minAvailableReplicas;
+ _helixManager = helixManager;
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public boolean hasDataLossRisk(String segmentName) {
+ // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists
+ if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) {
+ return false;
+ }
Review Comment:
Also, I think we'll have to exclude the case where `numReplica > 1` and
`minAvailableReplicas=-1`; there's no data loss concern in that case.
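A worked sketch of that exclusion, assuming a negative `minAvailableReplicas` means "at most |value| replicas may be unavailable", so the effective floor is `numReplicas + minAvailableReplicas` clamped at 0. `MinAvailableReplicas` and its methods are hypothetical names; the point is that the guard should test the effective floor rather than the raw value.

```java
// Sketch, assuming negative minAvailableReplicas means "at most |value| replicas may be down".
final class MinAvailableReplicas {
  private MinAvailableReplicas() {
  }

  static int effectiveMinAvailableReplicas(int numReplicas, int minAvailableReplicas) {
    if (minAvailableReplicas >= 0) {
      return minAvailableReplicas;
    }
    // e.g. numReplicas=2, minAvailableReplicas=-1 -> at least 1 replica stays up
    return Math.max(numReplicas + minAvailableReplicas, 0);
  }

  // The peer-download concern only applies when a segment may drop to zero serving replicas.
  static boolean mayDropToZeroReplicas(int numReplicas, int minAvailableReplicas) {
    return effectiveMinAvailableReplicas(numReplicas, minAvailableReplicas) == 0;
  }
}
```

With 2 replicas and `-1`, one replica always stays up, so the table should not be flagged; with 1 replica and `-1` the floor is 0 and the check still applies.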
--
This is an automated message from the Apache Git Service.