kfaraz commented on code in PR #13197: URL: https://github.com/apache/druid/pull/13197#discussion_r1010208794
########## server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java: ########## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.Sets; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.timeline.DataSegment; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * Used by the coordinator in each run for segment loading, dropping, balancing + * and broadcasting. + * <p> + * An instance of this class is freshly created for each coordinator run. + */ +public class SegmentLoader +{ + private static final EmittingLogger log = new EmittingLogger(SegmentLoader.class); + + private final SegmentStateManager stateManager; + private final DruidCluster cluster; + private final CoordinatorStats stats = new CoordinatorStats(); + private final SegmentReplicantLookup replicantLookup; + private final ReplicationThrottler replicationThrottler; + private final BalancerStrategy strategy; + + private final Set<String> emptyTiers = new HashSet<>(); + + public SegmentLoader( + SegmentStateManager stateManager, + DruidCluster cluster, + SegmentReplicantLookup replicantLookup, + ReplicationThrottler replicationThrottler, + BalancerStrategy strategy + ) + { + this.cluster = cluster; + this.strategy = strategy; + this.stateManager = stateManager; + this.replicantLookup = replicantLookup; + this.replicationThrottler = replicationThrottler; + } + + public CoordinatorStats getStats() + { + return stats; + } + + public void makeAlerts() + { + if (!emptyTiers.isEmpty()) { + log.makeAlert("Tiers %s have no servers! Check your cluster configuration.", emptyTiers).emit(); + } + } + + /** + * Moves the given segment between two servers of the same tier. + * <p> + * See if we can move balancing here. + */ + public boolean moveSegment(DataSegment segment, ServerHolder fromServer, ServerHolder toServer) + { + final String tier = toServer.getServer().getTier(); + if (!fromServer.getServer().getTier().equals(tier)) { + return false; + } + + if (fromServer.isServingSegment(segment)) { + // Segment is loaded on fromServer, move it to toServer + return stateManager.moveSegment(segment, fromServer, toServer, replicationThrottler.getMaxLifetime()); + } else if (!fromServer.isLoadingSegment(segment)) { Review Comment: Yeah, it does make reading a little difficult. Fixed this here and a couple other places where I had used them. ########## server/src/main/java/org/apache/druid/server/coordinator/SegmentHolder.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import org.apache.druid.server.coordination.DataSegmentChangeRequest; +import org.apache.druid.server.coordination.SegmentChangeRequestDrop; +import org.apache.druid.server.coordination.SegmentChangeRequestLoad; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Represents a segment queued for a load or drop operation in a LoadQueuePeon. + * <p> + * Requests are naturally ordered using the {@link #COMPARE_ACTION_THEN_INTERVAL}. + */ +public class SegmentHolder implements Comparable<SegmentHolder> +{ + /** + * Orders segment requests: + * <ul> + * <li>first by action: all drops, then all loads, then all moves</li> + * <li>then by interval: newest segments first</li> + * </ul> + */ + public static final Comparator<SegmentHolder> COMPARE_ACTION_THEN_INTERVAL = + Ordering.explicit(SegmentAction.DROP, SegmentAction.LOAD, SegmentAction.REPLICATE, SegmentAction.MOVE_TO) + .onResultOf(SegmentHolder::getAction) + .compound(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST.onResultOf(SegmentHolder::getSegment)); + + private final DataSegment segment; + private final DataSegmentChangeRequest changeRequest; + private final SegmentAction action; + + // Guaranteed to store only non-null elements + private final List<LoadPeonCallback> callbacks = new ArrayList<>(); + private final AtomicLong firstRequestMillis = new AtomicLong(0); + + SegmentHolder( + DataSegment segment, + SegmentAction action, + @Nullable LoadPeonCallback callback + ) + { + this.segment = segment; + this.action = action; + this.changeRequest = (action == SegmentAction.DROP) + ? new SegmentChangeRequestDrop(segment) + : new SegmentChangeRequestLoad(segment); + if (callback != null) { + callbacks.add(callback); + } + } + + public DataSegment getSegment() + { + return segment; + } + + public SegmentAction getAction() + { + return action; + } + + public boolean isLoad() + { + return action != SegmentAction.DROP; + } + + public DataSegmentChangeRequest getChangeRequest() + { + return changeRequest; + } + + public String getSegmentIdentifier() + { + return segment.getId().toString(); + } + + public void addCallback(@Nullable LoadPeonCallback callback) + { + if (callback != null) { + synchronized (callbacks) { + callbacks.add(callback); + } + } + } + + /** + * Returns an immutable copy of all non-null callbacks for this queued segment. + */ + public List<LoadPeonCallback> getCallbacks() + { + synchronized (callbacks) { + return ImmutableList.copyOf(callbacks); + } + } + + public void markRequestSentToServer() + { + firstRequestMillis.compareAndSet(0L, System.currentTimeMillis()); + } + + public boolean isRequestSentToServer() + { + return firstRequestMillis.get() > 0; + } + + public long getMillisSinceFirstRequestToServer() Review Comment: Thanks for the suggestion! ########## server/src/main/java/org/apache/druid/server/coordinator/SegmentAction.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +/** + * Represents actions that can be performed on a server for a single segment. + * <p> + * The different action types can be used to prioritize items in a LoadQueuePeon. + */ +public enum SegmentAction +{ + DROP, + LOAD, + REPLICATE, + MOVE_TO, Review Comment: Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
