This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d04521d58f Improve description field when emitting metric for broadcast failure (#14703)
d04521d58f is described below

commit d04521d58f28119687f6887bb92bfb8cdc189840
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Aug 1 10:13:55 2023 +0530

    Improve description field when emitting metric for broadcast failure (#14703)
    
    Changes:
    - Emit descriptions such as `Load queue is full`, `Not enough disk space`, etc. instead of `Unknown error`
    - Rewrite `BroadcastDistributionRuleTest`
---
 .../druid/server/coordinator/ServerHolder.java     |   5 +
 .../loading/StrategicSegmentAssigner.java          |  22 +-
 .../druid/server/coordinator/ServerHolderTest.java |   1 +
 .../rules/BroadcastDistributionRuleTest.java       | 522 +++++++--------------
 4 files changed, 196 insertions(+), 354 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 7947f1c1b3..d55ac035d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -196,6 +196,11 @@ public class ServerHolder implements Comparable<ServerHolder>
     return isDecommissioning;
   }
 
+  public boolean isLoadQueueFull()
+  {
+    return totalAssignmentsInRun >= maxAssignmentsInRun;
+  }
+
   public long getAvailableSize()
   {
     return getMaxSize() - getSizeUsed();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 038707a19f..a1800ae72b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -355,7 +355,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
   }
 
   /**
-   * Loads the broadcast segment if it is not loaded on the given server.
+   * Loads the broadcast segment if it is not already loaded on the given server.
    * Returns true only if the segment was successfully queued for load on the server.
    */
   private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server)
@@ -364,19 +364,21 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
       return false;
     } else if (server.isDroppingSegment(segment)) {
       return server.cancelOperation(SegmentAction.DROP, segment);
+    } else if (server.canLoadSegment(segment)) {
+      return loadSegment(segment, server);
     }
 
