This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 22290fd632 Test: Simplify test impl of LoadQueuePeon (#14684)
22290fd632 is described below

commit 22290fd632c5026f2341c023affec4ccfa446721
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 28 16:14:23 2023 +0530

    Test: Simplify test impl of LoadQueuePeon (#14684)
    
    Changes
    - Rename `LoadQueuePeonTester` to `TestLoadQueuePeon`
    - Simplify `TestLoadQueuePeon` by removing dependency on `CuratorLoadQueuePeon`
    - Remove usages of mock peons in `LoadRuleTest` and use `TestLoadQueuePeon` instead
---
 .../coordinator/BalancerStrategyBenchmark.java     |   4 +-
 .../coordinator/BalanceSegmentsProfiler.java       |   8 +-
 .../druid/server/coordinator/DruidClusterTest.java |  10 +-
 .../coordinator/RoundRobinServerSelectorTest.java  |   4 +-
 .../druid/server/coordinator/ServerHolderTest.java |  22 +-
 .../coordinator/balancer/BalancerStrategyTest.java |  10 +-
 .../balancer/CachingCostBalancerStrategyTest.java  |   4 +-
 .../balancer/CostBalancerStrategyTest.java         |  10 +-
 .../DiskNormalizedCostBalancerStrategyTest.java    |   6 +-
 .../balancer/ReservoirSegmentSamplerTest.java      |   6 +-
 .../coordinator/duty/BalanceSegmentsTest.java      |   4 +-
 .../coordinator/duty/UnloadUnusedSegmentsTest.java |  18 +-
 .../coordinator/loading/LoadQueuePeonTester.java   |  61 ----
 .../coordinator/loading/TestLoadQueuePeon.java     | 124 +++++++
 .../rules/BroadcastDistributionRuleTest.java       |  20 +-
 .../server/coordinator/rules/LoadRuleTest.java     | 372 ++++++---------------
 16 files changed, 297 insertions(+), 386 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
index 2afd99c8c3..e51cc7da3a 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
@@ -27,7 +27,7 @@ import 
org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.balancer.BalancerSegmentHolder;
 import org.apache.druid.server.coordinator.balancer.ReservoirSegmentSampler;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -105,7 +105,7 @@ public class BalancerStrategyBenchmark
                   ImmutableMap.of("test", new ImmutableDruidDataSource("test", 
Collections.emptyMap(), segments)),
                   segments.size()
               ),
-              new LoadQueuePeonTester()
+              new TestLoadQueuePeon()
           )
       );
     }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index 4a45e16bf9..6a76a865d8 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -31,8 +31,8 @@ import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.rules.PeriodLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
@@ -117,7 +117,7 @@ public class BalanceSegmentsProfiler
       
EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
       EasyMock.replay(server);
 
-      LoadQueuePeon peon = new LoadQueuePeonTester();
+      LoadQueuePeon peon = new TestLoadQueuePeon();
       serverHolderList.add(new ServerHolder(server, peon));
     }
 
@@ -154,8 +154,8 @@ public class BalanceSegmentsProfiler
   public void profileRun()
   {
     Stopwatch watch = Stopwatch.createUnstarted();
-    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
-    LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
+    TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
+    TestLoadQueuePeon toPeon = new TestLoadQueuePeon();
 
     EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
     EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
index 182d58dd38..17a4de1d73 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java
@@ -22,7 +22,7 @@ package org.apache.druid.server.coordinator;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,13 +47,13 @@ public class DruidClusterTest
   private static final ServerHolder NEW_REALTIME = new ServerHolder(
       new DruidServer("name1", "host2", null, 100L, ServerType.REALTIME, 
"tier1", 0)
           .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
-      new LoadQueuePeonTester()
+      new TestLoadQueuePeon()
   );
 
   private static final ServerHolder NEW_HISTORICAL = new ServerHolder(
       new DruidServer("name1", "host2", null, 100L, ServerType.HISTORICAL, 
"tier1", 0)
           .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
-      new LoadQueuePeonTester()
+      new TestLoadQueuePeon()
   );
 
   private DruidCluster.Builder clusterBuilder;
@@ -67,14 +67,14 @@ public class DruidClusterTest
             new ServerHolder(
                 new DruidServer("name1", "host1", null, 100L, 
ServerType.REALTIME, "tier1", 0)
                     .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
-                new LoadQueuePeonTester()
+                new TestLoadQueuePeon()
             )
         )
         .add(
             new ServerHolder(
                 new DruidServer("name1", "host1", null, 100L, 
ServerType.HISTORICAL, "tier1", 0)
                     .addDataSegment(SEGMENTS.get(0)).toImmutableDruidServer(),
-                new LoadQueuePeonTester()
+                new TestLoadQueuePeon()
             )
         );
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
index 4164bba364..62f84b631a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java
@@ -23,8 +23,8 @@ import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.RoundRobinServerSelector;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.junit.Assert;
@@ -140,7 +140,7 @@ public class RoundRobinServerSelectorTest
     return new ServerHolder(
         new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1)
             .toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index ed9735c6bc..fd1fb7c306 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.junit.Assert;
