This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b23ddc5939 print replication levels in coordinator segment logs (#12511)
b23ddc5939 is described below
commit b23ddc5939d2537a51896a5082c610bf2c2bee9e
Author: Clint Wylie <[email protected]>
AuthorDate: Tue May 17 02:24:13 2022 -0700
print replication levels in coordinator segment logs (#12511)
* print replication levels in coordinator segment logs
* add served segment count to stats
* also for drops
---
.../duty/EmitClusterStatsAndMetrics.java | 3 +-
.../druid/server/coordinator/rules/LoadRule.java | 79 ++++++++++++++++------
2 files changed, 59 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 71a18ce394..71e0d8ab5c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -240,12 +240,13 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
log.info(
- "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d
bytes queued, %,d bytes served.",
+ "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d
served, %,d bytes queued, %,d bytes served.",
server.getName(),
server.getType().toString(),
server.getTier(),
queuePeon.getSegmentsToLoad().size(),
queuePeon.getSegmentsToDrop().size(),
+ server.getNumSegments(),
queuePeon.getLoadQueueSize(),
server.getCurrSize()
);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index e34cd98772..9e6dd04a38 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -239,10 +239,10 @@ public abstract class LoadRule implements Rule
final String tier = entry.getKey();
String noAvailability = StringUtils.format(
- "No available [%s] servers or node capacity to assign primary
segment[%s]! Expected Replicants[%d]",
+ "No available [%s] servers or node capacity to assign primary
segment [%s]! %s",
tier,
segment.getId(),
- targetReplicantsInTier
+ getReplicationLogString()
);
final List<ServerHolder> holders = getFilteredHolders(
@@ -299,7 +299,7 @@ public abstract class LoadRule implements Rule
for (final Object2IntMap.Entry<String> entry :
targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
if (tier.equals(tierToSkip)) {
- log.info("Skipping replica assignment for tier [%s]", tier);
+ log.info("Skipping replica assignment for segment [%s] to tier [%s]",
segment.getId(), tier);
continue;
}
final int numAssigned = assignReplicasForTier(
@@ -334,10 +334,10 @@ public abstract class LoadRule implements Rule
}
String noAvailability = StringUtils.format(
- "No available [%s] servers or node capacity to assign segment[%s]!
Expected Replicants[%d]",
+ "No available [%s] servers or node capacity to assign segment [%s]!
%s",
tier,
segment.getId(),
- targetReplicantsInTier
+ getReplicationLogString()
);
final List<ServerHolder> holders = getFilteredHolders(tier,
params.getDruidCluster(), predicate);
@@ -350,7 +350,7 @@ public abstract class LoadRule implements Rule
final ReplicationThrottler throttler = params.getReplicationManager();
for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
if (!throttler.canCreateReplicant(tier)) {
- log.info("Throttling replication for segment [%s] in tier [%s]",
segment.getId(), tier);
+ log.info("Throttling replication for segment [%s] in tier [%s]. %s",
segment.getId(), tier, getReplicationLogString());
return numAssigned;
}
@@ -371,10 +371,11 @@ public abstract class LoadRule implements Rule
final String holderHost = holder.getServer().getHost();
throttler.registerReplicantCreation(tier, segmentId, holderHost);
log.info(
- "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
+ "Assigning 'replica' for segment [%s] to server [%s] in tier [%s].
%s",
segment.getId(),
holder.getServer().getName(),
- holder.getServer().getTier()
+ holder.getServer().getTier(),
+ getReplicationLogString()
);
holder.getPeon().loadSegment(segment, () ->
throttler.unregisterReplicantCreation(tier, segmentId));
}
@@ -393,11 +394,8 @@ public abstract class LoadRule implements Rule
{
final DruidCluster druidCluster = params.getDruidCluster();
- // This enforces that loading is completed before we attempt to drop
stuffs as a safety measure.
- if (loadingInProgress(druidCluster)) {
- log.info("Loading in progress, skipping drop until loading is complete");
- return;
- }
+
+ final boolean isLoading = loadingInProgress(druidCluster);
for (final Object2IntMap.Entry<String> entry :
currentReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
@@ -412,7 +410,23 @@ public abstract class LoadRule implements Rule
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier -
targetReplicants.getOrDefault(tier, 0);
if (numToDrop > 0) {
- numDropped = dropForTier(numToDrop, holders, segment,
params.getBalancerStrategy());
+ // This enforces that loading is completed before we attempt to drop
stuffs as a safety measure.
+ if (isLoading) {
+ log.info(
+ "Loading in progress for segment [%s], skipping drop from tier
[%s] until loading is complete! %s",
+ segment.getId(),
+ tier,
+ getReplicationLogString()
+ );
+ break;
+ }
+ numDropped = dropForTier(
+ numToDrop,
+ holders,
+ segment,
+ params.getBalancerStrategy(),
+ getReplicationLogString()
+ );
} else {
numDropped = 0;
}
@@ -442,7 +456,8 @@ public abstract class LoadRule implements Rule
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment,
- final BalancerStrategy balancerStrategy
+ final BalancerStrategy balancerStrategy,
+ final String replicationLog
)
{
Map<Boolean, TreeSet<ServerHolder>> holders = holdersInTier.stream()
@@ -453,9 +468,9 @@ public abstract class LoadRule implements Rule
));
TreeSet<ServerHolder> decommissioningServers = holders.get(true);
TreeSet<ServerHolder> activeServers = holders.get(false);
- int left = dropSegmentFromServers(balancerStrategy, segment,
decommissioningServers, numToDrop);
+ int left = dropSegmentFromServers(balancerStrategy, segment,
decommissioningServers, numToDrop, replicationLog);
if (left > 0) {
- left = dropSegmentFromServers(balancerStrategy, segment, activeServers,
left);
+ left = dropSegmentFromServers(balancerStrategy, segment, activeServers,
left, replicationLog);
}
if (left != 0) {
log.warn("I have no servers serving [%s]?", segment.getId());
@@ -464,9 +479,11 @@ public abstract class LoadRule implements Rule
}
private static int dropSegmentFromServers(
- BalancerStrategy balancerStrategy,
- DataSegment segment,
- NavigableSet<ServerHolder> holders, int numToDrop
+ final BalancerStrategy balancerStrategy,
+ final DataSegment segment,
+ final NavigableSet<ServerHolder> holders,
+ int numToDrop,
+ final String replicationLog
)
{
final Iterator<ServerHolder> iterator =
balancerStrategy.pickServersToDrop(segment, holders);
@@ -479,10 +496,11 @@ public abstract class LoadRule implements Rule
final ServerHolder holder = iterator.next();
if (holder.isServingSegment(segment)) {
log.info(
- "Dropping segment [%s] on server [%s] in tier [%s]",
+ "Dropping segment [%s] on server [%s] in tier [%s]. %s",
segment.getId(),
holder.getServer().getName(),
- holder.getServer().getTier()
+ holder.getServer().getTier(),
+ replicationLog
);
holder.getPeon().dropSegment(segment, null);
numToDrop--;
@@ -515,4 +533,21 @@ public abstract class LoadRule implements Rule
public abstract Map<String, Integer> getTieredReplicants();
public abstract int getNumReplicants(String tier);
+
+ protected String getReplicationLogString()
+ {
+ StringBuilder builder = new StringBuilder("Current replication: [");
+ for (final Object2IntMap.Entry<String> entry :
currentReplicants.object2IntEntrySet()) {
+ final String tier = entry.getKey();
+ // [hot:1/2][cold:2/2]
+ builder.append("[")
+ .append(tier)
+ .append(":")
+ .append(entry.getIntValue())
+ .append("/")
+ .append(targetReplicants.getInt(tier))
+ .append("]");
+ }
+ return builder.append("]").toString();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]