This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b05d6e  recreate the balancer executor only when needed (#10280)
1b05d6e is described below

commit 1b05d6e542fa3164a562f7112ee2d7a13c91d11e
Author: Arvin.Z <[email protected]>
AuthorDate: Wed Sep 16 12:25:57 2020 -0700

    recreate the balancer executor only when needed (#10280)
    
    * recreate the balancer executor only when needed
    
    * fix UT error
    
    * shutdown the balancer executor in stopBeingLeader and stop
    
    * remove commented code
    
    * remove comments
---
 .../druid/server/coordinator/DruidCoordinator.java | 71 ++++++++++++++++++----
 .../server/coordinator/DruidCoordinatorTest.java   | 69 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 806ba04..a406761 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
@@ -149,6 +150,9 @@ public class DruidCoordinator
   private volatile boolean started = false;
   private volatile SegmentReplicantLookup segmentReplicantLookup = null;
 
+  private int cachedBalancerThreadNumber;
+  private ListeningExecutorService balancerExec;
+
   @Inject
   public DruidCoordinator(
       DruidCoordinatorConfig config,
@@ -483,6 +487,18 @@ public class DruidCoordinator
     }
   }
 
+  @VisibleForTesting
+  public int getCachedBalancerThreadNumber()
+  {
+    return cachedBalancerThreadNumber;
+  }
+  
+  @VisibleForTesting
+  public ListeningExecutorService getBalancerExec()
+  {
+    return balancerExec;
+  }
+
   @LifecycleStart
   public void start()
   {
@@ -524,6 +540,10 @@ public class DruidCoordinator
       started = false;
 
       exec.shutdownNow();
+
+      if (balancerExec != null) {
+        balancerExec.shutdownNow();
+      }
     }
   }
 
@@ -612,6 +632,11 @@ public class DruidCoordinator
       lookupCoordinatorManager.stop();
       metadataRuleManager.stop();
       segmentsMetadataManager.stopPollingDatabasePeriodically();
+
+      if (balancerExec != null) {
+        balancerExec.shutdownNow();
+        balancerExec = null;
+      }
     }
   }
 
@@ -647,22 +672,52 @@ public class DruidCoordinator
     return ImmutableList.of(compactSegments);
   }
 
-  private class DutiesRunnable implements Runnable
+  @VisibleForTesting
+  protected class DutiesRunnable implements Runnable
   {
     private final long startTimeNanos = System.nanoTime();
     private final List<CoordinatorDuty> duties;
     private final int startingLeaderCounter;
 
-    private DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
+    protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
     {
       this.duties = duties;
       this.startingLeaderCounter = startingLeaderCounter;
     }
 
+    @VisibleForTesting
+    protected void initBalancerExecutor()
+    {
+      final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
+      final String threadNameFormat = "coordinator-cost-balancer-%s";
+      // fist time initialization
+      if (balancerExec == null) {
+        balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
+            currentNumber,
+            threadNameFormat
+        ));
+        cachedBalancerThreadNumber = currentNumber;
+        return;
+      }
+
+      if (cachedBalancerThreadNumber != currentNumber) {
+        log.info(
+            "balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.",
+            cachedBalancerThreadNumber,
+            currentNumber
+        );
+        balancerExec.shutdownNow();
+        balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
+            currentNumber,
+            threadNameFormat
+        ));
+        cachedBalancerThreadNumber = currentNumber;
+      }
+    }
+
     @Override
     public void run()
     {
-      ListeningExecutorService balancerExec = null;
       try {
         synchronized (lock) {
           if (!coordLeaderSelector.isLeader()) {
@@ -684,10 +739,7 @@ public class DruidCoordinator
           }
         }
 
-        balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
-            getDynamicConfigs().getBalancerComputeThreads(),
-            "coordinator-cost-balancer-%s"
-        ));
+        initBalancerExecutor();
         BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
 
         // Do coordinator stuff.
@@ -733,11 +785,6 @@ public class DruidCoordinator
       catch (Exception e) {
         log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
       }
-      finally {
-        if (balancerExec != null) {
-          balancerExec.shutdownNow();
-        }
-      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 51308cb..9d1756e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
@@ -665,6 +666,74 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.verify(metadataRuleManager);
   }
 
+  @Test
+  public void testBalancerThreadNumber()
+  {
+    CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
+    EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
+    EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
+
+    JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    EasyMock.expect(
+        configManager.watch(
+            EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+            EasyMock.anyObject(Class.class),
+            EasyMock.anyObject()
+        )
+    ).andReturn(new AtomicReference(dynamicConfig)).anyTimes();
+
+    ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
+    EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
+
+    DruidCoordinator c = new DruidCoordinator(
+        null,
+        null,
+        configManager,
+        null,
+        null,
+        null,
+        null,
+        null,
+        scheduledExecutorFactory,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0);
+    // before initialization
+    Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
+    Assert.assertNull(c.getBalancerExec());
+
+    // first initialization
+    duty.initBalancerExecutor();
+    System.out.println("c.getCachedBalancerThreadNumber(): " + c.getCachedBalancerThreadNumber());
+    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
+    ListeningExecutorService firstExec = c.getBalancerExec();
+    Assert.assertNotNull(firstExec);
+
+    // second initialization, expect no changes as cachedBalancerThreadNumber is not changed
+    duty.initBalancerExecutor();
+    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
+    ListeningExecutorService secondExec = c.getBalancerExec();
+    Assert.assertNotNull(secondExec);
+    Assert.assertTrue(firstExec == secondExec);
+
+    // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
+    duty.initBalancerExecutor();
+    Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
+    ListeningExecutorService thirdExec = c.getBalancerExec();
+    Assert.assertNotNull(thirdExec);
+    Assert.assertFalse(secondExec == thirdExec);
+    Assert.assertFalse(firstExec == thirdExec);
+  }
+
   private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
                                                                                       PathChildrenCache pathChildrenCache,
                                                                                       Map<String, DataSegment> segments,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to