This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e8fa5c97 Prevent coordinator from getting stuck if leadership changes during coordinator run (#14385)
12e8fa5c97 is described below

commit 12e8fa5c97a37ca7636bafda17a0cbb85ff0747f
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jun 8 15:29:20 2023 +0530

    Prevent coordinator from getting stuck if leadership changes during coordinator run (#14385)
    
    Changes:
    - Add a timeout of 1 minute to `resultsFuture.get()` in `CostBalancerStrategy.chooseBestServer`.
      1 minute is the typical time for a full coordinator run and is more than enough time for
      the cost computations of a single segment.
    - Raise an alert if an exception is encountered while computing costs and the executor has
      not been shut down. A shutdown is intentional and therefore does not require an alert.
---
 .../server/coordinator/CostBalancerStrategy.java   | 35 ++++++++++++----
 .../coordinator/CostBalancerStrategyTest.java      | 46 ++++++++++++++++++++++
 .../simulate/BlockingExecutorService.java          | 19 ++++++++-
 3 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 8ff4f6a4a2..de6d3824f1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.commons.math3.util.FastMath;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -36,6 +37,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 public class CostBalancerStrategy implements BalancerStrategy
@@ -226,7 +229,7 @@ public class CostBalancerStrategy implements 
BalancerStrategy
 
     try {
       // results is an un-ordered list of a pair consisting of the 'cost' of a 
segment being on a server and the server
-      List<Pair<Double, ServerHolder>> results = resultsFuture.get();
+      List<Pair<Double, ServerHolder>> results = resultsFuture.get(1, 
TimeUnit.MINUTES);
       return results.stream()
                     // Comparator.comapringDouble will order by lowest cost...
                     // reverse it because we want to drop from the highest 
cost servers first
@@ -235,7 +238,7 @@ public class CostBalancerStrategy implements 
BalancerStrategy
                     .iterator();
     }
     catch (Exception e) {
-      log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to 
complete cost computation.").emit();
+      alertOnFailure(e, "pick drop server");
     }
     return Collections.emptyIterator();
   }
@@ -298,10 +301,7 @@ public class CostBalancerStrategy implements 
BalancerStrategy
 
     log.info(
         "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial 
Normalized Cost: [%f]",
-        tier,
-        initialTotalCost,
-        normalization,
-        normalizedInitialCost
+        tier, initialTotalCost, normalization, normalizedInitialCost
     );
   }
 
@@ -373,7 +373,7 @@ public class CostBalancerStrategy implements 
BalancerStrategy
     final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
     bestServers.add(bestServer);
     try {
-      for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
+      for (Pair<Double, ServerHolder> server : resultsFuture.get(1, 
TimeUnit.MINUTES)) {
         if (server.lhs <= bestServers.get(0).lhs) {
           if (server.lhs < bestServers.get(0).lhs) {
             bestServers.clear();
@@ -390,9 +390,28 @@ public class CostBalancerStrategy implements 
BalancerStrategy
       bestServer = 
bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
     }
     catch (Exception e) {
-      log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to 
complete cost computation.").emit();
+      alertOnFailure(e, "choose best load server");
     }
     return bestServer;
   }
+
+  private void alertOnFailure(Exception e, String action)
+  {
+    // Do not alert if the executor has been shutdown
+    if (exec.isShutdown()) {
+      log.noStackTrace().info("Balancer executor was terminated. Failing 
action [%s].", action);
+      return;
+    }
+
+    final boolean hasTimedOut = e instanceof TimeoutException;
+
+    final String message = StringUtils.format(
+        "Cost balancer strategy %s in action [%s].%s",
+        hasTimedOut ? "timed out" : "failed", action,
+        hasTimedOut ? " Try setting a higher value of 
'balancerComputeThreads'." : ""
+    );
+    log.makeAlert(e, message).emit();
+  }
+
 }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
index 1b6a0cfdc6..5773547748 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
@@ -23,6 +23,10 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import org.apache.druid.timeline.DataSegment;
@@ -32,9 +36,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -44,6 +50,7 @@ public class CostBalancerStrategyTest
   private static final double DELTA = 1e-6;
   private static final String DS_WIKI = "wiki";
 
+  private StubServiceEmitter serviceEmitter;
   private ExecutorService balancerExecutor;
   private CostBalancerStrategy strategy;
   private int uniqueServerId;
@@ -53,6 +60,9 @@ public class CostBalancerStrategyTest
   {
     balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
     strategy = new 
CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
+
+    serviceEmitter = new StubServiceEmitter("test-service", "host");
+    EmittingLogger.registerEmitter(serviceEmitter);
   }
 
   @After
@@ -302,6 +312,42 @@ public class CostBalancerStrategyTest
     );
   }
 
