somandal commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2257028526


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    /**
+     * Assess the risk of data loss for the given segment.
+     *
+     * @param segmentName Name of the segment to assess
+     * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+     *         message describing the risk (if any).
+     */
+    Pair<Boolean, String> assessDataLossRisk(String segmentName);
+  }
+
+  /**
+   * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+   * minAvailableReplicas > 0
+   */
+  @VisibleForTesting
+  static class NoOpRiskAssessor implements DataLossRiskAssessor {
+    NoOpRiskAssessor() {
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      return Pair.of(false, "");
+    }
+  }
+
+  /**
+   * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+   */
+  @VisibleForTesting
+  static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPauselessEnabled;
+
+    @VisibleForTesting
+    PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+        int minAvailableReplicas, HelixManager helixManager,
+        PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+      Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+          && minAvailableReplicas == 0);
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+          .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        return Pair.of(false, "");
+      }
+
+      // If the segment state is COMPLETED and the download URL is empty, there is a data loss risk
+      if (segmentZKMetadata.getStatus().isCompleted() && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(
+          segmentZKMetadata.getDownloadUrl())) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));
+      }
+
+      // If the segment is not yet completed, then the following scenarios are possible:
+      // - Non-upsert / non-dedup table:
+      //     - data loss scenarios are not possible. Either the segment will restart consumption or the
+      //       RealtimeSegmentValidationManager will kick in to fix up the segment if pauseless is enabled
+      // - Upsert / dedup table:
+      //     - For non-pauseless tables, it is safe to move the segment without data loss concerns
+      //     - For pauseless tables, if the segment is still in CONSUMING state, moving it is safe, but if it is in
+      //       COMMITTING state then there is a risk of data loss on segment build failures as well since the
+      //       RealtimeSegmentValidationManager does not automatically try to fix up these segments. To be safe it is
+      //       best to return that there is a risk of data loss for pauseless enabled tables for segments in COMMITTING
+      //       state
+      if (_isPauselessEnabled && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
+          && !_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, _tableConfig)) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));
+      }
+      return Pair.of(false, "");
+    }
+
+    private static String generateDataLossRiskMessage(String segmentName) {
+      return "Moving segment " + segmentName + " as part of rebalance is risky for peer-download "
+          + "enabled tables, ensure the deep store has a copy of the segment and if upsert / dedup enabled "
+          + "that it is completed and try again. It is recommended to forceCommit and pause ingestion prior to "

Review Comment:
   I think this is just to be more careful: force commit ensures that 
consuming segments have been persisted, and the user should still verify that 
a copy exists in deep store. No harm in suggesting it? wdyt?
   
   Also, we don't commit paused streams at all, right? I wasn't entirely sure 
about this part. For example, do we allow time-based commits on paused streams?
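
   For reference, a minimal standalone sketch of the checks in the hunk above. The `State` enum, the method name, and the boolean parameters are hypothetical stand-ins for the Pinot types, not the actual API, and the sketch assumes the peer-download placeholder URL is the empty string, as the code comment in the diff describes.

```java
public class DataLossRiskSketch {
  // Hypothetical simplification of the segment status in ZK metadata.
  enum State { CONSUMING, COMMITTING, COMPLETED }

  /**
   * Returns true when moving the segment is flagged as a data loss risk.
   *
   * @param state            simplified segment state (assumption, not the Pinot enum)
   * @param downloadUrl      download URL from segment metadata; "" is assumed to mean
   *                         "no deep store copy, peer download only"
   * @param pauselessEnabled whether pauseless consumption is enabled for the table
   * @param repairAllowed    whether error segments would be repaired automatically
   */
  static boolean hasDataLossRisk(State state, String downloadUrl, boolean pauselessEnabled,
      boolean repairAllowed) {
    // Completed segment without a deep store copy: peers hold the only copy.
    if (state == State.COMPLETED && "".equals(downloadUrl)) {
      return true;
    }
    // Pauseless table, segment mid-commit, and nothing will repair a failed build.
    if (pauselessEnabled && state == State.COMMITTING && !repairAllowed) {
      return true;
    }
    // Everything else (for example a CONSUMING segment) is treated as safe to move.
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasDataLossRisk(State.COMPLETED, "", false, true));
    System.out.println(hasDataLossRisk(State.COMMITTING, "", true, false));
    System.out.println(hasDataLossRisk(State.CONSUMING, "", true, false));
  }
}
```

   In this reading, force commit moves segments out of the CONSUMING / COMMITTING cases, and the deep store check covers the COMPLETED case, which is why the message suggests both.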



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