@@ -79,7 +79,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     // available size of 100
@@ -90,7 +90,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     // available size of 10
@@ -101,7 +101,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     // available size of 50
@@ -112,7 +112,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     Assert.assertEquals(0, h1.compareTo(h2));
@@ -130,7 +130,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     final ServerHolder h2 = new ServerHolder(
@@ -140,7 +140,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     final ServerHolder h3 = new ServerHolder(
@@ -150,7 +150,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     final ServerHolder h4 = new ServerHolder(
@@ -160,7 +160,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     final ServerHolder h5 = new ServerHolder(
@@ -170,7 +170,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     Assert.assertEquals(h1, h2);
@@ -189,7 +189,7 @@ public class ServerHolderTest
             ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
             1
         ),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
     Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
     Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
index ad4d7f65ae..6e0e3ddf0b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyTest.java
@@ -24,7 +24,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.junit.Assert;
@@ -84,7 +84,7 @@ public class BalancerStrategyTest
   {
     final ServerHolder serverHolder = new ServerHolder(
         new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, 
DruidServer.DEFAULT_TIER, 
0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
-        new LoadQueuePeonTester());
+        new TestLoadQueuePeon());
     Assert.assertFalse(
         balancerStrategy.findServersToLoadSegment(
             proposedDataSegment,
@@ -99,12 +99,12 @@ public class BalancerStrategyTest
     final ServerHolder serverHolder1 = new ServerHolder(
         new DruidServer("server1", "host1", null, 1000L, 
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
             .addDataSegment(proposedDataSegment).toImmutableDruidServer(),
-        new LoadQueuePeonTester());
+        new TestLoadQueuePeon());
 
     final ServerHolder serverHolder2 = new ServerHolder(
         new DruidServer("server2", "host2", null, 1000L, 
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
             .addDataSegment(proposedDataSegment).toImmutableDruidServer(),
-        new LoadQueuePeonTester());
+        new TestLoadQueuePeon());
 
     serverHolders = new ArrayList<>();
     serverHolders.add(serverHolder1);
@@ -119,7 +119,7 @@ public class BalancerStrategyTest
   {
     final ServerHolder serverHolder = new ServerHolder(
         new DruidServer("server1", "host1", null, 1000L, 
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(),
-        new LoadQueuePeonTester());
+        new TestLoadQueuePeon());
     serverHolders = new ArrayList<>();
     serverHolders.add(serverHolder);
     final ServerHolder foundServerHolder = balancerStrategy
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
index 23975352b3..f0d6519974 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -141,7 +141,7 @@ public class CachingCostBalancerStrategyTest
         .forEach(druidServer::addDataSegment);
     return new ServerHolder(
         druidServer.toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index 17d0a8716a..e4b3715960 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
@@ -283,7 +283,7 @@ public class CostBalancerStrategyTest
 
     // Create ServerHolder for each server
     final List<ServerHolder> serverHolders = historicals.stream().map(
-        server -> new ServerHolder(server.toImmutableDruidServer(), new 
LoadQueuePeonTester())
+        server -> new ServerHolder(server.toImmutableDruidServer(), new 
TestLoadQueuePeon())
     ).collect(Collectors.toList());
 
     final ServerHolder serverA = serverHolders.get(0);
@@ -321,11 +321,11 @@ public class CostBalancerStrategyTest
   {
     final ServerHolder serverA = new ServerHolder(
         createHistorical().toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
     final ServerHolder serverB = new ServerHolder(
         createHistorical().toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     final DataSegment segment = 
CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
@@ -356,7 +356,7 @@ public class CostBalancerStrategyTest
                                             .startingAt("2012-10-24")
                                             .eachOfSizeInMb(100).get(0);
 
-    final LoadQueuePeonTester peon = new LoadQueuePeonTester();
+    final TestLoadQueuePeon peon = new TestLoadQueuePeon();
     ServerHolder serverA = new 
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
     ServerHolder serverB = new 
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
index 98f86ec318..cbc4b8f0f7 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
@@ -57,7 +57,7 @@ public class DiskNormalizedCostBalancerStrategyTest
     // Create 10 servers with current size being 3K & max size being 10K
     // Each having having 100 segments
     for (int i = 0; i < serverCount; i++) {
-      LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+      TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
 
       List<DataSegment> segments = IntStream
           .range(0, maxSegments)
@@ -78,7 +78,7 @@ public class DiskNormalizedCostBalancerStrategyTest
     }
 
     // The best server to be available for next segment assignment has greater 
max Size
-    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+    TestLoadQueuePeon fromPeon = new TestLoadQueuePeon();
     ImmutableDruidServer druidServer = 
EasyMock.createMock(ImmutableDruidServer.class);
     EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
     EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
index e6387a5a34..c438219e1b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
@@ -25,8 +25,8 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.SegmentAction;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -175,7 +175,7 @@ public class ReservoirSegmentSamplerTest
             .addDataSegment(segments.get(2))
             .addDataSegment(segments.get(3))
             .toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     // Try to pick all the segments on the servers
@@ -380,6 +380,6 @@ public class ReservoirSegmentSamplerTest
     for (DataSegment segment : loadedSegments) {
       server.addDataSegment(segment);
     }
-    return new ServerHolder(server.toImmutableDruidServer(), new 
LoadQueuePeonTester());
+    return new ServerHolder(server.toImmutableDruidServer(), new 
TestLoadQueuePeon());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index 185f8e8263..c608f58d9a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -31,8 +31,8 @@ import 
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
@@ -423,7 +423,7 @@ public class BalanceSegmentsTest
 
     return new ServerHolder(
         server.toImmutableDruidServer(),
-        new LoadQueuePeonTester(),
+        new TestLoadQueuePeon(),
         isDecommissioning,
         maxSegmentsInLoadQueue,
         10
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
index 83aa11fb30..ba39c5fc43 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java
@@ -33,8 +33,8 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -61,10 +61,10 @@ public class UnloadUnusedSegmentsTest
   private ImmutableDruidServer historicalServerTier2;
   private ImmutableDruidServer brokerServer;
   private ImmutableDruidServer indexerServer;
-  private LoadQueuePeonTester historicalPeon;
-  private LoadQueuePeonTester historicalTier2Peon;
-  private LoadQueuePeonTester brokerPeon;
-  private LoadQueuePeonTester indexerPeon;
+  private TestLoadQueuePeon historicalPeon;
+  private TestLoadQueuePeon historicalTier2Peon;
+  private TestLoadQueuePeon brokerPeon;
+  private TestLoadQueuePeon indexerPeon;
   private DataSegment segment1;
   private DataSegment segment2;
   private List<DataSegment> segments;
@@ -143,10 +143,10 @@ public class UnloadUnusedSegmentsTest
     segmentsForRealtime.add(realtimeOnlySegment);
     segmentsForRealtime.add(broadcastSegment);
 
-    historicalPeon = new LoadQueuePeonTester();
-    historicalTier2Peon = new LoadQueuePeonTester();
-    brokerPeon = new LoadQueuePeonTester();
-    indexerPeon = new LoadQueuePeonTester();
+    historicalPeon = new TestLoadQueuePeon();
+    historicalTier2Peon = new TestLoadQueuePeon();
+    brokerPeon = new TestLoadQueuePeon();
+    indexerPeon = new TestLoadQueuePeon();
 
     final ImmutableDruidDataSource dataSource1 = new ImmutableDruidDataSource(
         "datasource1",
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
deleted file mode 100644
index e1a22aaa07..0000000000
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/LoadQueuePeonTester.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.loading;
-
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Duration;
-
-import javax.annotation.Nullable;
-import java.util.concurrent.ConcurrentSkipListSet;
-
-public class LoadQueuePeonTester extends CuratorLoadQueuePeon
-{
-  private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new 
ConcurrentSkipListSet<>();
-
-  public LoadQueuePeonTester()
-  {
-    super(
-        null,
-        null,
-        null,
-        Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
-        null,
-        new TestDruidCoordinatorConfig.Builder()
-            .withLoadTimeoutDelay(new Duration(1))
-            .withCoordinatorKillMaxSegments(10)
-            .withCoordinatorKillIgnoreDurationToRetain(false)
-            .build()
-    );
-  }
-
-  @Override
-  public void loadSegment(DataSegment segment, SegmentAction action, @Nullable 
LoadPeonCallback callback)
-  {
-    segmentsToLoad.add(segment);
-  }
-
-  @Override
-  public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
-  {
-    return segmentsToLoad;
-  }
-}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
new file mode 100644
index 0000000000..c496c37e3e
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/TestLoadQueuePeon.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * Test implementation of {@link LoadQueuePeon} that maintains a set of 
segments
+ * that have been queued for load or drop.
+ */
+public class TestLoadQueuePeon implements LoadQueuePeon
+{
+  private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new 
ConcurrentSkipListSet<>();
+  private final ConcurrentSkipListSet<DataSegment> segmentsToDrop = new 
ConcurrentSkipListSet<>();
+
+  private final CoordinatorRunStats stats = new CoordinatorRunStats();
+
+  @Override
+  public void loadSegment(DataSegment segment, SegmentAction action, @Nullable 
LoadPeonCallback callback)
+  {
+    segmentsToLoad.add(segment);
+  }
+
+  @Override
+  public void dropSegment(DataSegment segment, LoadPeonCallback callback)
+  {
+    segmentsToDrop.add(segment);
+  }
+
+  @Override
+  public long getSizeOfSegmentsToLoad()
+  {
+    return 0;
+  }
+
+  @Override
+  public CoordinatorRunStats getAndResetStats()
+  {
+    return stats;
+  }
+
+  @Override
+  public boolean cancelOperation(DataSegment segment)
+  {
+    return false;
+  }
+
+  @Override
+  public void start()
+  {
+
+  }
+
+  @Override
+  public void stop()
+  {
+
+  }
+
+  @Override
+  public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
+  {
+    return segmentsToLoad;
+  }
+
+  @Override
+  public Set<SegmentHolder> getSegmentsInQueue()
+  {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Set<DataSegment> getSegmentsToDrop()
+  {
+    return segmentsToDrop;
+  }
+
+  @Override
+  public Set<DataSegment> getTimedOutSegments()
+  {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void markSegmentToDrop(DataSegment segmentToLoad)
+  {
+
+  }
+
+  @Override
+  public void unmarkSegmentToDrop(DataSegment segmentToLoad)
+  {
+
+  }
+
+  @Override
+  public Set<DataSegment> getSegmentsMarkedToDrop()
+  {
+    return Collections.emptySet();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index acf815089b..e21d823f34 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -27,9 +27,9 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
@@ -120,7 +120,7 @@ public class BroadcastDistributionRuleTest
             0
         ).addDataSegment(smallSegment)
          .toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     holdersOfLargeSegments.add(
@@ -135,7 +135,7 @@ public class BroadcastDistributionRuleTest
                 0
             ).addDataSegment(largeSegments.get(0))
              .toImmutableDruidServer(),
-            new LoadQueuePeonTester()
+            new TestLoadQueuePeon()
         )
     );
     holdersOfLargeSegments.add(
@@ -150,7 +150,7 @@ public class BroadcastDistributionRuleTest
                 0
             ).addDataSegment(largeSegments.get(1))
              .toImmutableDruidServer(),
-            new LoadQueuePeonTester()
+            new TestLoadQueuePeon()
         )
     );
     holdersOfLargeSegments.add(
@@ -165,7 +165,7 @@ public class BroadcastDistributionRuleTest
                 0
             ).addDataSegment(largeSegments.get(2))
              .toImmutableDruidServer(),
-            new LoadQueuePeonTester()
+            new TestLoadQueuePeon()
         )
     );
 
@@ -181,7 +181,7 @@ public class BroadcastDistributionRuleTest
                 0
             ).addDataSegment(largeSegments2.get(0))
              .toImmutableDruidServer(),
-            new LoadQueuePeonTester()
+            new TestLoadQueuePeon()
         )
     );
     holdersOfLargeSegments2.add(
@@ -196,7 +196,7 @@ public class BroadcastDistributionRuleTest
                 0
             ).addDataSegment(largeSegments2.get(1))
              .toImmutableDruidServer(),
-            new LoadQueuePeonTester()
+            new TestLoadQueuePeon()
         )
     );
 
@@ -211,7 +211,7 @@ public class BroadcastDistributionRuleTest
             0
         ).addDataSegment(largeSegments.get(0))
          .toImmutableDruidServer(),
-        new LoadQueuePeonTester()
+        new TestLoadQueuePeon()
     );
 
     decommissioningServer1 = new ServerHolder(
@@ -225,7 +225,7 @@ public class BroadcastDistributionRuleTest
             0
         ).addDataSegment(smallSegment)
          .toImmutableDruidServer(),
-        new LoadQueuePeonTester(),
+        new TestLoadQueuePeon(),
         true
     );
 
@@ -240,7 +240,7 @@ public class BroadcastDistributionRuleTest
             0
         ).addDataSegment(largeSegments.get(1))
          .toImmutableDruidServer(),
