This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 12e8fa5c97 Prevent coordinator from getting stuck if leadership
changes during coordinator run (#14385)
12e8fa5c97 is described below
commit 12e8fa5c97a37ca7636bafda17a0cbb85ff0747f
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jun 8 15:29:20 2023 +0530
Prevent coordinator from getting stuck if leadership changes during
coordinator run (#14385)
Changes:
- Add a timeout of 1 minute to resultFuture.get() in
`CostBalancerStrategy.chooseBestServer`.
1 minute is the typical time for a full coordinator run and is more than
enough time for the cost computations of a single segment.
- Raise an alert if an exception is encountered while computing costs and
the executor has not been shut down. No alert is raised after a shutdown
because a shutdown is intentional.
---
.../server/coordinator/CostBalancerStrategy.java | 35 ++++++++++++----
.../coordinator/CostBalancerStrategyTest.java | 46 ++++++++++++++++++++++
.../simulate/BlockingExecutorService.java | 19 ++++++++-
3 files changed, 90 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 8ff4f6a4a2..de6d3824f1 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.math3.util.FastMath;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -36,6 +37,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy
@@ -226,7 +229,7 @@ public class CostBalancerStrategy implements
BalancerStrategy
try {
// results is an un-ordered list of a pair consisting of the 'cost' of a
segment being on a server and the server
- List<Pair<Double, ServerHolder>> results = resultsFuture.get();
+ List<Pair<Double, ServerHolder>> results = resultsFuture.get(1,
TimeUnit.MINUTES);
return results.stream()
// Comparator.comapringDouble will order by lowest cost...
// reverse it because we want to drop from the highest
cost servers first
@@ -235,7 +238,7 @@ public class CostBalancerStrategy implements
BalancerStrategy
.iterator();
}
catch (Exception e) {
- log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to
complete cost computation.").emit();
+ alertOnFailure(e, "pick drop server");
}
return Collections.emptyIterator();
}
@@ -298,10 +301,7 @@ public class CostBalancerStrategy implements
BalancerStrategy
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial
Normalized Cost: [%f]",
- tier,
- initialTotalCost,
- normalization,
- normalizedInitialCost
+ tier, initialTotalCost, normalization, normalizedInitialCost
);
}
@@ -373,7 +373,7 @@ public class CostBalancerStrategy implements
BalancerStrategy
final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
bestServers.add(bestServer);
try {
- for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
+ for (Pair<Double, ServerHolder> server : resultsFuture.get(1,
TimeUnit.MINUTES)) {
if (server.lhs <= bestServers.get(0).lhs) {
if (server.lhs < bestServers.get(0).lhs) {
bestServers.clear();
@@ -390,9 +390,28 @@ public class CostBalancerStrategy implements
BalancerStrategy
bestServer =
bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
}
catch (Exception e) {
- log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to
complete cost computation.").emit();
+ alertOnFailure(e, "choose best load server");
}
return bestServer;
}
+
+ private void alertOnFailure(Exception e, String action)
+ {
+ // Do not alert if the executor has been shutdown
+ if (exec.isShutdown()) {
+ log.noStackTrace().info("Balancer executor was terminated. Failing
action [%s].", action);
+ return;
+ }
+
+ final boolean hasTimedOut = e instanceof TimeoutException;
+
+ final String message = StringUtils.format(
+ "Cost balancer strategy %s in action [%s].%s",
+ hasTimedOut ? "timed out" : "failed", action,
+ hasTimedOut ? " Try setting a higher value of
'balancerComputeThreads'." : ""
+ );
+ log.makeAlert(e, message).emit();
+ }
+
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
index 1b6a0cfdc6..5773547748 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
@@ -23,6 +23,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.timeline.DataSegment;
@@ -32,9 +36,11 @@ import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -44,6 +50,7 @@ public class CostBalancerStrategyTest
private static final double DELTA = 1e-6;
private static final String DS_WIKI = "wiki";
+ private StubServiceEmitter serviceEmitter;
private ExecutorService balancerExecutor;
private CostBalancerStrategy strategy;
private int uniqueServerId;
@@ -53,6 +60,9 @@ public class CostBalancerStrategyTest
{
balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
strategy = new
CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
+
+ serviceEmitter = new StubServiceEmitter("test-service", "host");
+ EmittingLogger.registerEmitter(serviceEmitter);
}
@After
@@ -302,6 +312,42 @@ public class CostBalancerStrategyTest
);
}
+ @Test
+ public void testFindServerAfterExecutorShutdownThrowsException()
+ {
+ DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
+ .forIntervals(1, Granularities.DAY)
+ .startingAt("2012-10-24")
+ .eachOfSizeInMb(100).get(0);
+
+ ServerHolder serverA = new
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+ ServerHolder serverB = new
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+
+ balancerExecutor.shutdownNow();
+ Assert.assertThrows(
+ RejectedExecutionException.class,
+ () -> strategy.findNewSegmentHomeBalancer(segment,
Arrays.asList(serverA, serverB))
+ );
+ }
+
+ @Test(timeout = 90_000L)
+ public void testFindServerRaisesAlertOnTimeout()
+ {
+ DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
+ .forIntervals(1, Granularities.DAY)
+ .startingAt("2012-10-24")
+ .eachOfSizeInMb(100).get(0);
+
+ ServerHolder serverA = new
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+ ServerHolder serverB = new
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+
+ strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA,
serverB));
+
+ List<Event> events = serviceEmitter.getEvents();
+ Assert.assertEquals(1, events.size());
+ Assert.assertTrue(events.get(0) instanceof AlertEvent);
+ }
+
private void verifyServerCosts(
DataSegment segment,
List<ServerHolder> serverHolders,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
index fc59a6bd9d..111d88bf55 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -48,6 +49,8 @@ public class BlockingExecutorService implements
ExecutorService
private final String nameFormat;
private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>();
+ private boolean isShutdown;
+
public BlockingExecutorService(String nameFormat)
{
this.nameFormat = nameFormat;
@@ -115,12 +118,14 @@ public class BlockingExecutorService implements
ExecutorService
@Override
public <T> Future<T> submit(Callable<T> task)
{
+ verifyNotShutdown();
return addTaskToQueue(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
+ verifyNotShutdown();
return addTaskToQueue(() -> {
task.run();
return result;
@@ -130,6 +135,7 @@ public class BlockingExecutorService implements
ExecutorService
@Override
public Future<?> submit(Runnable task)
{
+ verifyNotShutdown();
return addTaskToQueue(() -> {
task.run();
return null;
@@ -142,6 +148,13 @@ public class BlockingExecutorService implements
ExecutorService
submit(command);
}
+ private void verifyNotShutdown()
+ {
+ if (isShutdown) {
+ throw new RejectedExecutionException();
+ }
+ }
+
private <T> Future<T> addTaskToQueue(Callable<T> callable)
{
Task<T> task = new Task<>(callable);
@@ -153,25 +166,27 @@ public class BlockingExecutorService implements
ExecutorService
@Override
public void shutdown()
{
+ isShutdown = true;
taskQueue.clear();
}
@Override
public List<Runnable> shutdownNow()
{
+ shutdown();
return null;
}
@Override
public boolean isShutdown()
{
- return false;
+ return isShutdown;
}
@Override
public boolean isTerminated()
{
- return false;
+ return isShutdown && taskQueue.isEmpty();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]