kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1231759286
##########
server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java:
##########
@@ -155,128 +211,161 @@ public long getAvailableSize()
* <p>
* A load is possible only if the server meets all of the following criteria:
* <ul>
- * <li>is not already serving or loading the segment</li>
* <li>is not being decommissioned</li>
+ * <li>is not already serving the segment</li>
+ * <li>is not performing any other action on the segment</li>
* <li>has not already exceeded the load queue limit in this run</li>
* <li>has available disk space</li>
* </ul>
*/
public boolean canLoadSegment(DataSegment segment)
{
- final SegmentState state = getSegmentState(segment);
return !isDecommissioning
- && (maxSegmentsInLoadQueue == 0 || maxSegmentsInLoadQueue > segmentsQueuedForLoad)
- && getAvailableSize() >= segment.getSize()
- && state == SegmentState.NONE;
+ && !hasSegmentLoaded(segment.getId())
+ && getActionOnSegment(segment) == null
+ && totalAssignmentsInRun < maxAssignmentsInRun
+ && getAvailableSize() >= segment.getSize();
}
- public SegmentState getSegmentState(DataSegment segment)
+ public SegmentAction getActionOnSegment(DataSegment segment)
{
- SegmentState state = queuedSegments.get(segment.getId());
- if (state != null) {
- return state;
- }
-
- return isServingSegment(segment) ? SegmentState.LOADED : SegmentState.NONE;
+ return queuedSegments.get(segment);
}
/**
* Segments queued for load, drop or move on this server.
+ * <ul>
+ * <li>Contains segments present in the queue when the current coordinator run started.</li>
+ * <li>Contains segments added to the queue during the current run.</li>
+ * <li>Maps replicating segments to LOAD rather than REPLICATE for simplicity.</li>
+ * <li>Does not contain segments whose actions were cancelled.</li>
+ * </ul>
*/
- public Map<SegmentId, SegmentState> getQueuedSegments()
+ public Map<DataSegment, SegmentAction> getQueuedSegments()
{
- return Collections.unmodifiableMap(queuedSegments);
+ return new HashMap<>(queuedSegments);
}
+ /**
+ * Segments that are expected to be loaded on this server once all the
+ * operations in progress have completed.
+ */
+ public Set<DataSegment> getProjectedSegments()
+ {
+ return projectedSegments;
+ }
+
+ /**
+ * Segments that are currently in the queue for being loaded on this server.
+ * This does not include segments that are being moved to this server.
+ */
+ public List<DataSegment> getLoadingSegments()
+ {
+ final List<DataSegment> loadingSegments = new ArrayList<>();
+ queuedSegments.forEach((segment, action) -> {
+ if (action == SegmentAction.LOAD) {
+ loadingSegments.add(segment);
+ }
+ });
+
+ return loadingSegments;
+ }
+
+ /**
+ * Returns true if this server has the segment loaded and is not dropping it.
+ */
public boolean isServingSegment(DataSegment segment)
{
- return isServingSegment(segment.getId());
+ return hasSegmentLoaded(segment.getId()) && getActionOnSegment(segment) == null;
}
public boolean isLoadingSegment(DataSegment segment)
{
- return getSegmentState(segment) == SegmentState.LOADING;
+ return getActionOnSegment(segment) == SegmentAction.LOAD;
}
public boolean isDroppingSegment(DataSegment segment)
{
- return getSegmentState(segment) == SegmentState.DROPPING;
+ return getActionOnSegment(segment) == SegmentAction.DROP;
}
- public boolean startOperation(DataSegment segment, SegmentState newState)
+ public int getNumMovingSegments()
{
- if (queuedSegments.containsKey(segment.getId())) {
- return false;
- }
+ return movingSegmentCount;
+ }
- if (newState == SegmentState.LOADING || newState == SegmentState.MOVING_TO) {
- ++segmentsQueuedForLoad;
- sizeOfLoadingSegments += segment.getSize();
- }
- queuedSegments.put(segment.getId(), newState);
- return true;
+ public int getNumLoadingReplicas()
+ {
+ return loadingReplicaCount;
}
- public boolean cancelOperation(DataSegment segment, SegmentState currentState)
+ public boolean startOperation(SegmentAction action, DataSegment segment)
{
- SegmentState observedState = queuedSegments.get(segment.getId());
- if (observedState != currentState) {
+ if (queuedSegments.containsKey(segment)) {
return false;
}
- if (currentState == SegmentState.LOADING || currentState == SegmentState.MOVING_TO) {
- --segmentsQueuedForLoad;
- sizeOfLoadingSegments -= segment.getSize();
+ if (action.isLoad()) {
+ ++totalAssignmentsInRun;
}
- queuedSegments.remove(segment.getId());
+
+ updateQueuedSegments(segment, simplify(action), true);
return true;
}
- public boolean isServingSegment(SegmentId segmentId)
+ public boolean cancelOperation(SegmentAction action, DataSegment segment)
+ {
+ final SegmentAction queuedAction = queuedSegments.get(segment);
+ return queuedAction == simplify(action)
+ && (queuedAction == SegmentAction.MOVE_FROM || peon.cancelOperation(segment))
+ && updateQueuedSegments(segment, queuedAction, false);
+ }
+
+ public boolean hasSegmentLoaded(SegmentId segmentId)
{
return server.getSegment(segmentId) != null;
}
- /**
- * Checks if the server can load the given segment.
- * <p>
- * A load is possible only if the server meets all of the following criteria:
- * <ul>
- * <li>is not being decommissioned</li>
- * <li>is not already serving the segment</li>
- * <li>is not performing any other action on the segment</li>
- * <li>has not already exceeded the load queue limit in this run</li>
- * <li>has available disk space</li>
- * </ul>
- */
- public boolean canLoadSegment(DataSegment segment)
+ public boolean isRealtimeServer()
{
- return !isDecommissioning
- && !isServingSegment(segment.getId())
- && !isLoadingSegment(segment)
- && (maxSegmentsInLoadQueue == 0 || maxSegmentsInLoadQueue > peon.getNumberOfSegmentsInQueue())
- && getAvailableSize() >= segment.getSize();
+ return server.getType() == ServerType.REALTIME
+ || server.getType() == ServerType.INDEXER_EXECUTOR;
}
- @Override
- public int compareTo(ServerHolder serverHolder)
+ private SegmentAction simplify(SegmentAction action)
{
- int result = Long.compare(getAvailableSize(), serverHolder.getAvailableSize());
- if (result != 0) {
- return result;
+ return action == SegmentAction.REPLICATE ? SegmentAction.LOAD : action;
+ }
+
+ private boolean updateQueuedSegments(DataSegment segment, SegmentAction action, boolean addToQueue)
+ {
+ if (addToQueue) {
+ queuedSegments.put(segment, action);
+ } else {
+ queuedSegments.remove(segment);
}
- result = server.getHost().compareTo(serverHolder.server.getHost());
- if (result != 0) {
- return result;
+ final long sizeDelta = addToQueue ? segment.getSize() : -segment.getSize();
+ if (action.isLoad()) {
+ sizeOfLoadingSegments += sizeDelta;
+ } else if (action == SegmentAction.DROP) {
+ sizeOfDroppingSegments += sizeDelta;
}
- result = server.getTier().compareTo(serverHolder.server.getTier());
- if (result != 0) {
- return result;
+ // Remove from projected if load is cancelled or drop is started, add otherwise
+ if (addToQueue ^ action.isLoad()) {
+ projectedSegments.remove(segment);
+ } else {
+ projectedSegments.add(segment);
}
- return server.getType().compareTo(serverHolder.server.getType());
+ return true;
+ }
Review Comment:
Yeah, the boolean return value here is pointless: it's always `true`. It must
have been doing something else originally and became redundant with the recent
changes. We can get rid of it without breaking up the method too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]