-        new LoadQueuePeonTester(),
+        new TestLoadQueuePeon(),
         true
     );
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index c7e3b5b239..7ef07719be 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -24,7 +24,6 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.client.DruidServer;
-import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -38,18 +37,14 @@ import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import 
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategy;
 import org.apache.druid.server.coordinator.balancer.ClusterCostCache;
-import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
-import org.apache.druid.server.coordinator.loading.SegmentAction;
-import org.apache.druid.server.coordinator.loading.SegmentHolder;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -62,7 +57,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -96,7 +90,7 @@ public class LoadRuleTest
   public void setUp()
   {
     exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"LoadRuleTest-%d"));
-    balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec);
+    balancerStrategy = new CostBalancerStrategy(exec);
     loadQueueManager = new SegmentLoadQueueManager(null, null, null);
   }
 
@@ -107,17 +101,15 @@ public class LoadRuleTest
   }
 
   @Test
-  public void testLoad()
+  public void testLoadRuleAssignsSegments()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon);
-
+    // Cluster has 2 tiers with 1 server each
+    final ServerHolder server1 = createServer(Tier.T1);
+    final ServerHolder server2 = createServer(Tier.T2);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
-        .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
+        .addTier(Tier.T1, server1)
+        .addTier(Tier.T2, server2)
         .build();
 
     final DataSegment segment = createDataSegment(DS_WIKI);