-    if (server.canLoadSegment(segment) && loadSegment(segment, server)) {
-      return true;
+    final String skipReason;
+    if (server.getAvailableSize() < segment.getSize()) {
+      skipReason = "Not enough disk space";
+    } else if (server.isLoadQueueFull()) {
+      skipReason = "Load queue is full";
     } else {
-      log.makeAlert("Could not assign broadcast segment for datasource [%s]", segment.getDataSource())
-         .addData("segmentId", segment.getId())
-         .addData("segmentSize", segment.getSize())
-         .addData("hostName", server.getServer().getHost())
-         .addData("availableSize", server.getAvailableSize())
-         .emit();
-      return false;
+      skipReason = "Unknown error";
     }
+
+    incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, skipReason, segment, server.getServer().getTier());
+    return false;
   }
 
   /**
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index fd1fb7c306..11e36dea18 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -195,5 +195,6 @@ public class ServerHolderTest
     Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
     Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
     Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
+    Assert.assertFalse(h1.isLoadQueueFull());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index e21d823f34..b57cfc9291 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -21,293 +21,214 @@ package org.apache.druid.server.coordinator.rules;
 
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
 import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 public class BroadcastDistributionRuleTest
 {
-  private DruidCluster druidCluster;
-  private ServerHolder holderOfSmallSegment;
-  private final List<ServerHolder> holdersOfLargeSegments = new ArrayList<>();
-  private final List<ServerHolder> holdersOfLargeSegments2 = new ArrayList<>();
-  private final List<DataSegment> largeSegments = new ArrayList<>();
-  private final List<DataSegment> largeSegments2 = new ArrayList<>();
-  private DataSegment smallSegment;
-  private DruidCluster secondCluster;
-  private ServerHolder activeServer;
-  private ServerHolder decommissioningServer1;
-  private ServerHolder decommissioningServer2;
-  private SegmentLoadQueueManager loadQueueManager;
-
-  private static final String DS_SMALL = "small_source";
+  private int serverId = 0;
+
+  private static final String DS_WIKI = "wiki";
   private static final String TIER_1 = "tier1";
   private static final String TIER_2 = "tier2";
 
+  private final DataSegment wikiSegment
+      = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
+
   @Before
   public void setUp()
   {
-    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
-    smallSegment = new DataSegment(
-        DS_SMALL,
-        Intervals.of("0/1000"),
-        DateTimes.nowUtc().toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        0
-    );
+    serverId = 0;
+  }
 
-    for (int i = 0; i < 3; i++) {
-      largeSegments.add(
-          new DataSegment(
-              "large_source",
-              Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
-              DateTimes.nowUtc().toString(),
-              new HashMap<>(),
-              new ArrayList<>(),
-              new ArrayList<>(),
-              NoneShardSpec.instance(),
-              0,
-              100
-          )
-      );
-    }
+  @Test
+  public void testSegmentIsBroadcastToAllTiers()
+  {
+    // 2 tiers with one server each
+    final ServerHolder serverT11 = create10gbHistorical(TIER_1);
+    final ServerHolder serverT21 = create10gbHistorical(TIER_2);
+    DruidCluster cluster = 
DruidCluster.builder().add(serverT11).add(serverT21).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
 
-    for (int i = 0; i < 2; i++) {
-      largeSegments2.add(
-          new DataSegment(
-              "large_source2",
-              Intervals.of((i * 1000) + "/" + ((i + 1) * 1000)),
-              DateTimes.nowUtc().toString(),
-              new HashMap<>(),
-              new ArrayList<>(),
-              new ArrayList<>(),
-              NoneShardSpec.instance(),
-              0,
-              100
-          )
-      );
-    }
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
 
-    holderOfSmallSegment = new ServerHolder(
-        new DruidServer(
-            "serverHot2",
-            "hostHot2",
-            null,
-            1000,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(smallSegment)
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon()
-    );
+    // Verify that segment is assigned to servers of all tiers
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_WIKI));
+    Assert.assertTrue(serverT11.isLoadingSegment(wikiSegment));
 
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverHot1",
-                "hostHot1",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_1,
-                0
-            ).addDataSegment(largeSegments.get(0))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm1",
-                "hostNorm1",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments.get(1))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm2",
-                "hostNorm2",
-                null,
-                100,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments.get(2))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_2, DS_WIKI));
+    Assert.assertTrue(serverT21.isLoadingSegment(wikiSegment));
+  }
 
-    holdersOfLargeSegments2.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverHot3",
-                "hostHot3",
-                null,
-                1000,
-                ServerType.HISTORICAL,
-                TIER_1,
-                0
-            ).addDataSegment(largeSegments2.get(0))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
-    holdersOfLargeSegments2.add(
-        new ServerHolder(
-            new DruidServer(
-                "serverNorm3",
-                "hostNorm3",
-                null,
-                100,
-                ServerType.HISTORICAL,
-                TIER_2,
-                0
-            ).addDataSegment(largeSegments2.get(1))
-             .toImmutableDruidServer(),
-            new TestLoadQueuePeon()
-        )
-    );
+  @Test
+  public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded()
+  {
+    // serverT11 is already serving the segment which is being broadcast
+    final ServerHolder serverT11 = create10gbHistorical(TIER_1, wikiSegment);
+    final ServerHolder serverT12 = create10gbHistorical(TIER_1);
+    DruidCluster cluster = 
DruidCluster.builder().add(serverT11).add(serverT12).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    // Verify that serverT11 is already serving and serverT12 is loading 
segment
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_WIKI));
+    Assert.assertFalse(serverT11.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(serverT11.isServingSegment(wikiSegment));
+    Assert.assertTrue(serverT12.isLoadingSegment(wikiSegment));
+  }
 
-    activeServer = new ServerHolder(
-        new DruidServer(
-            "active",
-            "host1",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(largeSegments.get(0))
-         .toImmutableDruidServer(),
+  @Test
+  public void testSegmentIsNotBroadcastToDecommissioningServer()
+  {
+    ServerHolder activeServer = create10gbHistorical(TIER_1);
+    ServerHolder decommissioningServer = 
createDecommissioningHistorical(TIER_1);
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(activeServer)
+                                       .add(decommissioningServer).build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_WIKI));
+    Assert.assertTrue(activeServer.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(decommissioningServer.getLoadingSegments().isEmpty());
+  }
+
+  @Test
+  public void testBroadcastSegmentIsDroppedFromDecommissioningServer()
+  {
+    // Both active and decommissioning servers are already serving the segment
+    ServerHolder activeServer = create10gbHistorical(TIER_1, wikiSegment);
+    ServerHolder decommissioningServer = 
createDecommissioningHistorical(TIER_1, wikiSegment);
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(activeServer)
+                                       .add(decommissioningServer)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, params);
+
+    // Verify that segment is dropped only from the decommissioning server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
TIER_1, DS_WIKI));
+    Assert.assertTrue(activeServer.getPeon().getSegmentsToDrop().isEmpty());
+    
Assert.assertTrue(decommissioningServer.getPeon().getSegmentsToDrop().contains(wikiSegment));
+  }
+
+  @Test
+  public void testSegmentIsBroadcastToAllServerTypes()
+  {
+    final ServerHolder broker = new ServerHolder(
+        create10gbServer(ServerType.BROKER, 
"broker_tier").toImmutableDruidServer(),
         new TestLoadQueuePeon()
     );
-
-    decommissioningServer1 = new ServerHolder(
-        new DruidServer(
-            "decommissioning1",
-            "host2",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(smallSegment)
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon(),
-        true
+    final ServerHolder indexer = new ServerHolder(
+        create10gbServer(ServerType.INDEXER_EXECUTOR, 
TIER_2).toImmutableDruidServer(),
+        new TestLoadQueuePeon()
     );
+    final ServerHolder historical = create10gbHistorical(TIER_1);
 
-    decommissioningServer2 = new ServerHolder(
-        new DruidServer(
-            "decommissioning2",
-            "host3",
-            null,
-            100,
-            ServerType.HISTORICAL,
-            TIER_1,
-            0
-        ).addDataSegment(largeSegments.get(1))
-         .toImmutableDruidServer(),
-        new TestLoadQueuePeon(),
-        true
-    );
+    DruidCluster cluster = DruidCluster.builder()
+                                       
.add(broker).add(indexer).add(historical)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
 
-    druidCluster = DruidCluster
-        .builder()
-        .addTier(
-            TIER_1,
-            holdersOfLargeSegments.get(0),
-            holderOfSmallSegment,
-            holdersOfLargeSegments2.get(0)
-        )
-        .addTier(
-            TIER_2,
-            holdersOfLargeSegments.get(1),
-            holdersOfLargeSegments.get(2),
-            holdersOfLargeSegments2.get(1)
-        )
-        .build();
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, 
params);
 
-    secondCluster = DruidCluster
-        .builder()
-        .addTier(
-            TIER_1,
-            activeServer,
-            decommissioningServer1,
-            decommissioningServer2
-        )
-        .build();
+    // Verify that segment is assigned to historical, broker as well as indexer
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_2, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
broker.getServer().getTier(), DS_WIKI));
+
+    Assert.assertTrue(historical.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(indexer.isLoadingSegment(wikiSegment));
+    Assert.assertTrue(broker.isLoadingSegment(wikiSegment));
   }
 
   @Test
-  public void testBroadcastToSingleDataSource()
+  public void testReasonForBroadcastFailure()
   {
-    final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule();
-
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
+    final ServerHolder eligibleServer = create10gbHistorical(TIER_1);
+    final ServerHolder serverWithNoDiskSpace = new ServerHolder(
+        new DruidServer("server1", "server1", null, 0L, ServerType.HISTORICAL, 
TIER_1, 0)
+            .toImmutableDruidServer(),
+        new TestLoadQueuePeon()
     );
 
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_2, DS_SMALL));
-
-    Assert.assertTrue(
-        holdersOfLargeSegments.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertTrue(
-        holdersOfLargeSegments2.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
+    // Create a server with full load queue
+    final int maxSegmentsInLoadQueue = 5;
+    final ServerHolder serverWithFullQueue = new ServerHolder(
+        create10gbServer(ServerType.HISTORICAL, 
TIER_1).toImmutableDruidServer(),
+        new TestLoadQueuePeon(), false, maxSegmentsInLoadQueue, 100
     );
-    Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
+
+    List<DataSegment> segmentsInQueue
+        = CreateDataSegments.ofDatasource("koala")
+                            .forIntervals(maxSegmentsInLoadQueue, 
Granularities.MONTH)
+                            .withNumPartitions(1)
+                            .eachOfSizeInMb(10);
+    segmentsInQueue.forEach(s -> 
serverWithFullQueue.startOperation(SegmentAction.LOAD, s));
+    Assert.assertTrue(serverWithFullQueue.isLoadQueueFull());
+
+    DruidCluster cluster = DruidCluster.builder()
+                                       .add(eligibleServer)
+                                       .add(serverWithNoDiskSpace)
+                                       .add(serverWithFullQueue)
+                                       .build();
+    DruidCoordinatorRuntimeParams params = makeParamsWithUsedSegments(cluster, 
wikiSegment);
+
+    ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
+    final CoordinatorRunStats stats = runRuleOnSegment(rule, wikiSegment, 
params);
+
+    // Verify that the segment is broadcast only to the eligible server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_WIKI));
+    RowKey metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+                             .with(Dimension.TIER, TIER_1)
+                             .and(Dimension.DESCRIPTION, "Not enough disk 
space");
+    Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, 
metricKey));
+
+    metricKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+                      .with(Dimension.TIER, TIER_1)
+                      .and(Dimension.DESCRIPTION, "Load queue is full");
+    Assert.assertEquals(1L, stats.get(Stats.Segments.ASSIGN_SKIPPED, 
metricKey));
   }
 
-  private DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
+  private CoordinatorRunStats runRuleOnSegment(
+      Rule rule,
+      DataSegment segment,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
+    rule.run(segment, segmentAssigner);
+    return segmentAssigner.getStats();
+  }
+
+  private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(
       DruidCluster druidCluster,
       DataSegment... usedSegments
   )
@@ -317,118 +238,31 @@ public class BroadcastDistributionRuleTest
         .withDruidCluster(druidCluster)
         .withUsedSegmentsInTest(usedSegments)
         .withBalancerStrategy(new RandomBalancerStrategy())
-        .withSegmentAssignerUsing(loadQueueManager)
+        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, 
null))
         .build();
   }
 
-  /**
-   * Servers:
-   * name             | segments
-   * -----------------+--------------
-   * active           | large segment
-   * decommissioning1 | small segment
-   * decommissioning2 | large segment
-   * <p>
-   * After running the rule for the small segment:
-   * active           | large & small segments
-   * decommissioning1 |
-   * decommissionint2 | large segment
-   */
-  @Test
-  public void testBroadcastDecommissioning()
+  private ServerHolder create10gbHistorical(String tier, DataSegment... 
segments)
   {
-    final ForeverBroadcastDistributionRule rule =
-        new ForeverBroadcastDistributionRule();
-
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            secondCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1)
-        )
-    );
-
-    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_SMALL));
-    Assert.assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size());
-    Assert.assertEquals(1, 
decommissioningServer1.getPeon().getSegmentsToDrop().size());
-    Assert.assertEquals(0, 
decommissioningServer2.getPeon().getSegmentsToLoad().size());
-  }
-
-  @Test
-  public void testBroadcastToMultipleDataSources()
-  {
-    final ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
-
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
-    );
-
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_2, DS_SMALL));
-
-    Assert.assertTrue(
-        holdersOfLargeSegments.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertTrue(
-        holdersOfLargeSegments2.stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment)
-        )
-    );
-    Assert.assertFalse(holderOfSmallSegment.isLoadingSegment(smallSegment));
+    DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new 
TestLoadQueuePeon());
   }
 
