This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ab051d9c5e Add test for ReservoirSegmentSampler (#14591)
ab051d9c5e is described below
commit ab051d9c5e9a9eafe931a65422bb0f932f4d5b1b
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Jul 17 18:50:02 2023 +0530
Add test for ReservoirSegmentSampler (#14591)
Tests to verify the following behaviour have been added:
- Segments from more populous servers are more likely to be picked
  irrespective of sample size.
- Segments from all servers are equally likely to be picked if all servers
  have equivalent number of segments.
---
.../balancer/ReservoirSegmentSamplerTest.java | 137 ++++++++++++++++++---
1 file changed, 120 insertions(+), 17 deletions(-)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
index 25b1801546..e6387a5a34 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.balancer;
+import com.google.common.collect.Lists;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
@@ -31,16 +32,16 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class ReservoirSegmentSamplerTest
{
@@ -55,9 +56,6 @@ public class ReservoirSegmentSamplerTest
.withNumPartitions(10)
.eachOfSizeInMb(100);
- private final Function<ServerHolder, Collection<DataSegment>>
GET_SERVED_SEGMENTS
- = serverHolder -> serverHolder.getServer().iterateAllSegments();
-
@Before
public void setUp()
{
@@ -80,7 +78,7 @@ public class ReservoirSegmentSamplerTest
// due to the pseudo-randomness of this method, we may not select a segment every single time no matter what.
segmentCountMap.compute(
ReservoirSegmentSampler
- .pickMovableSegmentsFrom(servers, 1, GET_SERVED_SEGMENTS,
Collections.emptySet())
+ .pickMovableSegmentsFrom(servers, 1,
ServerHolder::getServedSegments, Collections.emptySet())
.get(0).getSegment(),
(segment, count) -> count == null ? 1 : count + 1
);
@@ -151,9 +149,16 @@ public class ReservoirSegmentSamplerTest
Assert.assertTrue(pickedSegments.containsAll(loadingSegments));
// Pick only loaded segments
- pickedSegments = ReservoirSegmentSampler
- .pickMovableSegmentsFrom(Arrays.asList(server1, server2), 10,
GET_SERVED_SEGMENTS, Collections.emptySet())
-
.stream().map(BalancerSegmentHolder::getSegment).collect(Collectors.toSet());
+ List<BalancerSegmentHolder> pickedHolders =
ReservoirSegmentSampler.pickMovableSegmentsFrom(
+ Arrays.asList(server1, server2),
+ 10,
+ ServerHolder::getServedSegments,
+ Collections.emptySet()
+ );
+ pickedSegments = pickedHolders
+ .stream()
+ .map(BalancerSegmentHolder::getSegment)
+ .collect(Collectors.toSet());
// Verify that only loaded segments are picked
Assert.assertEquals(loadedSegments.size(), pickedSegments.size());
@@ -177,7 +182,7 @@ public class ReservoirSegmentSamplerTest
List<BalancerSegmentHolder> pickedSegments =
ReservoirSegmentSampler.pickMovableSegmentsFrom(
Arrays.asList(historical, broker),
10,
- GET_SERVED_SEGMENTS,
+ ServerHolder::getServedSegments,
Collections.emptySet()
);
@@ -206,8 +211,12 @@ public class ReservoirSegmentSamplerTest
);
// Try to pick all the segments on the servers
- List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler
- .pickMovableSegmentsFrom(servers, 10, GET_SERVED_SEGMENTS,
Collections.singleton(broadcastDatasource));
+ List<BalancerSegmentHolder> pickedSegments =
ReservoirSegmentSampler.pickMovableSegmentsFrom(
+ servers,
+ 10,
+ ServerHolder::getServedSegments,
+ Collections.singleton(broadcastDatasource)
+ );
// Verify that none of the broadcast segments are picked
Assert.assertEquals(2, pickedSegments.size());
@@ -216,21 +225,83 @@ public class ReservoirSegmentSamplerTest
}
}
+ @Test
+ public void testSegmentsFromAllServersAreEquallyLikelyToBePicked()
+ {
+ // Create 4 servers, each having an equal number of segments
+ final List<List<DataSegment>> subSegmentLists = Lists.partition(segments,
segments.size() / 4);
+ final List<ServerHolder> servers = IntStream.range(0, 4).mapToObj(
+ i -> createHistorical("server_" + i,
subSegmentLists.get(i).toArray(new DataSegment[0]))
+ ).collect(Collectors.toList());
+
+ // Get the distribution of picked segments for different sample percentages
+ final int[] samplePercentages = {50, 20, 10, 5};
+ for (int samplePercentage : samplePercentages) {
+ final int[] numSegmentsPickedFromServer
+ = pickSegmentsAndGetPickedCountPerServer(servers, samplePercentage,
50);
+
+ final int totalSegmentsPicked =
Arrays.stream(numSegmentsPickedFromServer).sum();
+
+ // Number of segments picked from each server is ~25% of total
+ final double expectedPickedSegments = totalSegmentsPicked * 0.25;
+ final double error = totalSegmentsPicked * 0.02;
+ for (int pickedSegments : numSegmentsPickedFromServer) {
+ Assert.assertEquals(expectedPickedSegments, pickedSegments, error);
+ }
+ }
+ }
+
+ @Test
+ public void testSegmentsFromMorePopulousServerAreMoreLikelyToBePicked()
+ {
+ // Create 4 servers, first one having twice as many segments as the rest
+ final List<List<DataSegment>> subSegmentLists = Lists.partition(segments,
segments.size() / 5);
+
+ final List<ServerHolder> servers = new ArrayList<>();
+ List<DataSegment> segmentsForServer0 = new
ArrayList<>(subSegmentLists.get(0));
+ segmentsForServer0.addAll(subSegmentLists.get(1));
+ servers.add(createHistorical("server_" + 0, segmentsForServer0));
+
+ IntStream.range(1, 4).mapToObj(
+ i -> createHistorical("server_" + i, subSegmentLists.get(i + 1))
+ ).forEach(servers::add);
+
+ final int[] samplePercentages = {50, 20, 10, 5};
+ for (int samplePercentage : samplePercentages) {
+ final int[] numSegmentsPickedFromServer
+ = pickSegmentsAndGetPickedCountPerServer(servers, samplePercentage,
50);
+
+ final int totalSegmentsPicked =
Arrays.stream(numSegmentsPickedFromServer).sum();
+
+ // Number of segments picked from server0 are ~40% of total and
+ // number of segments picked from other servers are each ~20% of total
+ double error = totalSegmentsPicked * 0.02;
+ Assert.assertEquals(totalSegmentsPicked * 0.40,
numSegmentsPickedFromServer[0], error);
+
+ for (int serverId = 1; serverId < servers.size(); ++serverId) {
+ Assert.assertEquals(totalSegmentsPicked * 0.20,
numSegmentsPickedFromServer[serverId], error);
+ }
+ }
+ }
+
@Test(timeout = 60_000)
- public void testNumberOfIterationsToCycleThroughAllSegments()
+ public void testNumberOfSamplingsRequiredToPickAllSegments()
{
- // The number of runs required for each sample percentage
+ // The number of sampling iterations required for each sample percentage
// remains more or less fixed, even with a larger number of segments
final int[] samplePercentages = {100, 50, 10, 5, 1};
final int[] expectedIterations = {1, 20, 100, 200, 1000};
final int[] totalObservedIterations = new int[5];
+
+ // For every sample percentage, count the minimum number of required samplings
for (int i = 0; i < 50; ++i) {
for (int j = 0; j < samplePercentages.length; ++j) {
- totalObservedIterations[j] +=
countMinRunsWithSamplePercent(samplePercentages[j]);
+ totalObservedIterations[j] +=
countMinRunsToPickAllSegments(samplePercentages[j]);
}
}
+ // Compute the avg value from the 50 observations for each sample percentage
for (int j = 0; j < samplePercentages.length; ++j) {
double avgObservedIterations = totalObservedIterations[j] / 50.0;
Assert.assertTrue(avgObservedIterations <= expectedIterations[j]);
@@ -244,7 +315,7 @@ public class ReservoirSegmentSamplerTest
* <p>
* {@code k = sampleSize = totalNumSegments * samplePercentage}
*/
- private int countMinRunsWithSamplePercent(int samplePercentage)
+ private int countMinRunsToPickAllSegments(int samplePercentage)
{
final int numSegments = segments.size();
final List<ServerHolder> servers = Arrays.asList(
@@ -259,7 +330,7 @@ public class ReservoirSegmentSamplerTest
int numIterations = 1;
for (; numIterations < 10000; ++numIterations) {
ReservoirSegmentSampler
- .pickMovableSegmentsFrom(servers, sampleSize, GET_SERVED_SEGMENTS,
Collections.emptySet())
+ .pickMovableSegmentsFrom(servers, sampleSize,
ServerHolder::getServedSegments, Collections.emptySet())
.forEach(holder -> pickedSegments.add(holder.getSegment()));
if (pickedSegments.size() >= numSegments) {
@@ -270,6 +341,38 @@ public class ReservoirSegmentSamplerTest
return numIterations;
}
+ private int[] pickSegmentsAndGetPickedCountPerServer(
+ List<ServerHolder> servers,
+ int samplePercentage,
+ int numIterations
+ )
+ {
+ final int numSegmentsToPick = (int) (segments.size() * samplePercentage /
100.0);
+ final int[] numSegmentsPickedFromServer = new int[servers.size()];
+
+ for (int i = 0; i < numIterations; ++i) {
+ List<BalancerSegmentHolder> pickedSegments =
ReservoirSegmentSampler.pickMovableSegmentsFrom(
+ servers,
+ numSegmentsToPick,
+ ServerHolder::getServedSegments,
+ Collections.emptySet()
+ );
+
+ // Get the number of segments picked from each server
+ for (BalancerSegmentHolder pickedSegment : pickedSegments) {
+ int serverIndex = servers.indexOf(pickedSegment.getServer());
+ numSegmentsPickedFromServer[serverIndex]++;
+ }
+ }
+
+ return numSegmentsPickedFromServer;
+ }
+
+ private ServerHolder createHistorical(String serverName, List<DataSegment>
loadedSegments)
+ {
+ return createHistorical(serverName, loadedSegments.toArray(new
DataSegment[0]));
+ }
+
private ServerHolder createHistorical(String serverName, DataSegment...
loadedSegments)
{
final DruidServer server =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]