@@ -126,8 +118,6 @@ public class LoadRuleTest
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
-
-    EasyMock.verify(mockPeon);
   }
 
   private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment 
segment, DruidCluster cluster)
@@ -169,99 +159,56 @@ public class LoadRuleTest
   @Test
   public void testLoadPrimaryAssignDoesNotOverAssign()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon);
-
-    ImmutableDruidServer server1 = 
createServer(Tier.T1).toImmutableDruidServer();
-    ImmutableDruidServer server2 = 
createServer(Tier.T1).toImmutableDruidServer();
+    ServerHolder server1 = createServer(Tier.T1);
+    ServerHolder server2 = createServer(Tier.T1);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, mockPeon), new 
ServerHolder(server2, mockPeon))
+        .addTier(Tier.T1, server1, server2)
         .build();
 
     final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
     final DataSegment segment = createDataSegment(DS_WIKI);
-    CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
-    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, segment.getDataSource()));
-
-    // ensure multiple runs don't assign primary segment again if at 
replication count
-    final LoadQueuePeon loadingPeon = createLoadingPeon(segment, false);
-    EasyMock.replay(loadingPeon);
-
-    DruidCluster afterLoad = DruidCluster
-        .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, loadingPeon), new 
ServerHolder(server2, mockPeon))
-        .build();
-
-    CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, 
segment, afterLoad);
-
-    Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
-
-    EasyMock.verify(mockPeon);
+    CoordinatorRunStats firstRunStats = runRuleAndGetStats(rule, segment, 
druidCluster);
+    Assert.assertEquals(1L, 
firstRunStats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, 
segment.getDataSource()));
+    Assert.assertEquals(1, server1.getLoadingSegments().size() + 
server2.getLoadingSegments().size());
+
+    // Verify that multiple runs don't assign primary segment again if at 
replication count
+    CoordinatorRunStats secondRunStats = runRuleAndGetStats(rule, segment, 
druidCluster);
+    Assert.assertFalse(secondRunStats.hasStat(Stats.Segments.ASSIGNED));
+    Assert.assertEquals(1, server1.getLoadingSegments().size() + 
server2.getLoadingSegments().size());
   }
 
   @Test
   @Ignore("Enable this test when timeout behaviour is fixed")
   public void testOverAssignForTimedOutSegments()
   {
-    final LoadQueuePeon emptyPeon = createEmptyPeon();
-    emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(emptyPeon);
-
-    ImmutableDruidServer server1 = 
createServer(Tier.T1).toImmutableDruidServer();
-    ImmutableDruidServer server2 = 
createServer(Tier.T1).toImmutableDruidServer();
+    ServerHolder server1 = createServer(Tier.T1);
+    ServerHolder server2 = createServer(Tier.T1);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new 
ServerHolder(server2, emptyPeon))
+        .addTier(Tier.T1, server1, server2)
         .build();
 
     final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
     final DataSegment segment = createDataSegment(DS_WIKI);
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        segment,
-        makeCoordinatorRuntimeParams(druidCluster, segment)
-    );
+    CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
     // Ensure that the segment is assigned to one of the historicals
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, segment.getDataSource()));
 
     // Ensure that the primary segment is assigned again in case the peon 