-  @Test
-  public void testBroadcastToAllServers()
+  private ServerHolder createDecommissioningHistorical(String tier, 
DataSegment... segments)
   {
-    final ForeverBroadcastDistributionRule rule = new 
ForeverBroadcastDistributionRule();
-
-    CoordinatorRunStats stats = runRuleAndGetStats(
-        rule,
-        smallSegment,
-        makeCoordinartorRuntimeParams(
-            druidCluster,
-            smallSegment,
-            largeSegments.get(0),
-            largeSegments.get(1),
-            largeSegments.get(2),
-            largeSegments2.get(0),
-            largeSegments2.get(1)
-        )
-    );
-
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_1, DS_SMALL));
-    Assert.assertEquals(3L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
TIER_2, DS_SMALL));
-
-    Assert.assertTrue(
-        druidCluster.getAllServers().stream().allMatch(
-            holder -> holder.isLoadingSegment(smallSegment) || 
holder.isServingSegment(smallSegment)
-        )
-    );
+    DruidServer server = create10gbServer(ServerType.HISTORICAL, tier);
+    for (DataSegment segment : segments) {
+      server.addDataSegment(segment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new 
TestLoadQueuePeon(), true);
   }
 
-  private CoordinatorRunStats runRuleAndGetStats(
-      Rule rule,
-      DataSegment segment,
-      DruidCoordinatorRuntimeParams params
-  )
+  private DruidServer create10gbServer(ServerType type, String tier)
   {
-    StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
-    rule.run(segment, segmentAssigner);
-    return segmentAssigner.getStats();
+    final String name = "server_" + serverId++;
+    return new DruidServer(name, name, null, 10L << 30, type, tier, 0);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to