kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1231770485
##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java:
##########
@@ -125,49 +117,85 @@ public int getMovingReplicas(SegmentId segmentId, String
tier)
* Number of replicas of the segment which are safely loaded on the given
tier
* and are not being dropped.
*/
- public int getLoadedReplicas(SegmentId segmentId, String tier)
+ public int getLoadedNotDroppingReplicas(SegmentId segmentId, String tier)
+ {
+ ReplicaCount count = replicaCounts.get(segmentId, tier);
+ return (count == null) ? 0 : count.loadedNotDropping();
+ }
+
+ public int getLoadingReplicas(SegmentId segmentId, String tier)
{
ReplicaCount count = replicaCounts.get(segmentId, tier);
- return (count == null) ? 0 : count.safelyLoaded();
+ return count == null ? 0 : count.loading;
}
/**
- * Number of replicas of the segment which are safely loaded on the cluster
- * and are not being dropped.
+ * Number of replicas of the segment which are loaded on the cluster.
+ *
+ * @param includeDropping Whether segments which are being dropped should be
+ * included in the total count.
*/
- public int getTotalLoadedReplicas(SegmentId segmentId)
+ public int getLoadedReplicas(SegmentId segmentId, boolean includeDropping)
{
final Map<String, ReplicaCount> allTiers = replicaCounts.row(segmentId);
int totalLoaded = 0;
for (ReplicaCount count : allTiers.values()) {
- totalLoaded += count.safelyLoaded();
+ totalLoaded += includeDropping ? count.loaded :
count.loadedNotDropping();
}
return totalLoaded;
}
- public Object2LongMap<String> getBroadcastUnderReplication(SegmentId
segmentId)
+ /**
+ * Sets the number of replicas required for the specified segment in the
tier.
+ * In a given coordinator run, this method must be called at least once for
+ * every segment in every tier.
+ */
+ public void setRequiredReplicas(SegmentId segmentId, boolean isBroadcast,
String tier, int requiredReplicas)
{
- Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
- for (ServerHolder holder : cluster.getAllServers()) {
- // Only record tier entry for server that is segment broadcast target
- if (holder.getServer().getType().isSegmentBroadcastTarget()) {
- // Every broadcast target server should be serving 1 replica of the
segment
- if (!holder.isServingSegment(segmentId)) {
- perTier.addTo(holder.getServer().getTier(), 1L);
- } else {
- perTier.putIfAbsent(holder.getServer().getTier(), 0);
- }
+ ReplicaCount counts = computeIfAbsent(replicaCounts, segmentId, tier);
+ counts.required = requiredReplicas;
+ if (isBroadcast) {
+ counts.possible = requiredReplicas;
+ } else {
+ counts.possible = tierToHistoricalCount.getOrDefault(tier, 0);
+ }
+ }
+
+ public Map<String, Object2LongMap<String>>
getTierToDatasourceToUnderReplicated(
+ Iterable<DataSegment> usedSegments,
+ boolean ignoreMissingServers
+ )
+ {
+ final Map<String, Object2LongMap<String>> tierToUnderReplicated = new
HashMap<>();
+
+ for (DataSegment segment : usedSegments) {
+ final Map<String, ReplicaCount> tierToReplicaCount =
replicaCounts.row(segment.getId());
+ if (tierToReplicaCount == null) {
+ continue;
}
+
+ tierToReplicaCount.forEach((tier, counts) -> {
+ final int underReplicated =
counts.underReplicated(ignoreMissingServers);
+ if (underReplicated >= 0) {
+ Object2LongOpenHashMap<String> datasourceToUnderReplicated =
(Object2LongOpenHashMap<String>)
+ tierToUnderReplicated.computeIfAbsent(tier, ds -> new
Object2LongOpenHashMap<>());
+ datasourceToUnderReplicated.addTo(segment.getDataSource(),
underReplicated);
+ }
+ });
}
- return perTier;
+
+ return tierToUnderReplicated;
}
Review Comment:
Hmm, let me see what can be done here.
It turned out like this mostly because it is a direct translation of the
flag `computeUsingClusterView` passed in from `OverlordResource` to
`DruidCoordinator` to `SegmentReplicantLookup`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]