timed out on loading the segment
-    final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
-    EasyMock.replay(slowLoadingPeon);
-
-    DruidCluster withLoadTimeout = DruidCluster
-        .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new 
ServerHolder(server2, emptyPeon))
-        .build();
-
-    CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(
-        rule,
-        segment,
-        makeCoordinatorRuntimeParams(withLoadTimeout, segment)
-    );
-
+    CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, 
segment, druidCluster);
     Assert.assertEquals(1L, 
statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, 
DS_WIKI));
-
-    EasyMock.verify(emptyPeon);
   }
 
   @Test
   public void testSkipReplicationForTimedOutSegments()
   {
-    final LoadQueuePeon emptyPeon = createEmptyPeon();
-    emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(emptyPeon);
-
-    ImmutableDruidServer server1 = 
createServer(Tier.T1).toImmutableDruidServer();
-    ImmutableDruidServer server2 = 
createServer(Tier.T1).toImmutableDruidServer();
+    ServerHolder server1 = createServer(Tier.T1);
+    ServerHolder server2 = createServer(Tier.T1);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new 
ServerHolder(server2, emptyPeon))
+        .addTier(Tier.T1, server1, server2)
         .build();
 
     final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
@@ -272,20 +219,10 @@ public class LoadRuleTest
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, segment.getDataSource()));
 
     // Add the segment to the timed out list to simulate peon timeout on 
