Jackie-Jiang commented on code in PR #14811:
URL: https://github.com/apache/pinot/pull/14811#discussion_r1931299015
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -213,6 +222,7 @@ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceMan
         controllerConf.getDeepStoreRetryUploadParallelism()) : null;
     _deepStoreUploadExecutorPendingSegments =
         _isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
+    _forceCommitExecutorService = Executors.newFixedThreadPool(4);
Review Comment:
Having a fixed-size pool could actually cause problems when there are
multiple force-commit requests. Since the task is waiting most of the time, I'd
actually suggest creating a single-thread executor for each request, same as the last
version. It is not on the query path, so the overhead should be fine.
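A minimal sketch of what this suggests (class and method names here are hypothetical, not the PR's code): spin up a dedicated single-thread executor per force-commit request and shut it down once the batch processing finishes, instead of sharing a fixed pool across requests.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerRequestExecutorSketch {
  // One dedicated single-thread executor per force-commit request; it is shut
  // down after the (mostly-waiting) batch task completes, so no fixed pool is
  // held across concurrent requests.
  public static boolean runForceCommitTask(Runnable batchProcessor) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(batchProcessor);
    executor.shutdown();  // accept no more work; the submitted task still runs
    return executor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    boolean finished = runForceCommitTask(() -> System.out.println("processing batches"));
    System.out.println("finished=" + finished);
  }
}
```
The cost of creating a thread per request is negligible off the query path, and the executor (and its thread) disappears as soon as the request's batches are done.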
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
Review Comment:
Let's loop over `targetConsumingSegments` instead of the ideal state. The ideal
state should always contain `targetConsumingSegments` because they are
extracted from the ideal state.
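The suggested inversion could look roughly like this: iterate `targetConsumingSegments` and look each segment up in the ideal-state map, rather than scanning every segment in the ideal state. This is a standalone sketch with plain maps standing in for Helix's `IdealState`; names other than those quoted from the PR are assumptions.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class InstanceMapSketch {
  static final String CONSUMING = "CONSUMING";  // stands in for SegmentStateModel.CONSUMING

  // segmentToInstanceState mirrors idealState.getRecord().getMapFields()
  static Map<String, Queue<String>> instanceToConsumingSegments(
      Map<String, Map<String, String>> segmentToInstanceState, Set<String> targetConsumingSegments) {
    Map<String, Queue<String>> result = new HashMap<>();
    // O(|target|) lookups instead of a pass over every segment in the ideal state
    for (String segmentName : targetConsumingSegments) {
      Map<String, String> instanceToState = segmentToInstanceState.get(segmentName);
      for (Map.Entry<String, String> entry : instanceToState.entrySet()) {
        if (CONSUMING.equals(entry.getValue())) {
          result.computeIfAbsent(entry.getKey(), k -> new LinkedList<>()).add(segmentName);
        }
      }
    }
    return result;
  }
}
```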
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -152,6 +157,9 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
+  private static final int FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS = 15000;
Review Comment:
Let's take the check interval from the REST API as well, because different use
cases might want different intervals; we might also want to add a timeout and
take that from the REST API too. The retry count can be calculated from the timeout
and the interval.
We can provide default values (e.g. 5s, 3min) for them in case they are not
provided. IMO a 15s interval is too long, because it means we will wait at least
15s for each batch.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
Review Comment:
We can remove the queue once it is empty to avoid checking it again and
again. You may use an iterator to remove the entry without an extra lookup.
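One way the iterator-based removal might look, as a standalone sketch (not the PR's code): drain the per-instance queues round-robin and drop an entry via the entry-set iterator as soon as its queue runs dry, so exhausted instances are never re-scanned.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class RoundRobinDrainSketch {
  static List<String> drainRoundRobin(Map<String, Queue<String>> instanceToSegments) {
    List<String> order = new ArrayList<>();
    while (!instanceToSegments.isEmpty()) {
      Iterator<Map.Entry<String, Queue<String>>> it = instanceToSegments.entrySet().iterator();
      while (it.hasNext()) {
        Queue<String> queue = it.next().getValue();
        String segment = queue.poll();
        if (segment != null) {
          order.add(segment);
        }
        if (queue.isEmpty()) {
          it.remove();  // no extra map lookup; this queue is never scanned again
        }
      }
    }
    return order;
  }
}
```
The outer loop's `!instanceToSegments.isEmpty()` check also replaces the `segmentsRemaining` flag, since the map shrinks to empty exactly when all queues are drained.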
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
Review Comment:
Smart!
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
Review Comment:
Ignoring the interrupt could be risky (it keeps holding a long-running thread). Let's
wrap it as a `RuntimeException` and throw it. We may log an error when catching
it.
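The suggested handling might look like this, as a standalone sketch (the class's `LOGGER` is shown as a comment since it is not available in a self-contained snippet; restoring the interrupt flag before rethrowing is a common convention, not something the review spells out):

```java
public class InterruptSketch {
  static void sleepOrThrow(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve interrupt status for callers
      // LOGGER.error("Interrupted while waiting between force-commit batches", e);
      throw new RuntimeException("Interrupted while waiting between force-commit batches", e);
    }
  }
}
```
This way an interrupted controller shutdown aborts the batch loop promptly instead of silently sleeping through it.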
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
+        continue;
+      }
+      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
+      for (String instance : instanceToStateMap.keySet()) {
+        String state = instanceToStateMap.get(instance);
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          instanceToConsumingSegments.compute(instance, (key, value) -> {
+            if (value == null) {
+              value = new LinkedList<>();
+            }
+            value.add(segmentName);
+            return value;
+          });
Review Comment:
```suggestion
          instanceToConsumingSegments.computeIfAbsent(instance, k -> new LinkedList<>()).add(segmentName);
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
Review Comment:
We can reduce a lookup by
```suggestion
if (!segmentsAdded.add(segmentName)) {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
Review Comment:
```suggestion
final Set<String>[] segmentsYetToBeCommitted = new Set[1];
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1848,15 +1860,122 @@ private boolean isTmpAndCanDelete(String filePath, Set<String> downloadUrls, Pin
    * @return the set of consuming segments for which commit was initiated
    */
   public Set<String> forceCommit(String tableNameWithType, @Nullable String partitionGroupIdsToCommit,
-      @Nullable String segmentsToCommit) {
+      @Nullable String segmentsToCommit, int batchSize) {
     IdealState idealState = getIdealState(tableNameWithType);
     Set<String> allConsumingSegments = findConsumingSegments(idealState);
     Set<String> targetConsumingSegments =
         filterSegmentsToCommit(allConsumingSegments, partitionGroupIdsToCommit, segmentsToCommit);
-    sendForceCommitMessageToServers(tableNameWithType, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = getSegmentBatchList(idealState, targetConsumingSegments, batchSize);
+
+    _forceCommitExecutorService.submit(() -> processBatchesSequentially(segmentBatchList, tableNameWithType));
+
     return targetConsumingSegments;
   }
+  private void processBatchesSequentially(List<Set<String>> segmentBatchList, String tableNameWithType) {
+    Set<String> prevBatch = null;
+    for (Set<String> segmentBatchToCommit: segmentBatchList) {
+      if (prevBatch != null) {
+        waitUntilPrevBatchIsComplete(tableNameWithType, prevBatch);
+      }
+      sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);
+      prevBatch = segmentBatchToCommit;
+    }
+  }
+
+  private void waitUntilPrevBatchIsComplete(String tableNameWithType, Set<String> segmentBatchToCommit) {
+
+    try {
+      Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
+    } catch (InterruptedException ignored) {
+    }
+
+    int attemptCount = 0;
+    final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
+    try {
+      attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
+        segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
+        return segmentsYetToBeCommitted[0].isEmpty();
+      });
+    } catch (AttemptsExceededException | RetriableOperationException e) {
+      String errorMsg = String.format(
+          "Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+              + "segmentsYetToBeCommitted: %s",
+          segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
+      int batchSize) {
+    Map<String, Queue<String>> instanceToConsumingSegments =
+        getInstanceToConsumingSegments(idealState, targetConsumingSegments);
+
+    List<Set<String>> segmentBatchList = new ArrayList<>();
+    Set<String> currentBatch = new HashSet<>();
+    Set<String> segmentsAdded = new HashSet<>();
+    boolean segmentsRemaining = true;
+
+    while (segmentsRemaining) {
+      segmentsRemaining = false;
+      // pick segments in round-robin fashion to parallelize
+      // forceCommit across max servers
+      for (Queue<String> queue : instanceToConsumingSegments.values()) {
+        if (!queue.isEmpty()) {
+          segmentsRemaining = true;
+          String segmentName = queue.poll();
+          // there might be a segment replica hosted on
+          // another instance added before
+          if (segmentsAdded.contains(segmentName)) {
+            continue;
+          }
+          currentBatch.add(segmentName);
+          segmentsAdded.add(segmentName);
+          if (currentBatch.size() == batchSize) {
+            segmentBatchList.add(currentBatch);
+            currentBatch = new HashSet<>();
+          }
+        }
+      }
+    }
+
+    if (!currentBatch.isEmpty()) {
+      segmentBatchList.add(currentBatch);
+    }
+    return segmentBatchList;
+  }
+
+  @VisibleForTesting
+  Map<String, Queue<String>> getInstanceToConsumingSegments(IdealState idealState,
+      Set<String> targetConsumingSegments) {
+    Map<String, Queue<String>> instanceToConsumingSegments = new HashMap<>();
+
+    Map<String, Map<String, String>> segmentNameToInstanceToStateMap = idealState.getRecord().getMapFields();
+    for (String segmentName : segmentNameToInstanceToStateMap.keySet()) {
+      if (!targetConsumingSegments.contains(segmentName)) {
+        continue;
+      }
+      Map<String, String> instanceToStateMap = segmentNameToInstanceToStateMap.get(segmentName);
+      for (String instance : instanceToStateMap.keySet()) {
+        String state = instanceToStateMap.get(instance);
Use `entrySet()` to reduce lookups
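The `entrySet()` form in a standalone sketch (the `"CONSUMING"` literal stands in for `SegmentStateModel.CONSUMING`; the method is illustrative, not the PR's): one traversal yields both instance and state, avoiding the per-key `get()` lookup.

```java
import java.util.Map;

public class EntrySetSketch {
  static int countConsuming(Map<String, String> instanceToState) {
    int count = 0;
    for (Map.Entry<String, String> entry : instanceToState.entrySet()) {
      if ("CONSUMING".equals(entry.getValue())) {
        count++;  // entry.getKey() is the instance, entry.getValue() its state
      }
    }
    return count;
  }
}
```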
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]