noob-se7en commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1917460537
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1728,15 +1736,111 @@ private boolean isTmpAndCanDelete(String filePath,
Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String
partitionGroupIdsToCommit,
- @Nullable String segmentsToCommit) {
+ @Nullable String segmentsToCommit, int batchSize) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments =
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
segmentsToCommit);
- sendForceCommitMessageToServers(tableNameWithType,
targetConsumingSegments);
+
+ List<Set<String>> segmentBatchList = getSegmentBatchList(idealState,
targetConsumingSegments, batchSize);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ try {
+ for (Set<String> segmentBatchToCommit : segmentBatchList) {
+ executorService.submit(() -> executeBatch(tableNameWithType,
segmentBatchToCommit));
+ }
+ } finally {
+ executorService.shutdown();
+ }
+
return targetConsumingSegments;
}
+ private void executeBatch(String tableNameWithType, Set<String>
segmentBatchToCommit) {
+ sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+
+ int attemptCount = 0;
+ try {
+ attemptCount = DEFAULT_RETRY_POLICY.attempt(() ->
isBatchSuccessful(tableNameWithType, segmentBatchToCommit));
+ } catch (AttemptsExceededException | RetriableOperationException e) {
+ String errorMsg =
+ String.format("Failed to execute the forceCommit batch of segments:
%s , attempt count: %d",
+ segmentBatchToCommit,
+ attemptCount);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isBatchSuccessful(String tableNameWithType,
+ Set<String> segmentBatchToCommit) {
+
+ Set<String> onlineSegmentsForTable =
+
_helixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);
+
+ for (String segmentName : segmentBatchToCommit) {
+ if (!onlineSegmentsForTable.contains(segmentName)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private List<Set<String>> getSegmentBatchList(IdealState idealState,
Set<String> targetConsumingSegments,
+ int batchSize) {
+ Map<String, Queue<String>> instanceToConsumingSegments =
+ getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+ Set<String> segmentsAdded = new HashSet<>();
+ List<Set<String>> segmentBatchList = new ArrayList<>();
+ Set<String> currentBatch = new HashSet<>();
+ boolean segmentsRemaining = true;
+
+ while (segmentsRemaining) {
+ segmentsRemaining = false;
+ for (Queue<String> queue : instanceToConsumingSegments.values()) {
+ if (!queue.isEmpty()) {
+ String segmentName = queue.poll();
+ if (segmentsAdded.contains(segmentName)) {
+ continue;
+ }
+ currentBatch.add(segmentName);
+ segmentsAdded.add(segmentName);
+ if (currentBatch.size() == batchSize) {
+ segmentBatchList.add(currentBatch);
+ currentBatch = new HashSet<>();
+ }
+ segmentsRemaining = true;
+ }
+ }
+ }
Review Comment:
This needs to be simplified. The logic exists so that /forceCommit is fast and
uses maximum parallelism across servers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]