loading the segment
-    final LoadQueuePeon slowLoadingPeon = createLoadingPeon(segment, true);
-    EasyMock.replay(slowLoadingPeon);
-
-    DruidCluster withLoadTimeout = DruidCluster
-        .builder()
-        .addTier(Tier.T1, new ServerHolder(server1, slowLoadingPeon), new 
ServerHolder(server2, emptyPeon))
-        .build();
-
     // Default behavior is to not replicate the timed out segments on other 
servers
-    CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, 
segment, withLoadTimeout);
+    CoordinatorRunStats statsAfterLoadPrimary = runRuleAndGetStats(rule, 
segment, druidCluster);
 
     Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
-
-    EasyMock.verify(emptyPeon);
   }
 
   @Test
@@ -297,14 +234,10 @@ public class LoadRuleTest
                           .withNumPartitions(2)
                           .eachOfSizeInMb(100);
 
-    final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
-    loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().once();
-    EasyMock.replay(loadingPeon);
-
+    final ServerHolder server1 = createServer(Tier.T1);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
+        .addTier(Tier.T1, server1)
         .build();
 
     balancerStrategy = new 
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
@@ -316,104 +249,71 @@ public class LoadRuleTest
         makeCoordinatorRuntimeParams(druidCluster, segments.toArray(new 
DataSegment[0]))
     );
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
-
-    EasyMock.verify(loadingPeon);
   }
 
   @Test
-  public void testDrop()
+  public void testSegmentsAreDroppedIfLoadRuleHasZeroReplicas()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon);
-
     final DataSegment segment = createDataSegment(DS_WIKI);
 
-    DruidServer server1 = createServer(Tier.T1);
-    server1.addDataSegment(segment);
-    DruidServer server2 = createServer(Tier.T2);
-    server2.addDataSegment(segment);
-    DruidServer server3 = createServer(Tier.T2);
+    final ServerHolder serverT11 = createServer(Tier.T1, segment);
+    final ServerHolder serverT12 = createServer(Tier.T2, segment);
+    final ServerHolder serverT21 = createServer(Tier.T2, segment);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, new ServerHolder(server1.toImmutableDruidServer(), 
mockPeon))
-        .addTier(
-            Tier.T2,
-            new ServerHolder(server2.toImmutableDruidServer(), mockPeon),
-            new ServerHolder(server3.toImmutableDruidServer(), mockPeon)
-        )
+        .addTier(Tier.T1, serverT11)
+        .addTier(Tier.T2, serverT12, serverT21)
         .build();
 
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0));
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T1, DS_WIKI));
-    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T2, DS_WIKI));
-
-    EasyMock.verify(mockPeon);
+    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T2, DS_WIKI));
   }
 
   @Test
-  public void testLoadWithNonExistentTier()
+  public void testLoadIgnoresInvalidTiers()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon);
-
+    ServerHolder server = createServer(Tier.T1);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
+        .addTier(Tier.T1, server)
         .build();
 
     final DataSegment segment = createDataSegment(DS_WIKI);
-    LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 
1));
+    LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
 
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
-
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
-
-    EasyMock.verify(mockPeon);
+    Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
"invalidTier", DS_WIKI));
   }
 
   @Test
-  public void testDropWithNonExistentTier()
+  public void testDropIgnoresInvalidTiers()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon);
-
     final DataSegment segment = createDataSegment(DS_WIKI);
 
-    DruidServer server1 = createServer(Tier.T1);
-    DruidServer server2 = createServer(Tier.T1);
-    server1.addDataSegment(segment);
-    server2.addDataSegment(segment);
-
+    // Cluster has 1 tier with 2 servers
+    ServerHolder server1 = createServer(Tier.T1, segment);
+    ServerHolder server2 = createServer(Tier.T1, segment);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(
-            Tier.T1,
-            new ServerHolder(server1.toImmutableDruidServer(), mockPeon),
-            new ServerHolder(server2.toImmutableDruidServer(), mockPeon)
-        )
+        .addTier(Tier.T1, server1, server2)
         .build();
 
-    LoadRule rule = loadForever(ImmutableMap.of("nonExistentTier", 1, Tier.T1, 
1));
+    LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T1, DS_WIKI));
-
-    EasyMock.verify(mockPeon);
+    Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.DROPPED, 
"invalidTier", DS_WIKI));
   }
 
   @Test
   public void testMaxLoadingQueueSize()
   {
-    final LoadQueuePeonTester peon = new LoadQueuePeonTester();
+    final TestLoadQueuePeon peon = new TestLoadQueuePeon();
 
     final int maxSegmentsInQueue = 2;
     DruidCluster druidCluster = DruidCluster
@@ -421,7 +321,7 @@ public class LoadRuleTest
         .addTier(
             Tier.T1,
             new ServerHolder(
-                createServer(Tier.T1).toImmutableDruidServer(),
+                createDruidServer(Tier.T1).toImmutableDruidServer(),
                 peon, false, maxSegmentsInQueue, 10
             )
         )
@@ -456,63 +356,55 @@ public class LoadRuleTest
     Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, dataSegment3.getDataSource()));
   }
 
