This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d04521d58f Improve description field when emitting metric for
broadcast failure (#14703)
d04521d58f is described below
commit d04521d58f28119687f6887bb92bfb8cdc189840
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Aug 1 10:13:55 2023 +0530
Improve description field when emitting metric for broadcast failure
(#14703)
Changes:
- Emit descriptions such as `Load queue is full`, `No disk space` etc.
instead of `Unknown error`
- Rewrite `BroadcastDistributionRuleTest`
---
.../druid/server/coordinator/ServerHolder.java | 5 +
.../loading/StrategicSegmentAssigner.java | 22 +-
.../druid/server/coordinator/ServerHolderTest.java | 1 +
.../rules/BroadcastDistributionRuleTest.java | 522 +++++++--------------
4 files changed, 196 insertions(+), 354 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 7947f1c1b3..d55ac035d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -196,6 +196,11 @@ public class ServerHolder implements
Comparable<ServerHolder>
return isDecommissioning;
}
+ public boolean isLoadQueueFull()
+ {
+ return totalAssignmentsInRun >= maxAssignmentsInRun;
+ }
+
public long getAvailableSize()
{
return getMaxSize() - getSizeUsed();
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 038707a19f..a1800ae72b 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -355,7 +355,7 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
}
/**
- * Loads the broadcast segment if it is not loaded on the given server.
+ * Loads the broadcast segment if it is not already loaded on the given
server.
* Returns true only if the segment was successfully queued for load on the
server.
*/
private boolean loadBroadcastSegment(DataSegment segment, ServerHolder
server)
@@ -364,19 +364,21 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
return false;
} else if (server.isDroppingSegment(segment)) {
return server.cancelOperation(SegmentAction.DROP, segment);
+ } else if (server.canLoadSegment(segment)) {
+ return loadSegment(segment, server);
}
- if (server.canLoadSegment(segment) && loadSegment(segment, server)) {
- return true;
+ final String skipReason;
+ if (server.getAvailableSize() < segment.getSize()) {
+ skipReason = "Not enough disk space";
+ } else if (server.isLoadQueueFull()) {
+ skipReason = "Load queue is full";
} else {
- log.makeAlert("Could not assign broadcast segment for datasource [%s]",
segment.getDataSource())
- .addData("segmentId", segment.getId())
- .addData("segmentSize", segment.getSize())
- .addData("hostName", server.getServer().getHost())
- .addData("availableSize", server.getAvailableSize())
- .emit();
- return false;
+ skipReason = "Unknown error";
}
+
+ incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, skipReason, segment,
server.getServer().getTier());
+ return false;
}
/**
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index fd1fb7c306..11e36dea18 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -195,5 +195,6 @@ public class ServerHolderTest
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
+ Assert.assertFalse(h1.isLoadQueueFull());
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index e21d823f34..b57cfc9291 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -21,293 +21,214 @@ package org.apache.druid.server.coordinator.rules;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
public class BroadcastDistributionRuleTest
{
- private DruidCluster druidCluster;
- private ServerHolder holderOfSmallSegment;
- private final List<ServerHolder> holdersOfLargeSegments = new ArrayList<>();
- private final List<ServerHolder> holdersOfLargeSegments2 = new ArrayList<>();
- private final List<DataSegment> largeSegments = new ArrayList<>();
- private final List<DataSegment> largeSegments2 = new ArrayList<>();
- private DataSegment smallSegment;
- private DruidCluster secondCluster;
- private ServerHolder activeServer;
- private ServerHolder decommissioningServer1;
- private ServerHolder decommissioningServer2;
- private SegmentLoadQueueManager loadQueueManager;
-
- private static final String DS_SMALL = "small_source";
+ private int serverId = 0;
+
+ private static final String DS_WIKI = "wiki";
private static final String TIER_1 = "tier1";
private static final String TIER_2 = "tier2";
+ private final DataSegment wikiSegment
+ = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
+
@Before
public void setUp()
{
- loadQueueManager = new SegmentLoadQueueManager(null, null, null);
- smallSegment = new DataSegment(
- DS_SMALL,
- Intervals.of("0/1000"),
- DateTimes.nowUtc().toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 0
- );
+ serverId = 0;
+ }
- for (int i = 0; i < 3; i++) {
- largeSegments.add(
- new DataSegment(
- "large_source",
- Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
- DateTimes.nowUtc().toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 100
- )
- );
- }
+ @Test
+ public void testSegmentIsBroadcastToAllTiers()
+ {
+ // 2 tiers with one server each
+ final ServerHolder serverT11 = create10gbHistorical(TIER_1);
+ final ServerHolder serverT21 = create10gbHistorical(TIER_2);
+ DruidCluster cluster =
DruidCluster.builder().add(serverT11).add(serverT21).build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
- for (int i = 0; i < 2; i++) {
- largeSegments2.add(
- new DataSegment(
- "large_source2",
- Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
- DateTimes.nowUtc().toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- 0,
- 100
- )
- );
- }
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
- holderOfSmallSegment = new ServerHolder(
- new DruidServer(
- "serverHot2",
- "hostHot2",
- null,
- 1000,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(smallSegment)
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- );
+ // Verify that segment is assigned to servers of all tiers
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_WIKI));
+ Assert.assertTrue(serverT11.isLoadingSegment(wikiSegment));
- holdersOfLargeSegments.add(
- new ServerHolder(
- new DruidServer(
- "serverHot1",
- "hostHot1",
- null,
- 1000,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(largeSegments.get(0))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- )
- );
- holdersOfLargeSegments.add(
- new ServerHolder(
- new DruidServer(
- "serverNorm1",
- "hostNorm1",
- null,
- 1000,
- ServerType.HISTORICAL,
- TIER_2,
- 0
- ).addDataSegment(largeSegments.get(1))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- )
- );
- holdersOfLargeSegments.add(
- new ServerHolder(
- new DruidServer(
- "serverNorm2",
- "hostNorm2",
- null,
- 100,
- ServerType.HISTORICAL,
- TIER_2,
- 0
- ).addDataSegment(largeSegments.get(2))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- )
- );
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_2, DS_WIKI));
+ Assert.assertTrue(serverT21.isLoadingSegment(wikiSegment));
+ }
- holdersOfLargeSegments2.add(
- new ServerHolder(
- new DruidServer(
- "serverHot3",
- "hostHot3",
- null,
- 1000,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(largeSegments2.get(0))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- )
- );
- holdersOfLargeSegments2.add(
- new ServerHolder(
- new DruidServer(
- "serverNorm3",
- "hostNorm3",
- null,
- 100,
- ServerType.HISTORICAL,
- TIER_2,
- 0
- ).addDataSegment(largeSegments2.get(1))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon()
- )
- );
+ @Test
+ public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded()
+ {
+ // serverT11 is already serving the segment which is being broadcast
+ final ServerHolder serverT11 = create10gbHistorical(TIER_1, wikiSegment);
+ final ServerHolder serverT12 = create10gbHistorical(TIER_1);
+ DruidCluster cluster =
DruidCluster.builder().add(serverT11).add(serverT12).build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
+
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+ // Verify that serverT11 is already serving and serverT12 is loading
segment
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_WIKI));
+ Assert.assertFalse(serverT11.isLoadingSegment(wikiSegment));
+ Assert.assertTrue(serverT11.isServingSegment(wikiSegment));
+ Assert.assertTrue(serverT12.isLoadingSegment(wikiSegment));
+ }
- activeServer = new ServerHolder(
- new DruidServer(
- "active",
- "host1",
- null,
- 100,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(largeSegments.get(0))
- .toImmutableDruidServer(),
+ @Test
+ public void testSegmentIsNotBroadcastToDecommissioningServer()
+ {
+ ServerHolder activeServer = create10gbHistorical(TIER_1);
+ ServerHolder decommissioningServer =
createDecommissioningHistorical(TIER_1);
+ DruidCluster cluster = DruidCluster.builder()
+ .add(activeServer)
+ .add(decommissioningServer).build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
+
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_WIKI));
+ Assert.assertTrue(activeServer.isLoadingSegment(wikiSegment));
+ Assert.assertTrue(decommissioningServer.getLoadingSegments().isEmpty());
+ }
+
+ @Test
+ public void testBroadcastSegmentIsDroppedFromDecommissioningServer()
+ {
+ // Both active and decommissioning servers are already serving the segment
+ ServerHolder activeServer = create10gbHistorical(TIER_1, wikiSegment);
+ ServerHolder decommissioningServer =
createDecommissioningHistorical(TIER_1, wikiSegment);
+ DruidCluster cluster = DruidCluster.builder()
+ .add(activeServer)
+ .add(decommissioningServer)
+ .build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
+
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+ // Verify that segment is dropped only from the decommissioning server
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
TIER_1, DS_WIKI));
+ Assert.assertTrue(activeServer.getPeon().getSegmentsToDrop().isEmpty());
+
Assert.assertTrue(decommissioningServer.getPeon().getSegmentsToDrop().contains(wikiSegment));
+ }
+
+ @Test
+ public void testSegmentIsBroadcastToAllServerTypes()
+ {
+ final ServerHolder broker = new ServerHolder(
+ create10gbServer(ServerType.BROKER,
"broker_tier").toImmutableDruidServer(),
new TestLoadQueuePeon()
);
-
- decommissioningServer1 = new ServerHolder(
- new DruidServer(
- "decommissioning1",
- "host2",
- null,
- 100,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(smallSegment)
- .toImmutableDruidServer(),
- new TestLoadQueuePeon(),
- true
+ final ServerHolder indexer = new ServerHolder(
+ create10gbServer(ServerType.INDEXER_EXECUTOR,
TIER_2).toImmutableDruidServer(),
+ new TestLoadQueuePeon()
);
+ final ServerHolder historical = create10gbHistorical(TIER_1);
- decommissioningServer2 = new ServerHolder(
- new DruidServer(
- "decommissioning2",
- "host3",
- null,
- 100,
- ServerType.HISTORICAL,
- TIER_1,
- 0
- ).addDataSegment(largeSegments.get(1))
- .toImmutableDruidServer(),
- new TestLoadQueuePeon(),
- true
- );
+ DruidCluster cluster = DruidCluster.builder()
+
.add(broker).add(indexer).add(historical)
+ .build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
- druidCluster = DruidCluster
- .builder()
- .addTier(
- TIER_1,
- holdersOfLargeSegments.get(0),
- holderOfSmallSegment,
- holdersOfLargeSegments2.get(0)
- )
- .addTier(
- TIER_2,
- holdersOfLargeSegments.get(1),
- holdersOfLargeSegments.get(2),
- holdersOfLargeSegments2.get(1)
- )
- .build();
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment,
params);
- secondCluster = DruidCluster
- .builder()
- .addTier(
- TIER_1,
- activeServer,
- decommissioningServer1,
- decommissioningServer2
- )
- .build();
+ // Verify that segment is assigned to historical, broker as well as indexer
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_WIKI));
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_2, DS_WIKI));
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
broker.getServer().getTier(), DS_WIKI));
+
+ Assert.assertTrue(historical.isLoadingSegment(wikiSegment));
+ Assert.assertTrue(indexer.isLoadingSegment(wikiSegment));
+ Assert.assertTrue(broker.isLoadingSegment(wikiSegment));
}
@Test
- public void testBroadcastToSingleDataSource()
+ public void testReasonForBroadcastFailure()
{
- final ForeverBroadcastDistributionRule rule =
- new ForeverBroadcastDistributionRule();
-
- CoordinatorRunStats stats = runRuleAndGetStats(
- rule,
- smallSegment,
- makeCoordinartorRuntimeParams(
- druidCluster,
- smallSegment,
- largeSegments.get(0),
- largeSegments.get(1),
- largeSegments.get(2),
- largeSegments2.get(0),
- largeSegments2.get(1)
- )
+ final ServerHolder eligibleServer = create10gbHistorical(TIER_1);
+ final ServerHolder serverWithNoDiskSpace = new ServerHolder(
+ new DruidServer("server1", "server1", null, 0L, ServerType.HISTORICAL,
TIER_1, 0)
+ .toImmutableDruidServer(),
+ new TestLoadQueuePeon()
);
- Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_SMALL));
- Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_2, DS_SMALL));
-
- Assert.assertTrue(
- holdersOfLargeSegments.stream().allMatch(
- holder -> holder.isLoadingSegment(smallSegment)
- )
- );
- Assert.assertTrue(
- holdersOfLargeSegments2.stream().allMatch(
- holder -> holder.isLoadingSegment(smallSegment)
- )
+ // Create a server with full load queue
+ final int maxSegmentsInLoadQueue = 5;
+ final ServerHolder serverWithFullQueue = new ServerHolder(
+ create10gbServer(ServerType.HISTORICAL,
TIER_1).toImmutableDruidServer(),
+ new TestLoadQueuePeon(), false, maxSegmentsInLoadQueue, 100
);
- Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
+
+ List<DataSegment> segmentsInQueue
+ = CreateDataSegments.ofDatasource("koala")
+ .forIntervals(maxSegmentsInLoadQueue,
Granularities.MONTH)
+ .withNumPartitions(1)
+ .eachOfSizeInMb(10);
+ segmentsInQueue.forEach(s ->
serverWithFullQueue.startOperation(SegmentAction.LOAD, s));
+ Assert.assertTrue(serverWithFullQueue.isLoadQueueFull());
+
+ DruidCluster cluster = DruidCluster.builder()
+ .add(eligibleServer)
+ .add(serverWithNoDiskSpace)
+ .add(serverWithFullQueue)
+ .build();
+ DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster,
wikiSegment);
+
+ ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
+ final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment,
params);
+
+ // Verify that the segment is broadcast only to the eligible server
+ Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_WIKI));
+ RowKey metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+ .with(Dimension.TIER, TIER_1)
+ .and(Dimension.DESCRIPTION, "Not enough disk
space");
+ Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED,
metricKey));
+
+ metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+ .with(Dimension.TIER, TIER_1)
+ .and(Dimension.DESCRIPTION, "Load queue is full");
+ Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED,
metricKey));
}
- private DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
+ private CoordinatorRunStats runRuleOnSegment(
+ Rule rule,
+ DataSegment segment,
+ DruidCoordinatorRuntimeParams params
+ )
+ {
+ StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
+ rule.run(segment, segmentAssigner);
+ return segmentAssigner.getStats();
+ }
+
+ private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(
DruidCluster druidCluster,
DataSegment... usedSegments
)
@@ -317,118 +238,31 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withUsedSegmentsInTest(usedSegments)
.withBalancerStrategy(new RandomBalancerStrategy())
- .withSegmentAssignerUsing(loadQueueManager)
+ .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null,
null))
.build();
}
- /**
- * Servers:
- * name | segments
- * -----------------+--------------
- * active | large segment
- * decommissioning1 | small segment
- * decommissioning2 | large segment
- * <p>
- * After running the rule for the small segment:
- * active | large & small segments
- * decommissioning1 |
- * decommissionint2 | large segment
- */
- @Test
- public void testBroadcastDecommissioning()
+ private ServerHolder create10gbHistorical(String tier, DataSegment...
segments)
{
- final ForeverBroadcastDistributionRule rule =
- new ForeverBroadcastDistributionRule();
-
- CoordinatorRunStats stats = runRuleAndGetStats(
- rule,
- smallSegment,
- makeCoordinartorRuntimeParams(
- secondCluster,
- smallSegment,
- largeSegments.get(0),
- largeSegments.get(1)
- )
- );
-
- Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_SMALL));
- Assert.assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size());
- Assert.assertEquals(1,
decommissioningServer1.getPeon().getSegmentsToDrop().size());
- Assert.assertEquals(0,
decommissioningServer2.getPeon().getSegmentsToLoad().size());
- }
-
- @Test
- public void testBroadcastToMultipleDataSources()
- {
- final ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
-
- CoordinatorRunStats stats = runRuleAndGetStats(
- rule,
- smallSegment,
- makeCoordinartorRuntimeParams(
- druidCluster,
- smallSegment,
- largeSegments.get(0),
- largeSegments.get(1),
- largeSegments.get(2),
- largeSegments2.get(0),
- largeSegments2.get(1)
- )
- );
-
- Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_SMALL));
- Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_2, DS_SMALL));
-
- Assert.assertTrue(
- holdersOfLargeSegments.stream().allMatch(
- holder -> holder.isLoadingSegment(smallSegment)
- )
- );
- Assert.assertTrue(
- holdersOfLargeSegments2.stream().allMatch(
- holder -> holder.isLoadingSegment(smallSegment)
- )
- );
- Assert.assertFalse(holderOfSmallSegment.isLoadingSegment(smallSegment));
+ DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+ for (DataSegment segment : segments) {
+ server.addDataSegment(segment);
+ }
+ return new ServerHolder(server.toImmutableDruidServer(), new
TestLoadQueuePeon());
}
- @Test
- public void testBroadcastToAllServers()
+ private ServerHolder createDecommissioningHistorical(String tier,
DataSegment... segments)
{
- final ForeverBroadcastDistributionRule rule = new
ForeverBroadcastDistributionRule();
-
- CoordinatorRunStats stats = runRuleAndGetStats(
- rule,
- smallSegment,
- makeCoordinartorRuntimeParams(
- druidCluster,
- smallSegment,
- largeSegments.get(0),
- largeSegments.get(1),
- largeSegments.get(2),
- largeSegments2.get(0),
- largeSegments2.get(1)
- )
- );
-
- Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_1, DS_SMALL));
- Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
TIER_2, DS_SMALL));
-
- Assert.assertTrue(
- druidCluster.getAllServers().stream().allMatch(
- holder -> holder.isLoadingSegment(smallSegment) ||
holder.isServingSegment(smallSegment)
- )
- );
+ DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+ for (DataSegment segment : segments) {
+ server.addDataSegment(segment);
+ }
+ return new ServerHolder(server.toImmutableDruidServer(), new
TestLoadQueuePeon(), true);
}
- private CoordinatorRunStats runRuleAndGetStats(
- Rule rule,
- DataSegment segment,
- DruidCoordinatorRuntimeParams params
- )
+ private DruidServer create10gbServer(ServerType type, String tier)
{
- StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
- rule.run(segment, segmentAssigner);
- return segmentAssigner.getStats();
+ final String name = "server_" + serverId++;
+ return new DruidServer(name, name, null, 10L << 30, type, tier, 0);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]