This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 191572a Add safeguard to make sure new Rules added are aware of Rule
usage in loadstatus API (#10054)
191572a is described below
commit 191572ad5e250628ce7ab27259d1fe175a10b7a6
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Jun 19 17:18:56 2020 -1000
Add safeguard to make sure new Rules added are aware of Rule usage in
loadstatus API (#10054)
* Add safeguard to make sure new Rules added are aware of Rule usage in
loadstatus API
* address comments
* address comments
* add tests
---
.../druid/server/coordinator/DruidCoordinator.java | 42 +++++-----------------
.../server/coordinator/SegmentReplicantLookup.java | 28 +++++++++++++--
.../druid/server/coordinator/ServerHolder.java | 8 ++++-
.../rules/BroadcastDistributionRule.java | 35 ++++++++++++++++++
.../druid/server/coordinator/rules/DropRule.java | 6 ++++
.../druid/server/coordinator/rules/LoadRule.java | 29 +++++++++++++++
.../druid/server/coordinator/rules/Rule.java | 26 ++++++++++++++
.../druid/server/coordinator/ServerHolderTest.java | 22 ++++++++++--
8 files changed, 156 insertions(+), 40 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 0dcf636..64168e1 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -29,7 +29,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot;
@@ -69,7 +68,6 @@ import
org.apache.druid.server.coordinator.duty.LogUsedSegments;
import
org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
-import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -275,13 +273,6 @@ public class DruidCoordinator
)
{
final Map<String, Object2LongMap<String>>
underReplicationCountsPerDataSourcePerTier = new HashMap<>();
- final Set<String> decommissioningServers =
getDynamicConfigs().getDecommissioningNodes();
- final List<ImmutableDruidServer> broadcastTargetServers =
serverInventoryView
- .getInventory()
- .stream()
- .filter(druidServer -> druidServer.isSegmentBroadcastTarget() &&
!decommissioningServers.contains(druidServer.getHost()))
- .map(DruidServer::toImmutableDruidServer)
- .collect(Collectors.toList());
if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
@@ -294,36 +285,19 @@ public class DruidCoordinator
for (final Rule rule : rules) {
if (!rule.appliesTo(segment, now)) {
+ // Rule did not match. Continue to the next Rule.
continue;
}
-
- if (rule instanceof LoadRule) {
- ((LoadRule) rule)
- .getTieredReplicants()
- .forEach((final String tier, final Integer ruleReplicants) -> {
- int currentReplicants =
segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
- Object2LongMap<String> underReplicationPerDataSource =
underReplicationCountsPerDataSourcePerTier
- .computeIfAbsent(tier, ignored -> new
Object2LongOpenHashMap<>());
- ((Object2LongOpenHashMap<String>)
underReplicationPerDataSource)
- .addTo(segment.getDataSource(), Math.max(ruleReplicants -
currentReplicants, 0));
- });
+ if (!rule.canLoadSegments()) {
+ // Rule matched but rule does not and cannot load segments.
+ // Hence, there is no need to update
underReplicationCountsPerDataSourcePerTier map
+ break;
}
- if (rule instanceof BroadcastDistributionRule) {
- for (ImmutableDruidServer server : broadcastTargetServers) {
- Object2LongMap<String> underReplicationPerDataSource =
underReplicationCountsPerDataSourcePerTier
- .computeIfAbsent(server.getTier(), ignored -> new
Object2LongOpenHashMap<>());
- if (server.getSegment(segment.getId()) == null) {
- ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
- .addTo(segment.getDataSource(), 1);
- } else {
- // This make sure that every datasource has a entry even if the
all segments are loaded
-
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
- }
- }
- }
+ rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier,
segmentReplicantLookup, segment);
- // only the first matching rule applies
+ // Only the first matching rule applies. This is because the
Coordinator cycles through all used segments
+ // and matches each segment with the first rule that applies. Each
segment may only match a single rule.
break;
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
index cd04bfd..b86ca01 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -62,19 +64,22 @@ public class SegmentReplicantLookup
}
}
- return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
+ return new SegmentReplicantLookup(segmentsInCluster, loadingSegments,
cluster);
}
private final Table<SegmentId, String, Integer> segmentsInCluster;
private final Table<SegmentId, String, Integer> loadingSegments;
+ private final DruidCluster cluster;
private SegmentReplicantLookup(
Table<SegmentId, String, Integer> segmentsInCluster,
- Table<SegmentId, String, Integer> loadingSegments
+ Table<SegmentId, String, Integer> loadingSegments,
+ DruidCluster cluster
)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
+ this.cluster = cluster;
}
public Map<String, Integer> getClusterTiers(SegmentId segmentId)
@@ -93,7 +98,7 @@ public class SegmentReplicantLookup
return retVal;
}
- int getLoadedReplicants(SegmentId segmentId, String tier)
+ public int getLoadedReplicants(SegmentId segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
@@ -124,4 +129,21 @@ public class SegmentReplicantLookup
{
return getLoadedReplicants(segmentId, tier) +
getLoadingReplicants(segmentId, tier);
}
+
+ public Object2LongMap<String> getBroadcastUnderReplication(SegmentId
segmentId)
+ {
+ Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
+ for (ServerHolder holder : cluster.getAllServers()) {
+ // Only record a tier entry for servers that are segment broadcast targets
+ if (holder.getServer().getType().isSegmentBroadcastTarget()) {
+ // Every broadcast target server should be serving 1 replica of the
segment
+ if (!holder.isServingSegment(segmentId)) {
+ perTier.addTo(holder.getServer().getTier(), 1L);
+ } else {
+ perTier.putIfAbsent(holder.getServer().getTier(), 0);
+ }
+ }
+ }
+ return perTier;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 26fa9a5..43fdaae 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import java.util.Objects;
@@ -114,7 +115,7 @@ public class ServerHolder implements
Comparable<ServerHolder>
public boolean isServingSegment(DataSegment segment)
{
- return server.getSegment(segment.getId()) != null;
+ return isServingSegment(segment.getId());
}
public boolean isLoadingSegment(DataSegment segment)
@@ -132,6 +133,11 @@ public class ServerHolder implements
Comparable<ServerHolder>
return peon.getNumberOfSegmentsInQueue();
}
+ public boolean isServingSegment(SegmentId segmentId)
+ {
+ return server.getSegment(segmentId) != null;
+ }
+
@Override
public int compareTo(ServerHolder serverHolder)
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
index 35ff39e..0b8c37b 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
@@ -19,15 +19,19 @@
package org.apache.druid.server.coordinator.rules;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -71,6 +75,37 @@ public abstract class BroadcastDistributionRule implements
Rule
.accumulate(drop(dropServerHolders, segment));
}
+ @Override
+ public boolean canLoadSegments()
+ {
+ return true;
+ }
+
+ @Override
+ public void updateUnderReplicated(
+ Map<String, Object2LongMap<String>> underReplicatedPerTier,
+ SegmentReplicantLookup segmentReplicantLookup,
+ DataSegment segment
+ )
+ {
+ Object2LongMap<String> underReplicatedBroadcastTiers =
segmentReplicantLookup.getBroadcastUnderReplication(segment.getId());
+ for (final Object2LongMap.Entry<String> entry :
underReplicatedBroadcastTiers.object2LongEntrySet()) {
+ final String tier = entry.getKey();
+ final long underReplicatedCount = entry.getLongValue();
+ underReplicatedPerTier.compute(tier, (_tier, existing) -> {
+ Object2LongMap<String> underReplicationPerDataSource = existing;
+ if (existing == null) {
+ underReplicationPerDataSource = new Object2LongOpenHashMap<>();
+ }
+ underReplicationPerDataSource.compute(
+ segment.getDataSource(),
+ (_datasource, count) -> count != null ? count +
underReplicatedCount : underReplicatedCount
+ );
+ return underReplicationPerDataSource;
+ });
+ }
+ }
+
private CoordinatorStats assign(
final Set<ServerHolder> serverHolders,
final DataSegment segment
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
index c565df9..7ffc7a2 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
@@ -37,4 +37,10 @@ public abstract class DropRule implements Rule
stats.addToGlobalStat("deletedCount", 1);
return stats;
}
+
+ @Override
+ public boolean canLoadSegments()
+ {
+ return false;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 8091254..d0bf5b8 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.rules;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -30,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -90,6 +93,32 @@ public abstract class LoadRule implements Rule
}
}
+ @Override
+ public boolean canLoadSegments()
+ {
+ return true;
+ }
+
+ @Override
+ public void updateUnderReplicated(
+ Map<String, Object2LongMap<String>> underReplicatedPerTier,
+ SegmentReplicantLookup segmentReplicantLookup,
+ DataSegment segment
+ )
+ {
+ getTieredReplicants().forEach((final String tier, final Integer
ruleReplicants) -> {
+ int currentReplicants =
segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
+ Object2LongMap<String> underReplicationPerDataSource =
underReplicatedPerTier.computeIfAbsent(
+ tier,
+ ignored -> new Object2LongOpenHashMap<>()
+ );
+ ((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
+ segment.getDataSource(),
+ Math.max(ruleReplicants - currentReplicants, 0)
+ );
+ });
+ }
+
/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
index d475f67..02c552f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
@@ -21,13 +21,18 @@ package org.apache.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import java.util.Map;
+
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -52,6 +57,27 @@ public interface Rule
boolean appliesTo(Interval interval, DateTime referenceTimestamp);
/**
+ * Returns true if this Rule can load segments onto one or more types of Druid
nodes, otherwise returns false.
+ * Any Rule that returns true for this method should implement the logic for
calculating segment under-replication
+ * in {@link Rule#updateUnderReplicated}
+ */
+ boolean canLoadSegments();
+
+ /**
+ * This method should update {@code underReplicatedPerTier} with the
under-replication count of the
+ * {@code segment}. A Rule that returns true for {@link
Rule#canLoadSegments()} must override this method.
+ * Note that {@code underReplicatedPerTier} is a map of tier to a map of
dataSource to underReplicationCount.
+ */
+ default void updateUnderReplicated(
+ Map<String, Object2LongMap<String>> underReplicatedPerTier,
+ SegmentReplicantLookup segmentReplicantLookup,
+ DataSegment segment
+ )
+ {
+ Preconditions.checkArgument(!canLoadSegments());
+ }
+
+ /**
* {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be
called in Rule's code, because the used
* segments are not specified for the {@link DruidCoordinatorRuntimeParams}
passed into Rule's code. This is because
* {@link DruidCoordinatorRuntimeParams} entangles two slightly different
(nonexistent yet) abstractions:
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index fcebbde..cb1ee04 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -39,7 +39,7 @@ public class ServerHolderTest
{
private static final List<DataSegment> SEGMENTS = ImmutableList.of(
new DataSegment(
- "test",
+ "src1",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container1", "blobPath",
"blobPath1"),
@@ -50,7 +50,7 @@ public class ServerHolderTest
1
),
new DataSegment(
- "test",
+ "src2",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container2", "blobPath",
"blobPath2"),
@@ -177,4 +177,22 @@ public class ServerHolderTest
Assert.assertNotEquals(h1, h4);
Assert.assertNotEquals(h1, h5);
}
+
+ @Test
+ public void testIsServingSegment()
+ {
+ final ServerHolder h1 = new ServerHolder(
+ new ImmutableDruidServer(
+ new DruidServerMetadata("name1", "host1", null, 100L,
ServerType.HISTORICAL, "tier1", 0),
+ 0L,
+ ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
+ 1
+ ),
+ new LoadQueuePeonTester()
+ );
+ Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
+ Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
+ Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId()));
+ Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId()));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]