-  /**
-   * 2 servers in different tiers, the first is decommissioning.
-   * Should not load a segment to the server that is decommissioning
-   */
   @Test
-  public void testLoadDecommissioning()
+  public void testSegmentIsAssignedOnlyToActiveServer()
   {
-    final LoadQueuePeon mockPeon1 = createEmptyPeon();
-    final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
-    EasyMock.replay(mockPeon1, mockPeon2);
+    final ServerHolder decommServerT1 = createDecommissioningServer(Tier.T1);
+    final ServerHolder serverT2 = createServer(Tier.T2);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon1, true))
-        .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon2, false))
+        .addTier(Tier.T1, decommServerT1)
+        .addTier(Tier.T2, serverT2)
         .build();
 
+    // Load rule requires 1 replica on each tier
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1));
     DataSegment segment = createDataSegment(DS_WIKI);
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
+    // Verify that segment is not loaded on decommissioning server
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
-    EasyMock.verify(mockPeon1, mockPeon2);
+    Assert.assertEquals(0, decommServerT1.getLoadingSegments().size());
+    Assert.assertTrue(serverT2.getLoadingSegments().contains(segment));
   }
 
-  /**
-   * 2 tiers, 2 servers each, 1 server of the second tier is decommissioning.
-   * Should not load a segment to the server that is decommssioning.
-   */
   @Test
-  public void testLoadReplicaDuringDecommissioning()
+  public void testSegmentIsAssignedOnlyToActiveServers()
   {
-    final LoadQueuePeon mockPeon1 = createEmptyPeon();
-    final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
-    final LoadQueuePeon mockPeon3 = createOneCallPeonMock();
-    final LoadQueuePeon mockPeon4 = createOneCallPeonMock();
-    EasyMock.replay(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
-
-    ServerHolder holder1 = createServerHolder(Tier.T1, mockPeon1, true);
-    ServerHolder holder2 = createServerHolder(Tier.T1, mockPeon2, false);
-    ServerHolder holder3 = createServerHolder(Tier.T2, mockPeon3, false);
-    ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
+    // 2 tiers with 2 servers each, 1 server is decommissioning
+    ServerHolder decommServerT11 = createDecommissioningServer(Tier.T1);
+    ServerHolder serverT12 = createServer(Tier.T1);
+    ServerHolder serverT21 = createServer(Tier.T2);
+    ServerHolder serverT22 = createServer(Tier.T2);
 
     final DataSegment segment = createDataSegment(DS_WIKI);
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(Tier.T1, holder1, holder2)
-        .addTier(Tier.T2, holder3, holder4)
+        .addTier(Tier.T1, decommServerT11, serverT12)
+        .addTier(Tier.T2, serverT21, serverT22)
         .build();
 
+    // Load rule requires 2 replicas on each tier
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2, Tier.T2, 2));
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
+    // Verify that no replica is assigned to decommissioning server
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
+    Assert.assertTrue(decommServerT11.getLoadingSegments().isEmpty());
+    Assert.assertEquals(0, decommServerT11.getLoadingSegments().size());
 
-    EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
+    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
   }
 
   /**
@@ -522,26 +414,15 @@ public class LoadRuleTest
   @Test
   public void testDropDuringDecommissioning()
   {
-    final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
-    EasyMock.expectLastCall().times(2);
-    EasyMock.replay(mockPeon);
-
     final DataSegment segment1 = createDataSegment("foo1");
     final DataSegment segment2 = createDataSegment("foo2");
 
-    DruidServer server1 = createServer(Tier.T1);
-    server1.addDataSegment(segment1);
-    DruidServer server2 = createServer(Tier.T1);
-    server2.addDataSegment(segment2);
+    final ServerHolder server1 = createDecommissioningServer(Tier.T1, 
segment1);
+    final ServerHolder server2 = createServer(Tier.T1, segment2);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(
-            Tier.T1,
-            new ServerHolder(server1.toImmutableDruidServer(), mockPeon, true),
-            new ServerHolder(server2.toImmutableDruidServer(), mockPeon, false)
-        )
+        .addTier(Tier.T1, server1, server2)
         .build();
 
     DruidCoordinatorRuntimeParams params = 
makeCoordinatorRuntimeParams(druidCluster, segment1, segment2);
@@ -549,45 +430,29 @@ public class LoadRuleTest
 
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T1, segment1.getDataSource()));
+    
Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1));
 
     stats = runRuleAndGetStats(rule, segment2, params);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T1, segment2.getDataSource()));
-
-    EasyMock.verify(mockPeon);
+    
Assert.assertTrue(server2.getPeon().getSegmentsToDrop().contains(segment2));
   }
 
-  /**
-   * 3 servers hosting 3 replicas of the segment.
-   * 1 servers is decommissioning.
-   * 1 replica is redundant.
-   * Should drop from the decommissioning server.
-   */
   @Test
