This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1b05d6e recreate the balancer executor only when needed (#10280)
1b05d6e is described below
commit 1b05d6e542fa3164a562f7112ee2d7a13c91d11e
Author: Arvin.Z <[email protected]>
AuthorDate: Wed Sep 16 12:25:57 2020 -0700
recreate the balancer executor only when needed (#10280)
* recreate the balancer executor only when needed
* fix UT error
* shutdown the balancer executor in stopBeingLeader and stop
* remove commented code
* remove comments
---
.../druid/server/coordinator/DruidCoordinator.java | 71 ++++++++++++++++++----
.../server/coordinator/DruidCoordinatorTest.java | 69 +++++++++++++++++++++
2 files changed, 128 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 806ba04..a406761 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@@ -149,6 +150,9 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
+ private int cachedBalancerThreadNumber;
+ private ListeningExecutorService balancerExec;
+
@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
@@ -483,6 +487,18 @@ public class DruidCoordinator
}
}
+ @VisibleForTesting
+ public int getCachedBalancerThreadNumber()
+ {
+ return cachedBalancerThreadNumber;
+ }
+
+ @VisibleForTesting
+ public ListeningExecutorService getBalancerExec()
+ {
+ return balancerExec;
+ }
+
@LifecycleStart
public void start()
{
@@ -524,6 +540,10 @@ public class DruidCoordinator
started = false;
exec.shutdownNow();
+
+ if (balancerExec != null) {
+ balancerExec.shutdownNow();
+ }
}
}
@@ -612,6 +632,11 @@ public class DruidCoordinator
lookupCoordinatorManager.stop();
metadataRuleManager.stop();
segmentsMetadataManager.stopPollingDatabasePeriodically();
+
+ if (balancerExec != null) {
+ balancerExec.shutdownNow();
+ balancerExec = null;
+ }
}
}
@@ -647,22 +672,52 @@ public class DruidCoordinator
return ImmutableList.of(compactSegments);
}
- private class DutiesRunnable implements Runnable
+ @VisibleForTesting
+ protected class DutiesRunnable implements Runnable
{
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final int startingLeaderCounter;
- private DutiesRunnable(List<CoordinatorDuty> duties, final int
startingLeaderCounter)
+ protected DutiesRunnable(List<CoordinatorDuty> duties, final int
startingLeaderCounter)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;
}
+ @VisibleForTesting
+ protected void initBalancerExecutor()
+ {
+ final int currentNumber =
getDynamicConfigs().getBalancerComputeThreads();
+ final String threadNameFormat = "coordinator-cost-balancer-%s";
+ // first-time initialization
+ if (balancerExec == null) {
+ balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
+ currentNumber,
+ threadNameFormat
+ ));
+ cachedBalancerThreadNumber = currentNumber;
+ return;
+ }
+
+ if (cachedBalancerThreadNumber != currentNumber) {
+ log.info(
+ "balancerComputeThreads has been changed from [%s] to [%s],
recreating the thread pool.",
+ cachedBalancerThreadNumber,
+ currentNumber
+ );
+ balancerExec.shutdownNow();
+ balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
+ currentNumber,
+ threadNameFormat
+ ));
+ cachedBalancerThreadNumber = currentNumber;
+ }
+ }
+
@Override
public void run()
{
- ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
@@ -684,10 +739,7 @@ public class DruidCoordinator
}
}
- balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
- getDynamicConfigs().getBalancerComputeThreads(),
- "coordinator-cost-balancer-%s"
- ));
+ initBalancerExecutor();
BalancerStrategy balancerStrategy =
factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
@@ -733,11 +785,6 @@ public class DruidCoordinator
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps
going.").emit();
}
- finally {
- if (balancerExec != null) {
- balancerExec.shutdownNow();
- }
- }
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 51308cb..9d1756e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
@@ -665,6 +666,74 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
+ @Test
+ public void testBalancerThreadNumber()
+ {
+ CoordinatorDynamicConfig dynamicConfig =
EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
+
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
+
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
+
+ JacksonConfigManager configManager =
EasyMock.createNiceMock(JacksonConfigManager.class);
+ EasyMock.expect(
+ configManager.watch(
+ EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+ EasyMock.anyObject(Class.class),
+ EasyMock.anyObject()
+ )
+ ).andReturn(new AtomicReference(dynamicConfig)).anyTimes();
+
+ ScheduledExecutorFactory scheduledExecutorFactory =
EasyMock.createNiceMock(ScheduledExecutorFactory.class);
+ EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
+
+ DruidCoordinator c = new DruidCoordinator(
+ null,
+ null,
+ configManager,
+ null,
+ null,
+ null,
+ null,
+ null,
+ scheduledExecutorFactory,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ DruidCoordinator.DutiesRunnable duty = c.new
DutiesRunnable(Collections.emptyList(), 0);
+ // before initialization
+ Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
+ Assert.assertNull(c.getBalancerExec());
+
+ // first initialization
+ duty.initBalancerExecutor();
+ System.out.println("c.getCachedBalancerThreadNumber(): " +
c.getCachedBalancerThreadNumber());
+ Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
+ ListeningExecutorService firstExec = c.getBalancerExec();
+ Assert.assertNotNull(firstExec);
+
+ // second initialization, expect no changes as cachedBalancerThreadNumber is not changed
+ duty.initBalancerExecutor();
+ Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
+ ListeningExecutorService secondExec = c.getBalancerExec();
+ Assert.assertNotNull(secondExec);
+ Assert.assertTrue(firstExec == secondExec);
+
+ // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
+ duty.initBalancerExecutor();
+ Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
+ ListeningExecutorService thirdExec = c.getBalancerExec();
+ Assert.assertNotNull(thirdExec);
+ Assert.assertFalse(secondExec == thirdExec);
+ Assert.assertFalse(firstExec == thirdExec);
+ }
+
private CountDownLatch
createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]