Jackie-Jiang commented on code in PR #12088:
URL: https://github.com/apache/pinot/pull/12088#discussion_r1416183638


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1559,12 +1560,54 @@ private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS
 
   /**
    * Force commit the current segments in consuming state and restart consumption
+   * Commit all partitions unless either partitionsToCommit or segmentsToCommit are provided.
+   *
+   * @param tableNameWithType  table name with type
+   * @param partitionGroupIdsToCommit  comma separated list of partition group IDs to commit
+   * @param segmentsToCommit  comma separated list of consuming segments to commit
+   * @return the set of consuming segments for which commit was initiated
    */
-  public Set<String> forceCommit(String tableNameWithType) {
+  public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
+      @Nullable String segmentsToCommit) {
     IdealState idealState = getIdealState(tableNameWithType);
-    Set<String> consumingSegments = findConsumingSegments(idealState);
-    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
-    return consumingSegments;
+    Set<String> allConsumingSegments = findConsumingSegments(idealState);
+    Set<String> targetConsumingSegments = filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
+        segmentsToCommit);
+    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+    return targetConsumingSegments;
+  }
+
+  /**
+   * Among all consuming segments, filter the ones that are in the given partitions or segments.
+   */
+  private Set<String> filterSegmentsToCommit(Set<String> allConsumingSegments,
+      @Nullable String partitionGroupIdsToCommitStr, @Nullable String segmentsToCommitStr) {
+    if (partitionGroupIdsToCommitStr == null && segmentsToCommitStr == null) {
+      return allConsumingSegments;
+    }
+    Preconditions.checkState(partitionGroupIdsToCommitStr == null || segmentsToCommitStr == null,

Review Comment:
   (minor) Suggest moving this check to the first line of `forceCommit()` and 
changing it to `checkArgument()`
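
   A rough sketch of what this could look like, not the actual Pinot code. The class `ForceCommitArgCheck`, the method `validate()`, and the error message are hypothetical, and the local `checkArgument()` helper only stands in for Guava's `Preconditions.checkArgument()` so the snippet has no dependencies. The idea is that the two parameters are caller input, so a bad combination is better reported as an `IllegalArgumentException` before any other work is done:

```java
class ForceCommitArgCheck {
  // Local stand-in for Guava's Preconditions.checkArgument(boolean, Object):
  // throws IllegalArgumentException when the expression is false.
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  // Hypothetical helper holding the check that would become the first
  // statement of forceCommit(). Both arguments may be null; at most one of
  // them may be non-null.
  static void validate(String partitionGroupIdsToCommit, String segmentsToCommit) {
    checkArgument(partitionGroupIdsToCommit == null || segmentsToCommit == null,
        "Illustrative message: provide partition group IDs or segments, not both");
  }

  public static void main(String[] args) {
    validate(null, null);        // neither provided: accepted
    validate("0,1", null);       // only partition group IDs: accepted
    validate(null, "segmentA");  // only segments: accepted
    try {
      validate("0,1", "segmentA");  // both provided: rejected
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

   With the check at the top of `forceCommit()`, invalid input fails before the ideal state is read, and `filterSegmentsToCommit()` can assume at most one filter is set.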



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