-  public void testRedundantReplicaDropDuringDecommissioning()
+  public void testExtraReplicasAreDroppedFromDecommissioningServer()
   {
-    final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
-    final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
-    final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
-
     final DataSegment segment1 = createDataSegment(DS_WIKI);
 
-    DruidServer server1 = createServer(Tier.T1);
-    server1.addDataSegment(segment1);
-    DruidServer server2 = createServer(Tier.T1);
-    server2.addDataSegment(segment1);
-    DruidServer server3 = createServer(Tier.T1);
-    server3.addDataSegment(segment1);
+    // 3 servers, each serving the same segment
+    final ServerHolder server1 = createServer(Tier.T1, segment1);
+    final ServerHolder server2 = createDecommissioningServer(Tier.T1, 
segment1);
+    final ServerHolder server3 = createServer(Tier.T1, segment1);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
-        .addTier(
-            Tier.T1,
-            new ServerHolder(server1.toImmutableDruidServer(), mockPeon1, 
false),
-            new ServerHolder(server2.toImmutableDruidServer(), mockPeon2, 
true),
-            new ServerHolder(server3.toImmutableDruidServer(), mockPeon3, 
false)
-        )
+        .addTier(Tier.T1, server1, server2, server3)
         .build();
 
+    // Load rule requires 2 replicas
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 2));
     CoordinatorRunStats stats = runRuleAndGetStats(
         rule,
@@ -595,10 +460,11 @@ public class LoadRuleTest
         makeCoordinatorRuntimeParams(druidCluster, segment1)
     );
 
+    // Verify that the extra replica is dropped from the decommissioning server
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
Tier.T1, DS_WIKI));
-    Assert.assertEquals(0, mockPeon1.getSegmentsToDrop().size());
-    Assert.assertEquals(1, mockPeon2.getSegmentsToDrop().size());
-    Assert.assertEquals(0, mockPeon3.getSegmentsToDrop().size());
+    Assert.assertEquals(0, server1.getPeon().getSegmentsToDrop().size());
+    Assert.assertEquals(1, server2.getPeon().getSegmentsToDrop().size());
+    Assert.assertEquals(0, server3.getPeon().getSegmentsToDrop().size());
   }
 
   private DataSegment createDataSegment(String dataSource)
@@ -621,54 +487,36 @@ public class LoadRuleTest
     return new ForeverLoadRule(tieredReplicants, null);
   }
 
-  private static LoadQueuePeon createEmptyPeon()
-  {
-    final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
-    
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes();
-    
EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
-    
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
-    
EasyMock.expect(mockPeon.getSizeOfSegmentsToLoad()).andReturn(0L).anyTimes();
-
-    return mockPeon;
-  }
-
-  private static LoadQueuePeon createLoadingPeon(DataSegment segment, boolean 
slowLoading)
-  {
-    final Set<DataSegment> segs = Collections.singleton(segment);
-
-    final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
-    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes();
-    
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
-    
EasyMock.expect(mockPeon.getSegmentsToDrop()).andReturn(Collections.emptySet()).anyTimes();
-    EasyMock.expect(mockPeon.getSegmentsInQueue())
-            .andReturn(Collections.singleton(new SegmentHolder(segment, 
SegmentAction.LOAD, null))).anyTimes();
-
-    EasyMock.expect(mockPeon.getTimedOutSegments())
-            .andReturn(slowLoading ? segs : Collections.emptySet()).anyTimes();
-
-    return mockPeon;
-  }
-
-  private DruidServer createServer(String tier)
+  private DruidServer createDruidServer(String tier)
   {
     final String serverName = "hist_" + tier + "_" + 
serverId.incrementAndGet();
     return new DruidServer(serverName, serverName, null, 10L << 30, 
ServerType.HISTORICAL, tier, 0);
   }
 
-  private static LoadQueuePeon createOneCallPeonMock()
+  private ServerHolder createServer(String tier, DataSegment... segments)
   {
-    final LoadQueuePeon mockPeon2 = createEmptyPeon();
-    mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
-    EasyMock.expectLastCall().once();
-    return mockPeon2;
+    final DruidServer server = createDruidServer(tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+
+    return new ServerHolder(
+        server.toImmutableDruidServer(),
+        new TestLoadQueuePeon()
+    );
   }
 
-  private ServerHolder createServerHolder(String tier, LoadQueuePeon 
loadQueuePeon, boolean isDecommissioning)
+  private ServerHolder createDecommissioningServer(String tier, DataSegment... 
segments)
   {
+    final DruidServer server = createDruidServer(tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+
     return new ServerHolder(
-        createServer(tier).toImmutableDruidServer(),
-        loadQueuePeon,
-        isDecommissioning
+        server.toImmutableDruidServer(),
+        new TestLoadQueuePeon(),
+        true
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to