Jackie-Jiang commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1917549332
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1728,15 +1736,114 @@ private boolean isTmpAndCanDelete(String filePath,
Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String
partitionGroupIdsToCommit,
- @Nullable String segmentsToCommit) {
+ @Nullable String segmentsToCommit, int batchSize) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments =
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
segmentsToCommit);
- sendForceCommitMessageToServers(tableNameWithType,
targetConsumingSegments);
+
+ List<Set<String>> segmentBatchList = getSegmentBatchList(idealState,
targetConsumingSegments, batchSize);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ try {
+ for (Set<String> segmentBatchToCommit : segmentBatchList) {
+ executorService.submit(() -> executeBatch(tableNameWithType,
segmentBatchToCommit));
+ }
+ } finally {
+ executorService.shutdown();
+ }
+
return targetConsumingSegments;
}
+ private void executeBatch(String tableNameWithType, Set<String>
segmentBatchToCommit) {
+ sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+
+ int attemptCount = 0;
+ try {
+ attemptCount = DEFAULT_RETRY_POLICY.attempt(() ->
isBatchSuccessful(tableNameWithType, segmentBatchToCommit));
+ } catch (AttemptsExceededException | RetriableOperationException e) {
+ String errorMsg =
+ String.format("Failed to execute the forceCommit batch of segments:
%s , attempt count: %d",
+ segmentBatchToCommit,
+ attemptCount);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isBatchSuccessful(String tableNameWithType,
+ Set<String> segmentBatchToCommit) {
+
+ Set<String> onlineSegmentsForTable =
+
_helixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);
Review Comment:
Checking ideal state itself is not enough when pauseless consumption is
enabled. We can read the segment ZK metadata and check its status. It is
successfully committed when the status turns `DONE`.
Segment ZK metadata is per segment, so we want to skip checking the same
segment when it is already `DONE` to reduce ZK accesses.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1728,15 +1736,114 @@ private boolean isTmpAndCanDelete(String filePath,
Set<String> downloadUrls, Pin
* @return the set of consuming segments for which commit was initiated
*/
public Set<String> forceCommit(String tableNameWithType, @Nullable String
partitionGroupIdsToCommit,
- @Nullable String segmentsToCommit) {
+ @Nullable String segmentsToCommit, int batchSize) {
IdealState idealState = getIdealState(tableNameWithType);
Set<String> allConsumingSegments = findConsumingSegments(idealState);
Set<String> targetConsumingSegments =
filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit,
segmentsToCommit);
- sendForceCommitMessageToServers(tableNameWithType,
targetConsumingSegments);
+
+ List<Set<String>> segmentBatchList = getSegmentBatchList(idealState,
targetConsumingSegments, batchSize);
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ try {
+ for (Set<String> segmentBatchToCommit : segmentBatchList) {
+ executorService.submit(() -> executeBatch(tableNameWithType,
segmentBatchToCommit));
+ }
+ } finally {
+ executorService.shutdown();
+ }
+
return targetConsumingSegments;
}
+ private void executeBatch(String tableNameWithType, Set<String>
segmentBatchToCommit) {
+ sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+
+ int attemptCount = 0;
+ try {
+ attemptCount = DEFAULT_RETRY_POLICY.attempt(() ->
isBatchSuccessful(tableNameWithType, segmentBatchToCommit));
+ } catch (AttemptsExceededException | RetriableOperationException e) {
+ String errorMsg =
+ String.format("Failed to execute the forceCommit batch of segments:
%s , attempt count: %d",
Review Comment:
We want to log the individual segments not successfully committed. Currently
there is no way to differentiate the ones committed and ones failed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]