+  @Test
+  public void testFindServerAfterExecutorShutdownThrowsException()
+  {
+    DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
+                                            .forIntervals(1, Granularities.DAY)
+                                            .startingAt("2012-10-24")
+                                            .eachOfSizeInMb(100).get(0);
+
+    ServerHolder serverA = new 
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+    ServerHolder serverB = new 
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+
+    balancerExecutor.shutdownNow();
+    Assert.assertThrows(
+        RejectedExecutionException.class,
+        () -> strategy.findNewSegmentHomeBalancer(segment, 
Arrays.asList(serverA, serverB))
+    );
+  }
+
+  @Test(timeout = 90_000L)
+  public void testFindServerRaisesAlertOnTimeout()
+  {
+    DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
+                                            .forIntervals(1, Granularities.DAY)
+                                            .startingAt("2012-10-24")
+                                            .eachOfSizeInMb(100).get(0);
+
+    ServerHolder serverA = new 
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+    ServerHolder serverB = new 
ServerHolder(createHistorical().toImmutableDruidServer(), null);
+
+    strategy.findNewSegmentHomeBalancer(segment, Arrays.asList(serverA, 
serverB));
+
+    List<Event> events = serviceEmitter.getEvents();
+    Assert.assertEquals(1, events.size());
+    Assert.assertTrue(events.get(0) instanceof AlertEvent);
+  }
+
   private void verifyServerCosts(
       DataSegment segment,
       List<ServerHolder> serverHolders,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
index fc59a6bd9d..111d88bf55 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -48,6 +49,8 @@ public class BlockingExecutorService implements 
ExecutorService
   private final String nameFormat;
   private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>();
 
+  private boolean isShutdown;
+
   public BlockingExecutorService(String nameFormat)
   {
     this.nameFormat = nameFormat;
@@ -115,12 +118,14 @@ public class BlockingExecutorService implements 
ExecutorService
   @Override
   public <T> Future<T> submit(Callable<T> task)
   {
+    verifyNotShutdown();
     return addTaskToQueue(task);
   }
 
   @Override
   public <T> Future<T> submit(Runnable task, T result)
   {
+    verifyNotShutdown();
     return addTaskToQueue(() -> {
       task.run();
       return result;
@@ -130,6 +135,7 @@ public class BlockingExecutorService implements 
ExecutorService
   @Override
   public Future<?> submit(Runnable task)
   {
+    verifyNotShutdown();
     return addTaskToQueue(() -> {
       task.run();
       return null;
@@ -142,6 +148,13 @@ public class BlockingExecutorService implements 
ExecutorService
     submit(command);
   }
 
+  private void verifyNotShutdown()
+  {
+    if (isShutdown) {
+      throw new RejectedExecutionException();
+    }
+  }
+
   private <T> Future<T> addTaskToQueue(Callable<T> callable)
   {
     Task<T> task = new Task<>(callable);
@@ -153,25 +166,27 @@ public class BlockingExecutorService implements 
ExecutorService
   @Override
   public void shutdown()
   {
+    isShutdown = true;
     taskQueue.clear();
   }
 
   @Override
   public List<Runnable> shutdownNow()
   {
+    shutdown();
     return null;
   }
 
   @Override
   public boolean isShutdown()
   {
-    return false;
+    return isShutdown;
   }
 
   @Override
   public boolean isTerminated()
   {
-    return false;
+    return isShutdown && taskQueue.isEmpty();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to