This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 22290fd632 Test: Simplify test impl of LoadQueuePeon (#14684)
22290fd632 is described below
commit 22290fd632c5026f2341c023affec4ccfa446721
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 28 16:14:23 2023 +0530
Test: Simplify test impl of LoadQueuePeon (#14684)
Changes
- Rename `LoadQueuePeonTester` to `TestLoadQueuePeon`
- Simplify `TestLoadQueuePeon` by removing dependency on
`CuratorLoadQueuePeon`
- Remove usages of mock peons in `LoadRuleTest` and use `TestLoadQueuePeon`
instead
---
.../coordinator/BalancerStrategyBenchmark.java | 4 +-
.../coordinator/BalanceSegmentsProfiler.java | 8 +-
.../druid/server/coordinator/DruidClusterTest.java | 10 +-
.../coordinator/RoundRobinServerSelectorTest.java | 4 +-
.../druid/server/coordinator/ServerHolderTest.java | 22 +-
.../coordinator/balancer/BalancerStrategyTest.java | 10 +-
.../balancer/CachingCostBalancerStrategyTest.java | 4 +-
.../balancer/CostBalancerStrategyTest.java | 10 +-
.../DiskNormalizedCostBalancerStrategyTest.java | 6 +-
.../balancer/ReservoirSegmentSamplerTest.java | 6 +-
.../coordinator/duty/BalanceSegmentsTest.java | 4 +-
.../coordinator/duty/UnloadUnusedSegmentsTest.java | 18 +-
.../coordinator/loading/LoadQueuePeonTester.java | 61 ----
.../coordinator/loading/TestLoadQueuePeon.java | 124 +++++++
.../rules/BroadcastDistributionRuleTest.java | 20 +-
.../server/coordinator/rules/LoadRuleTest.java | 372 ++++++---------------
16 files changed, 297 insertions(+), 386 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
index 2afd99c8c3..e51cc7da3a 100644
---
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
@@ -27,7 +27,7 @@ import
org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
@@ -105,7 +105,7 @@ public class BalancerStrategyBenchmark
ImmutableMap.of("test", new ImmutableDruidDataSource("test",
Collections.emptyMap(), segments)),
segments.size()
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index 4a45e16bf9..6a76a865d8 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -31,8 +31,8 @@ import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.PeriodLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
@@ -117,7 +117,7 @@ public class BalanceSegmentsProfiler
EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(server);
- LoadQueuePeon peon = new LoadQueuePeonTester();
+ LoadQueuePeon peon = new TestLoadQueuePeon();
serverHolderList.add(new ServerHolder(server, peon));
}
@@ -154,8 +154,8 @@ public class BalanceSegmentsProfiler
public void profileRun()
{
Stopwatch watch = Stopwatch.createUnstarted();
- LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
- LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+ TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
+ TestLoadQueuePeon toPeon = new TestLoadQueuePeon();
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
index 182d58dd38..17a4de1d73 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
@@ -22,7 +22,7 @@ package org.apache.druid.server.coordinator;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@@ -47,13 +47,13 @@ public class DruidClusterTest
private static final ServerHolder NEW_REALTIME = new ServerHolder(
new DruidServer("name1", "host2", null, 100L, ServerType.REALTIME,
"tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
private static final ServerHolder NEW_HISTORICAL = new ServerHolder(
new DruidServer("name1", "host2", null, 100L, ServerType.HISTORICAL,
"tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
private DruidCluster.Builder clusterBuilder;
@@ -67,14 +67,14 @@ public class DruidClusterTest
new ServerHolder(
new DruidServer("name1", "host1", null, 100L,
ServerType.REALTIME, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
)
.add(
new ServerHolder(
new DruidServer("name1", "host1", null, 100L,
ServerType.HISTORICAL, "tier1", 0)
.addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
index 4164bba364..62f84b631a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
@@ -23,8 +23,8 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
@@ -140,7 +140,7 @@ public class RoundRobinServerSelectorTest
return new ServerHolder(
new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1)
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index ed9735c6bc..fd1fb7c306 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
@@ -79,7 +79,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
// available size of 100
@@ -90,7 +90,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
// available size of 10
@@ -101,7 +101,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
// available size of 50
@@ -112,7 +112,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
Assert.assertEquals(0, h1.compareTo(h2));
@@ -130,7 +130,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final ServerHolder h2 = new ServerHolder(
@@ -140,7 +140,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final ServerHolder h3 = new ServerHolder(
@@ -150,7 +150,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final ServerHolder h4 = new ServerHolder(
@@ -160,7 +160,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final ServerHolder h5 = new ServerHolder(
@@ -170,7 +170,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
Assert.assertEquals(h1, h2);
@@ -189,7 +189,7 @@ public class ServerHolderTest
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
index ad4d7f65ae..6e0e3ddf0b 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
@@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
@@ -84,7 +84,7 @@ public class BalancerStrategyTest
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
- new LoadQueuePeonTester());
+ new TestLoadQueuePeon());
Assert.assertFalse(
balancerStrategy.findServersToLoadSegment(
proposedDataSegment,
@@ -99,12 +99,12 @@ public class BalancerStrategyTest
final ServerHolder serverHolder1 = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.addDataSegment(proposedDataSegment).toImmutableDruidServer(),
- new LoadQueuePeonTester());
+ new TestLoadQueuePeon());
final ServerHolder serverHolder2 = new ServerHolder(
new DruidServer("server2", "host2", null, 1000L,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
.addDataSegment(proposedDataSegment).toImmutableDruidServer(),
- new LoadQueuePeonTester());
+ new TestLoadQueuePeon());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder1);
@@ -119,7 +119,7 @@ public class BalancerStrategyTest
{
final ServerHolder serverHolder = new ServerHolder(
new DruidServer("server1", "host1", null, 1000L,
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(),
- new LoadQueuePeonTester());
+ new TestLoadQueuePeon());
serverHolders = new ArrayList<>();
serverHolders.add(serverHolder);
final ServerHolder foundServerHolder = balancerStrategy
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
index 23975352b3..f0d6519974 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -141,7 +141,7 @@ public class CachingCostBalancerStrategyTest
.forEach(druidServer::addDataSegment);
return new ServerHolder(
druidServer.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index 17d0a8716a..e4b3715960 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@@ -283,7 +283,7 @@ public class CostBalancerStrategyTest
// Create ServerHolder for each server
final List<ServerHolder> serverHolders = historicals.stream().map(
- server -> new ServerHolder(server.toImmutableDruidServer(), new
LoadQueuePeonTester())
+ server -> new ServerHolder(server.toImmutableDruidServer(), new
TestLoadQueuePeon())
).collect(Collectors.toList());
final ServerHolder serverA = serverHolders.get(0);
@@ -321,11 +321,11 @@ public class CostBalancerStrategyTest
{
final ServerHolder serverA = new ServerHolder(
createHistorical().toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final ServerHolder serverB = new ServerHolder(
createHistorical().toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
final DataSegment segment =
CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
@@ -356,7 +356,7 @@ public class CostBalancerStrategyTest
.startingAt("2012-10-24")
.eachOfSizeInMb(100).get(0);
- final LoadQueuePeonTester peon = new LoadQueuePeonTester();
+ final TestLoadQueuePeon peon = new TestLoadQueuePeon();
ServerHolder serverA = new
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
ServerHolder serverB = new
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
index 98f86ec318..cbc4b8f0f7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
@@ -57,7 +57,7 @@ public class DiskNormalizedCostBalancerStrategyTest
// Create 10 servers with current size being 3K & max size being 10K
// Each having having 100 segments
for (int i = 0; i < serverCount; i++) {
- LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+ TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
List<DataSegment> segments = IntStream
.range(0, maxSegments)
@@ -78,7 +78,7 @@ public class DiskNormalizedCostBalancerStrategyTest
}
// The best server to be available for next segment assignment has greater
max Size
- LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+ TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
ImmutableDruidServer druidServer =
EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
index e6387a5a34..c438219e1b 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
@@ -25,8 +25,8 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@@ -175,7 +175,7 @@ public class ReservoirSegmentSamplerTest
.addDataSegment(segments.get(2))
.addDataSegment(segments.get(3))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
// Try to pick all the segments on the servers
@@ -380,6 +380,6 @@ public class ReservoirSegmentSamplerTest
for (DataSegment segment : loadedSegments) {
server.addDataSegment(segment);
}
- return new ServerHolder(server.toImmutableDruidServer(), new
LoadQueuePeonTester());
+ return new ServerHolder(server.toImmutableDruidServer(), new
TestLoadQueuePeon());
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index 185f8e8263..c608f58d9a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -31,8 +31,8 @@ import
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@@ -423,7 +423,7 @@ public class BalanceSegmentsTest
return new ServerHolder(
server.toImmutableDruidServer(),
- new LoadQueuePeonTester(),
+ new TestLoadQueuePeon(),
isDecommissioning,
maxSegmentsInLoadQueue,
10
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
index 83aa11fb30..ba39c5fc43 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
@@ -33,8 +33,8 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -61,10 +61,10 @@ public class UnloadUnusedSegmentsTest
private ImmutableDruidServer historicalServerTier2;
private ImmutableDruidServer brokerServer;
private ImmutableDruidServer indexerServer;
- private LoadQueuePeonTester historicalPeon;
- private LoadQueuePeonTester historicalTier2Peon;
- private LoadQueuePeonTester brokerPeon;
- private LoadQueuePeonTester indexerPeon;
+ private TestLoadQueuePeon historicalPeon;
+ private TestLoadQueuePeon historicalTier2Peon;
+ private TestLoadQueuePeon brokerPeon;
+ private TestLoadQueuePeon indexerPeon;
private DataSegment segment1;
private DataSegment segment2;
private List<DataSegment> segments;
@@ -143,10 +143,10 @@ public class UnloadUnusedSegmentsTest
segmentsForRealtime.add(realtimeOnlySegment);
segmentsForRealtime.add(broadcastSegment);
- historicalPeon = new LoadQueuePeonTester();
- historicalTier2Peon = new LoadQueuePeonTester();
- brokerPeon = new LoadQueuePeonTester();
- indexerPeon = new LoadQueuePeonTester();
+ historicalPeon = new TestLoadQueuePeon();
+ historicalTier2Peon = new TestLoadQueuePeon();
+ brokerPeon = new TestLoadQueuePeon();
+ indexerPeon = new TestLoadQueuePeon();
final ImmutableDruidDataSource dataSource1 = new ImmutableDruidDataSource(
"datasource1",
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
deleted file mode 100644
index e1a22aaa07..0000000000
---
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Duration;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-public class LoadQueuePeonTester extends CuratorLoadQueuePeon
-{
- private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new
ConcurrentSkipListSet<>();
-
- public LoadQueuePeonTester()
- {
- super(
- null,
- null,
- null,
- Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
- null,
- new TestDruidCoordinatorConfig.Builder()
- .withLoadTimeoutDelay(new Duration(1))
- .withCoordinatorKillMaxSegments(10)
- .withCoordinatorKillIgnoreDurationToRetain(false)
- .build()
- );
- }
-
- @Override
- public void loadSegment(DataSegment segment, SegmentAction action, @Nullable
LoadPeonCallback callback)
- {
- segmentsToLoad.add(segment);
- }
-
- @Override
- public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
- {
- return segmentsToLoad;
- }
-}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
new file mode 100644
index 0000000000..c496c37e3e
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * Test implementation of {@link LoadQueuePeon} that maintains a set of
segments
+ * that have been queued for load or drop.
+ */
+public class TestLoadQueuePeon implements LoadQueuePeon
+{
+ private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new
ConcurrentSkipListSet<>();
+ private final ConcurrentSkipListSet<DataSegment> segmentsToDrop = new
ConcurrentSkipListSet<>();
+
+ private final CoordinatorRunStats stats = new CoordinatorRunStats();
+
+ @Override
+ public void loadSegment(DataSegment segment, SegmentAction action, @Nullable
LoadPeonCallback callback)
+ {
+ segmentsToLoad.add(segment);
+ }
+
+ @Override
+ public void dropSegment(DataSegment segment, LoadPeonCallback callback)
+ {
+ segmentsToDrop.add(segment);
+ }
+
+ @Override
+ public long getSizeOfSegmentsToLoad()
+ {
+ return 0;
+ }
+
+ @Override
+ public CoordinatorRunStats getAndResetStats()
+ {
+ return stats;
+ }
+
+ @Override
+ public boolean cancelOperation(DataSegment segment)
+ {
+ return false;
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void stop()
+ {
+
+ }
+
+ @Override
+ public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
+ {
+ return segmentsToLoad;
+ }
+
+ @Override
+ public Set<SegmentHolder> getSegmentsInQueue()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<DataSegment> getSegmentsToDrop()
+ {
+ return segmentsToDrop;
+ }
+
+ @Override
+ public Set<DataSegment> getTimedOutSegments()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void markSegmentToDrop(DataSegment segmentToLoad)
+ {
+
+ }
+
+ @Override
+ public void unmarkSegmentToDrop(DataSegment segmentToLoad)
+ {
+
+ }
+
+ @Override
+ public Set<DataSegment> getSegmentsMarkedToDrop()
+ {
+ return Collections.emptySet();
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index acf815089b..e21d823f34 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -27,9 +27,9 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@@ -120,7 +120,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(smallSegment)
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
holdersOfLargeSegments.add(
@@ -135,7 +135,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(0))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
holdersOfLargeSegments.add(
@@ -150,7 +150,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(1))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
holdersOfLargeSegments.add(
@@ -165,7 +165,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(2))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
@@ -181,7 +181,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments2.get(0))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
holdersOfLargeSegments2.add(
@@ -196,7 +196,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments2.get(1))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
)
);
@@ -211,7 +211,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(0))
.toImmutableDruidServer(),
- new LoadQueuePeonTester()
+ new TestLoadQueuePeon()
);
decommissioningServer1 = new ServerHolder(
@@ -225,7 +225,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(smallSegment)
.toImmutableDruidServer(),
- new LoadQueuePeonTester(),
+ new TestLoadQueuePeon(),
true
);
@@ -240,7 +240,7 @@ public class BroadcastDistributionRuleTest
0
).addDataSegment(largeSegments.get(1))
.toImmutableDruidServer(),
- new LoadQueuePeonTester(),
+ new TestLoadQueuePeon(),
true
);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index c7e3b5b239..7ef07719be 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -24,7 +24,6 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.client.DruidServer;
-import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -38,18 +37,14 @@ import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategy;
import org.apache.druid.server.coordinator.balancer.ClusterCostCache;
-import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
-import org.apache.druid.server.coordinator.loading.SegmentAction;
-import org.apache.druid.server.coordinator.loading.SegmentHolder;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -62,7 +57,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -96,7 +90,7 @@ public class LoadRuleTest
public void setUp()
{
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1,
"LoadRuleTest-%d"));
- balancerStrategy = new
CostBalancerStrategyFactory().createBalancerStrategy(exec);
+ balancerStrategy = new CostBalancerStrategy(exec);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
}
@@ -107,17 +101,15 @@ public class LoadRuleTest
}
@Test
- public void testLoad()
+ public void testLoadRuleAssignsSegments()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon);
-
+ // Cluster has 2 tiers with 1 server each
+ final ServerHolder server1 = createServer(Tier.T1);
+ final ServerHolder server2 = createServer(Tier.T2);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
- .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
+ .addTier(Tier.T1, server1)
+ .addTier(Tier.T2, server2)
.build();
final DataSegment segment = createDataSegment(DS_WIKI);
@@ -126,8 +118,6 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
-
- EasyMock.verify(mockPeon);
}
private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment
segment, DruidCluster cluster)
@@ -169,99 +159,56 @@ public class LoadRuleTest
@Test
public void testLoadPrimaryAssignDoesNotOverAssign()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon);
-
- ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
- ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
+ ServerHolder server1 = createServer(Tier.T1);
+ ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, new ServerHolder(server1, mockPeon), new
ServerHolder(server2, mockPeon))
+ .addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
- CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
- Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, segment.getDataSource()));
-
- // ensure multiple runs don't assign primary segment again if at
replication count
- final LoadQueuePeon loadingPeon = createLoadingPeon(segment, false);
- EasyMock.replay(loadingPeon);
-
- DruidCluster afterLoad = DruidCluster
- .builder()
- .addTier(Tier.T1, new ServerHolder(server1, loadingPeon), new
ServerHolder(server2, mockPeon))
- .build();
-
- CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule,
segment, afterLoad);
-
- Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
-
- EasyMock.verify(mockPeon);
+ CoordinatorRunStats firstRunStats = runRuleAndGetStats(rule, segment,
druidCluster);
+ Assert.assertEquals(1L,
firstRunStats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1,
segment.getDataSource()));
+ Assert.assertEquals(1, server1.getLoadingSegments().size() +
server2.getLoadingSegments().size());
+
+ // Verify that multiple runs don't assign primary segment again if at
replication count
+ CoordinatorRunStats secondRunStats = runRuleAndGetStats(rule, segment,
druidCluster);
+ Assert.assertFalse(secondRunStats.hasStat(Stats.Segments.ASSIGNED));
+ Assert.assertEquals(1, server1.getLoadingSegments().size() +
server2.getLoadingSegments().size());
}
@Test
@Ignore("Enable this test when timeout behaviour is fixed")
public void testOverAssignForTimedOutSegments()
{
- final LoadQueuePeon emptyPeon = createEmptyPeon();
- emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(emptyPeon);
-
- ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
- ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
+ ServerHolder server1 = createServer(Tier.T1);
+ ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new
ServerHolder(server2, emptyPeon))
+ .addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
- CoordinatorRunStats stats = runRuleAndGetStats(
- rule,
- segment,
- makeCoordinatorRuntimeParams(druidCluster, segment)
- );
+ CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
// Ensure that the segment is assigned to one of the historicals
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, segment.getDataSource()));
// Ensure that the primary segment is assigned again in case the peon
timed out on loading the segment
- final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
- EasyMock.replay(slowLoadingPeon);
-
- DruidCluster withLoadTimeout = DruidCluster
- .builder()
- .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new
ServerHolder(server2, emptyPeon))
- .build();
-
- CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(
- rule,
- segment,
- makeCoordinatorRuntimeParams(withLoadTimeout, segment)
- );
-
+ CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule,
segment, druidCluster);
Assert.assertEquals(1L,
statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1,
DS_WIKI));
-
- EasyMock.verify(emptyPeon);
}
@Test
public void testSkipReplicationForTimedOutSegments()
{
- final LoadQueuePeon emptyPeon = createEmptyPeon();
- emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(emptyPeon);
-
- ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
- ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
+ ServerHolder server1 = createServer(Tier.T1);
+ ServerHolder server2 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new
ServerHolder(server2, emptyPeon))
+ .addTier(Tier.T1, server1, server2)
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
@@ -272,20 +219,10 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, segment.getDataSource()));
// Add the segment to the timed out list to simulate peon timeout on
loading the segment
- final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
- EasyMock.replay(slowLoadingPeon);
-
- DruidCluster withLoadTimeout = DruidCluster
- .builder()
- .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new
ServerHolder(server2, emptyPeon))
- .build();
-
// Default behavior is to not replicate the timed out segments on other
servers
- CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule,
segment, withLoadTimeout);
+ CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule,
segment, druidCluster);
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
-
- EasyMock.verify(emptyPeon);
}
@Test
@@ -297,14 +234,10 @@ public class LoadRuleTest
.withNumPartitions(2)
.eachOfSizeInMb(100);
- final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
- loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().once();
- EasyMock.replay(loadingPeon);
-
+ final ServerHolder server1 = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
+ .addTier(Tier.T1, server1)
.build();
balancerStrategy = new
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
@@ -316,104 +249,71 @@ public class LoadRuleTest
makeCoordinatorRuntimeParams(druidCluster, segments.toArray(new
DataSegment[0]))
);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
-
- EasyMock.verify(loadingPeon);
}
@Test
- public void testDrop()
+ public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon);
-
final DataSegment segment = createDataSegment(DS_WIKI);
- DruidServer server1 = createServer(Tier.T1);
- server1.addDataSegment(segment);
- DruidServer server2 = createServer(Tier.T2);
- server2.addDataSegment(segment);
- DruidServer server3 = createServer(Tier.T2);
+ final ServerHolder serverT11 = createServer(Tier.T1, segment);
+ final ServerHolder serverT12 = createServer(Tier.T2, segment);
+ final ServerHolder serverT21 = createServer(Tier.T2, segment);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, new ServerHolder(server1.toImmutableDruidServer(),
mockPeon))
- .addTier(
- Tier.T2,
- new ServerHolder(server2.toImmutableDruidServer(), mockPeon),
- new ServerHolder(server3.toImmutableDruidServer(), mockPeon)
- )
+ .addTier(Tier.T1, serverT11)
+ .addTier(Tier.T2, serverT12, serverT21)
.build();
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, DS_WIKI));
- Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T2, DS_WIKI));
-
- EasyMock.verify(mockPeon);
+ Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T2, DS_WIKI));
}
@Test
- public void testLoadWithNonExistentTier()
+ public void testLoadIgnoresInvalidTiers()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon);
-
+ ServerHolder server = createServer(Tier.T1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
+ .addTier(Tier.T1, server)
.build();
final DataSegment segment = createDataSegment(DS_WIKI);
- LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1,
1));
+ LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
-
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
-
- EasyMock.verify(mockPeon);
+ Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
"invalidTier", DS_WIKI));
}
@Test
- public void testDropWithNonExistentTier()
+ public void testDropIgnoresInvalidTiers()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon);
-
final DataSegment segment = createDataSegment(DS_WIKI);
- DruidServer server1 = createServer(Tier.T1);
- DruidServer server2 = createServer(Tier.T1);
- server1.addDataSegment(segment);
- server2.addDataSegment(segment);
-
+ // Cluster has 1 tier with 2 servers
+ ServerHolder server1 = createServer(Tier.T1, segment);
+ ServerHolder server2 = createServer(Tier.T1, segment);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(
- Tier.T1,
- new ServerHolder(server1.toImmutableDruidServer(), mockPeon),
- new ServerHolder(server2.toImmutableDruidServer(), mockPeon)
- )
+ .addTier(Tier.T1, server1, server2)
.build();
- LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1,
1));
+ LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, DS_WIKI));
-
- EasyMock.verify(mockPeon);
+ Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.DROPPED,
"invalidTier", DS_WIKI));
}
@Test
public void testMaxLoadingQueueSize()
{
- final LoadQueuePeonTester peon = new LoadQueuePeonTester();
+ final TestLoadQueuePeon peon = new TestLoadQueuePeon();
final int maxSegmentsInQueue = 2;
DruidCluster druidCluster = DruidCluster
@@ -421,7 +321,7 @@ public class LoadRuleTest
.addTier(
Tier.T1,
new ServerHolder(
- createServer(Tier.T1).toImmutableDruidServer(),
+ createDruidServer(Tier.T1).toImmutableDruidServer(),
peon, false, maxSegmentsInQueue, 10
)
)
@@ -456,63 +356,55 @@ public class LoadRuleTest
Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, dataSegment3.getDataSource()));
}
- /**
- * 2 servers in different tiers, the first is decommissioning.
- * Should not load a segment to the server that is decommissioning
- */
@Test
- public void testLoadDecommissioning()
+ public void testSegmentIsAssignedOnlyToActiveServer()
{
- final LoadQueuePeon mockPeon1 = createEmptyPeon();
- final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
- EasyMock.replay(mockPeon1, mockPeon2);
+ final ServerHolder decommServerT1 = createDecommissioningServer(Tier.T1);
+ final ServerHolder serverT2 = createServer(Tier.T2);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon1, true))
- .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon2, false))
+ .addTier(Tier.T1, decommServerT1)
+ .addTier(Tier.T2, serverT2)
.build();
+ // Load rule requires 1 replica on each tier
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1));
DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
+ // Verify that segment is not loaded on decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
- EasyMock.verify(mockPeon1, mockPeon2);
+ Assert.assertEquals(0, decommServerT1.getLoadingSegments().size());
+ Assert.assertTrue(serverT2.getLoadingSegments().contains(segment));
}
- /**
- * 2 tiers, 2 servers each, 1 server of the second tier is decommissioning.
- * Should not load a segment to the server that is decommssioning.
- */
@Test
- public void testLoadReplicaDuringDecommissioning()
+ public void testSegmentIsAssignedOnlyToActiveServers()
{
- final LoadQueuePeon mockPeon1 = createEmptyPeon();
- final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
- final LoadQueuePeon mockPeon3 = createOneCallPeonMock();
- final LoadQueuePeon mockPeon4 = createOneCallPeonMock();
- EasyMock.replay(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
-
- ServerHolder holder1 = createServerHolder(Tier.T1, mockPeon1, true);
- ServerHolder holder2 = createServerHolder(Tier.T1, mockPeon2, false);
- ServerHolder holder3 = createServerHolder(Tier.T2, mockPeon3, false);
- ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
+ // 2 tiers with 2 servers each, 1 server is decommissioning
+ ServerHolder decommServerT11 = createDecommissioningServer(Tier.T1);
+ ServerHolder serverT12 = createServer(Tier.T1);
+ ServerHolder serverT21 = createServer(Tier.T2);
+ ServerHolder serverT22 = createServer(Tier.T2);
final DataSegment segment = createDataSegment(DS_WIKI);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(Tier.T1, holder1, holder2)
- .addTier(Tier.T2, holder3, holder4)
+ .addTier(Tier.T1, decommServerT11, serverT12)
+ .addTier(Tier.T2, serverT21, serverT22)
.build();
+ // Load rule requires 2 replicas on each server
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2, Tier.T2, 2));
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
+ // Verify that no replica is assigned to decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
- Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
+ Assert.assertTrue(decommServerT11.getLoadingSegments().isEmpty());
+ Assert.assertEquals(0, decommServerT11.getLoadingSegments().size());
- EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
+ Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
}
/**
@@ -522,26 +414,15 @@ public class LoadRuleTest
@Test
public void testDropDuringDecommissioning()
{
- final LoadQueuePeon mockPeon = createEmptyPeon();
- mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expectLastCall().times(2);
- EasyMock.replay(mockPeon);
-
final DataSegment segment1 = createDataSegment("foo1");
final DataSegment segment2 = createDataSegment("foo2");
- DruidServer server1 = createServer(Tier.T1);
- server1.addDataSegment(segment1);
- DruidServer server2 = createServer(Tier.T1);
- server2.addDataSegment(segment2);
+ final ServerHolder server1 = createDecommissioningServer(Tier.T1,
segment1);
+ final ServerHolder server2 = createServer(Tier.T1, segment2);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(
- Tier.T1,
- new ServerHolder(server1.toImmutableDruidServer(), mockPeon, true),
- new ServerHolder(server2.toImmutableDruidServer(), mockPeon, false)
- )
+ .addTier(Tier.T1, server1, server2)
.build();
DruidCoordinatorRuntimeParams params =
makeCoordinatorRuntimeParams(druidCluster, segment1, segment2);
@@ -549,45 +430,29 @@ public class LoadRuleTest
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, segment1.getDataSource()));
+
Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1));
stats = runRuleAndGetStats(rule, segment2, params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, segment2.getDataSource()));
-
- EasyMock.verify(mockPeon);
+
Assert.assertTrue(server2.getPeon().getSegmentsToDrop().contains(segment2));
}
- /**
- * 3 servers hosting 3 replicas of the segment.
- * 1 servers is decommissioning.
- * 1 replica is redundant.
- * Should drop from the decommissioning server.
- */
@Test
- public void testRedundantReplicaDropDuringDecommissioning()
+ public void testExtraReplicasAreDroppedFromDecommissioningServer()
{
- final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
- final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
- final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
-
final DataSegment segment1 = createDataSegment(DS_WIKI);
- DruidServer server1 = createServer(Tier.T1);
- server1.addDataSegment(segment1);
- DruidServer server2 = createServer(Tier.T1);
- server2.addDataSegment(segment1);
- DruidServer server3 = createServer(Tier.T1);
- server3.addDataSegment(segment1);
+ // 3 servers, each serving the same segment
+ final ServerHolder server1 = createServer(Tier.T1, segment1);
+ final ServerHolder server2 = createDecommissioningServer(Tier.T1,
segment1);
+ final ServerHolder server3 = createServer(Tier.T1, segment1);
DruidCluster druidCluster = DruidCluster
.builder()
- .addTier(
- Tier.T1,
- new ServerHolder(server1.toImmutableDruidServer(), mockPeon1,
false),
- new ServerHolder(server2.toImmutableDruidServer(), mockPeon2,
true),
- new ServerHolder(server3.toImmutableDruidServer(), mockPeon3,
false)
- )
+ .addTier(Tier.T1, server1, server2, server3)
.build();
+ // Load rule requires 2 replicas
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2));
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
@@ -595,10 +460,11 @@ public class LoadRuleTest
makeCoordinatorRuntimeParams(druidCluster, segment1)
);
+ // Verify that the extra replica is dropped from the decommissioning server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, DS_WIKI));
- Assert.assertEquals(0, mockPeon1.getSegmentsToDrop().size());
- Assert.assertEquals(1, mockPeon2.getSegmentsToDrop().size());
- Assert.assertEquals(0, mockPeon3.getSegmentsToDrop().size());
+ Assert.assertEquals(0, server1.getPeon().getSegmentsToDrop().size());
+ Assert.assertEquals(1, server2.getPeon().getSegmentsToDrop().size());
+ Assert.assertEquals(0, server3.getPeon().getSegmentsToDrop().size());
}
private DataSegment createDataSegment(String dataSource)
@@ -621,54 +487,36 @@ public class LoadRuleTest
return new ForeverLoadRule(tieredReplicants, null);
}
- private static LoadQueuePeon createEmptyPeon()
- {
- final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
-
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(mockPeon.getSizeOfSegmentsToLoad()).andReturn(0L).anyTimes();
-
- return mockPeon;
- }
-
- private static LoadQueuePeon createLoadingPeon(DataSegment segment, boolean
slowLoading)
- {
- final Set<DataSegment> segs = Collections.singleton(segment);
-
- final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
- EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes();
-
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
-
EasyMock.expect(mockPeon.getSegmentsToDrop()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(mockPeon.getSegmentsInQueue())
- .andReturn(Collections.singleton(new SegmentHolder(segment,
SegmentAction.LOAD, null))).anyTimes();
-
- EasyMock.expect(mockPeon.getTimedOutSegments())
- .andReturn(slowLoading ? segs : Collections.emptySet()).anyTimes();
-
- return mockPeon;
- }
-
- private DruidServer createServer(String tier)
+ private DruidServer createDruidServer(String tier)
{
final String serverName = "hist_" + tier + "_" +
serverId.incrementAndGet();
return new DruidServer(serverName, serverName, null, 10L << 30,
ServerType.HISTORICAL, tier, 0);
}
- private static LoadQueuePeon createOneCallPeonMock()
+ private ServerHolder createServer(String tier, DataSegment... segments)
{
- final LoadQueuePeon mockPeon2 = createEmptyPeon();
- mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
- EasyMock.expectLastCall().once();
- return mockPeon2;
+ final DruidServer server = createDruidServer(tier);
+ for (DataSegment segment : segments) {
+ server.addDataSegment(segment);
+ }
+
+ return new ServerHolder(
+ server.toImmutableDruidServer(),
+ new TestLoadQueuePeon()
+ );
}
- private ServerHolder createServerHolder(String tier, LoadQueuePeon
loadQueuePeon, boolean isDecommissioning)
+ private ServerHolder createDecommissioningServer(String tier, DataSegment...
segments)
{
+ final DruidServer server = createDruidServer(tier);
+ for (DataSegment segment : segments) {
+ server.addDataSegment(segment);
+ }
+
return new ServerHolder(
- createServer(tier).toImmutableDruidServer(),
- loadQueuePeon,
- isDecommissioning
+ server.toImmutableDruidServer(),
+ new TestLoadQueuePeon(),
+ true
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]