gianm closed pull request #5970: [Backport] Coordinator segment balancer max load queue fix
URL: https://github.com/apache/incubator-druid/pull/5970
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index cc266d705df..15f13438163 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -20,9 +20,9 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.server.coordinator.BalancerSegmentHolder; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; @@ -103,31 +103,37 @@ private void balanceTier( return; } - final List<ServerHolder> serverHolderList = Lists.newArrayList(servers); + final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers); + final List<ServerHolder> toMoveTo = Lists.newArrayList(servers); - if (serverHolderList.size() <= 1) { + if (toMoveTo.size() <= 1) { log.info("[%s]: One or fewer servers found. Cannot balance.", tier); return; } int numSegments = 0; - for (ServerHolder server : serverHolderList) { - numSegments += server.getServer().getSegments().size(); + for (ServerHolder sourceHolder : toMoveFrom) { + numSegments += sourceHolder.getServer().getSegments().size(); } if (numSegments == 0) { log.info("No segments found. 
Cannot balance."); return; } + + final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; for (int iter = 0; iter < maxSegmentsToMove; iter++) { - final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); + if (maxToLoad > 0) { + toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad); + } + final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); + final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo); - if (holder != null) { - moveSegment(segmentToMove, holder.getServer(), params); + if (destinationHolder != null) { + moveSegment(segmentToMove, destinationHolder.getServer(), params); } else { ++unmoved; } @@ -140,7 +146,7 @@ private void balanceTier( stats.addToTieredStat("unmovedCount", tier, unmoved); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); if (params.getCoordinatorDynamicConfig().emitBalancingStats()) { - strategy.emitStats(tier, stats, serverHolderList); + strategy.emitStats(tier, stats, toMoveFrom); } log.info( "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index ec0b841fbcd..84cf5ce7285 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -163,7 +163,6 @@ public void tearDown() throws Exception balancerStrategyExecutor.shutdownNow(); } - @Test public void testMoveToEmptyServerBalancer() throws 
IOException { @@ -186,7 +185,7 @@ public void testMoveToEmptyServerBalancer() throws IOException ) ); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ) @@ -197,6 +196,48 @@ public void testMoveToEmptyServerBalancer() throws IOException Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); } + @Test + public void testMoveMaxLoadQueueServerBalancer() + { + mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments); + mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap()); + + EasyMock.replay(druidServer3); + EasyMock.replay(druidServer4); + + // Mock stuff that the coordinator needs + mockCoordinator(coordinator); + + BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy( + balancerStrategy, + ImmutableList.of( + new BalancerSegmentHolder(druidServer1, segment1), + new BalancerSegmentHolder(druidServer1, segment2), + new BalancerSegmentHolder(druidServer1, segment3), + new BalancerSegmentHolder(druidServer1, segment4) + ) + ); + + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( + ImmutableList.of(druidServer1, druidServer2), + ImmutableList.of(peon1, peon2) + ) + .withBalancerStrategy(predefinedPickOrderStrategy) + .withDynamicConfigs( + CoordinatorDynamicConfig + .builder() + .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) + .withMaxSegmentsInNodeLoadingQueue(1) + .build() + ) + .build(); + + params = new DruidCoordinatorBalancerTester(coordinator).run(params); + + // max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1 + Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + } + @Test public void testMoveSameSegmentTwice() throws Exception { @@ -216,7 +257,7 @@ public void 
testMoveSameSegmentTwice() throws Exception ) ); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ) @@ -245,7 +286,7 @@ public void testRun1() throws IOException // Mock stuff that the coordinator needs mockCoordinator(coordinator); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ).build(); @@ -254,7 +295,6 @@ public void testRun1() throws IOException Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } - @Test public void testRun2() throws IOException { @@ -267,13 +307,13 @@ public void testRun2() throws IOException // Mock stuff that the coordinator needs mockCoordinator(coordinator); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build(); + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } - private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder( + private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List<ImmutableDruidServer> druidServers, List<LoadQueuePeon> peons ) @@ -393,5 +433,4 @@ public void emitStats( delegate.emitStats(tier, stats, serverHolderList); } } - } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org