This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b23ddc5939 print replication levels in coordinator segment logs (#12511)
b23ddc5939 is described below

commit b23ddc5939d2537a51896a5082c610bf2c2bee9e
Author: Clint Wylie <[email protected]>
AuthorDate: Tue May 17 02:24:13 2022 -0700

    print replication levels in coordinator segment logs (#12511)
    
    * print replication levels in coordinator segment logs
    
    * add served segment count to stats
    
    * also for drops
---
 .../duty/EmitClusterStatsAndMetrics.java           |  3 +-
 .../druid/server/coordinator/rules/LoadRule.java   | 79 ++++++++++++++++------
 2 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 71a18ce394..71e0d8ab5c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -240,12 +240,13 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
         ImmutableDruidServer server = serverHolder.getServer();
         LoadQueuePeon queuePeon = serverHolder.getPeon();
         log.info(
-            "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.",
+            "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d served, %,d bytes queued, %,d bytes served.",
             server.getName(),
             server.getType().toString(),
             server.getTier(),
             queuePeon.getSegmentsToLoad().size(),
             queuePeon.getSegmentsToDrop().size(),
+            server.getNumSegments(),
             queuePeon.getLoadQueueSize(),
             server.getCurrSize()
         );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index e34cd98772..9e6dd04a38 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -239,10 +239,10 @@ public abstract class LoadRule implements Rule
       final String tier = entry.getKey();
 
       String noAvailability = StringUtils.format(
-          "No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
+          "No available [%s] servers or node capacity to assign primary segment [%s]! %s",
           tier,
           segment.getId(),
-          targetReplicantsInTier
+          getReplicationLogString()
       );
 
       final List<ServerHolder> holders = getFilteredHolders(
@@ -299,7 +299,7 @@ public abstract class LoadRule implements Rule
     for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
       final String tier = entry.getKey();
       if (tier.equals(tierToSkip)) {
-        log.info("Skipping replica assignment for tier [%s]", tier);
+        log.info("Skipping replica assignment for segment [%s] to tier [%s]", segment.getId(), tier);
         continue;
       }
       final int numAssigned = assignReplicasForTier(
@@ -334,10 +334,10 @@ public abstract class LoadRule implements Rule
     }
 
     String noAvailability = StringUtils.format(
-        "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
+        "No available [%s] servers or node capacity to assign segment [%s]! %s",
         tier,
         segment.getId(),
-        targetReplicantsInTier
+        getReplicationLogString()
     );
 
     final List<ServerHolder> holders = getFilteredHolders(tier, 
params.getDruidCluster(), predicate);
@@ -350,7 +350,7 @@ public abstract class LoadRule implements Rule
     final ReplicationThrottler throttler = params.getReplicationManager();
     for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
       if (!throttler.canCreateReplicant(tier)) {
-        log.info("Throttling replication for segment [%s] in tier [%s]", segment.getId(), tier);
+        log.info("Throttling replication for segment [%s] in tier [%s]. %s", segment.getId(), tier, getReplicationLogString());
         return numAssigned;
       }
 
@@ -371,10 +371,11 @@ public abstract class LoadRule implements Rule
       final String holderHost = holder.getServer().getHost();
       throttler.registerReplicantCreation(tier, segmentId, holderHost);
       log.info(
-          "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
+          "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]. %s",
           segment.getId(),
           holder.getServer().getName(),
-          holder.getServer().getTier()
+          holder.getServer().getTier(),
+          getReplicationLogString()
       );
       holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId));
     }
@@ -393,11 +394,8 @@ public abstract class LoadRule implements Rule
   {
     final DruidCluster druidCluster = params.getDruidCluster();
 
-    // This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
-    if (loadingInProgress(druidCluster)) {
-      log.info("Loading in progress, skipping drop until loading is complete");
-      return;
-    }
+
+    final boolean isLoading = loadingInProgress(druidCluster);
 
     for (final Object2IntMap.Entry<String> entry : currentReplicants.object2IntEntrySet()) {
       final String tier = entry.getKey();
@@ -412,7 +410,23 @@ public abstract class LoadRule implements Rule
         final int currentReplicantsInTier = entry.getIntValue();
         final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
         if (numToDrop > 0) {
-          numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
+          // This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
+          if (isLoading) {
+            log.info(
+                "Loading in progress for segment [%s], skipping drop from tier [%s] until loading is complete! %s",
+                segment.getId(),
+                tier,
+                getReplicationLogString()
+            );
+            break;
+          }
+          numDropped = dropForTier(
+              numToDrop,
+              holders,
+              segment,
+              params.getBalancerStrategy(),
+              getReplicationLogString()
+          );
         } else {
           numDropped = 0;
         }
@@ -442,7 +456,8 @@ public abstract class LoadRule implements Rule
       final int numToDrop,
       final NavigableSet<ServerHolder> holdersInTier,
       final DataSegment segment,
-      final BalancerStrategy balancerStrategy
+      final BalancerStrategy balancerStrategy,
+      final String replicationLog
   )
   {
     Map<Boolean, TreeSet<ServerHolder>> holders = holdersInTier.stream()
@@ -453,9 +468,9 @@ public abstract class LoadRule implements Rule
                                                                ));
     TreeSet<ServerHolder> decommissioningServers = holders.get(true);
     TreeSet<ServerHolder> activeServers = holders.get(false);
-    int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop);
+    int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop, replicationLog);
     if (left > 0) {
-      left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left);
+      left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left, replicationLog);
     }
     if (left != 0) {
       log.warn("I have no servers serving [%s]?", segment.getId());
@@ -464,9 +479,11 @@ public abstract class LoadRule implements Rule
   }
 
   private static int dropSegmentFromServers(
-      BalancerStrategy balancerStrategy,
-      DataSegment segment,
-      NavigableSet<ServerHolder> holders, int numToDrop
+      final BalancerStrategy balancerStrategy,
+      final DataSegment segment,
+      final NavigableSet<ServerHolder> holders,
+      int numToDrop,
+      final String replicationLog
   )
   {
     final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, holders);
@@ -479,10 +496,11 @@ public abstract class LoadRule implements Rule
       final ServerHolder holder = iterator.next();
       if (holder.isServingSegment(segment)) {
         log.info(
-            "Dropping segment [%s] on server [%s] in tier [%s]",
+            "Dropping segment [%s] on server [%s] in tier [%s]. %s",
             segment.getId(),
             holder.getServer().getName(),
-            holder.getServer().getTier()
+            holder.getServer().getTier(),
+            replicationLog
         );
         holder.getPeon().dropSegment(segment, null);
         numToDrop--;
@@ -515,4 +533,21 @@ public abstract class LoadRule implements Rule
   public abstract Map<String, Integer> getTieredReplicants();
 
   public abstract int getNumReplicants(String tier);
+
+  protected String getReplicationLogString()
+  {
+    StringBuilder builder = new StringBuilder("Current replication: [");
+    for (final Object2IntMap.Entry<String> entry : currentReplicants.object2IntEntrySet()) {
+      final String tier = entry.getKey();
+      // [hot:1/2][cold:2/2]
+      builder.append("[")
+             .append(tier)
+             .append(":")
+             .append(entry.getIntValue())
+             .append("/")
+             .append(targetReplicants.getInt(tier))
+             .append("]");
+    }
+    return builder.append("]").toString();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to