This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ff7407fd9a IGNITE-19736 Do not cancel tasks in
DistributionZoneManager#executor if they were created by immediate
scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same
zone. (#2201)
ff7407fd9a is described below
commit ff7407fd9a930bcabbecb40d0f3067320054075a
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Jun 27 11:30:15 2023 +0300
IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if
they were created by immediate scaleUp/scaleDown events. Avoid concurrent
executing several tasks for the same zone. (#2201)
---
.../distributionzones/DistributionZoneManager.java | 61 ++--
.../distributionzones/DistributionZonesUtil.java | 22 ++
.../DistributionZoneManagerScaleUpTest.java | 312 ---------------------
.../DistributionZonesSchedulersTest.java | 275 ++++++++++++++++--
4 files changed, 312 insertions(+), 358 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 70f082753c..c2fb15a2af 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -21,8 +21,10 @@ import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
@@ -73,9 +75,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.ignite.configuration.ConfigurationChangeException;
@@ -282,10 +281,8 @@ public class DistributionZoneManager implements
IgniteComponent {
nodesAttributes = new ConcurrentHashMap<>();
- executor = new ScheduledThreadPoolExecutor(
- Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
- new
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName,
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
- new ThreadPoolExecutor.DiscardPolicy()
+ executor = createZoneManagerExecutor(
+ new
NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName,
DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG)
);
// It's safe to leak with partially initialised object here, because
rebalanceEngine is only accessible through this or by
@@ -361,7 +358,7 @@ public class DistributionZoneManager implements
IgniteComponent {
metaStorageManager.unregisterWatch(topologyWatchListener);
metaStorageManager.unregisterWatch(dataNodesWatchListener);
- shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(executor, 10, SECONDS);
}
/**
@@ -1610,6 +1607,12 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Schedule task for a scale down process. */
private ScheduledFuture<?> scaleDownTask;
+ /** The delay for the scale up task. */
+ private long scaleUpTaskDelay;
+
+ /** The delay for the scale down task. */
+ private long scaleDownTaskDelay;
+
/**
* Map that stores pairs revision -> {@link Augmentation} for a zone.
With this map we can track which nodes
* should be added or removed in the processes of scale up or scale
down. Revision helps to track visibility of the events
@@ -1644,56 +1647,64 @@ public class DistributionZoneManager implements
IgniteComponent {
}
/**
- * Reschedules existing scale up task, if it is not started yet, or
schedules new one, if the current task cannot be canceled.
+ * Reschedules the existing scale up task if it is not started yet and
its delay is not immediate,
+ * or schedules a new one if the current task cannot be canceled.
*
* @param delay Delay to start runnable in seconds.
* @param runnable Custom logic to run.
*/
synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
- if (scaleUpTask != null) {
- scaleUpTask.cancel(false);
- }
+ stopScaleUp();
- scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
+ scaleUpTask = executor.schedule(runnable, delay, SECONDS);
+
+ scaleUpTaskDelay = delay;
}
/**
- * Reschedules existing scale down task, if it is not started yet, or
schedules new one, if the current task cannot be canceled.
+ * Reschedules the existing scale down task if it is not started yet and
its delay is not immediate,
+ * or schedules a new one if the current task cannot be canceled.
*
* @param delay Delay to start runnable in seconds.
* @param runnable Custom logic to run.
*/
synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
- if (scaleDownTask != null) {
- scaleDownTask.cancel(false);
- }
+ stopScaleDown();
- scaleDownTask = executor.schedule(runnable, delay,
TimeUnit.SECONDS);
+ scaleDownTask = executor.schedule(runnable, delay, SECONDS);
+
+ scaleDownTaskDelay = delay;
}
/**
- * Cancels task for scale up and scale down.
+ * Cancels task for scale up and scale down. Used on {@link
ZonesConfigurationListener#onDelete(ConfigurationNotificationEvent)}.
+ * No need to check {@code scaleUpTaskDelay} and {@code
scaleDownTaskDelay} because, after the timer is stopped on a zone delete event,
+ * the data nodes value will be updated.
*/
synchronized void stopTimers() {
- stopScaleUp();
+ if (scaleUpTask != null) {
+ scaleUpTask.cancel(false);
+ }
- stopScaleDown();
+ if (scaleDownTask != null) {
+ scaleDownTask.cancel(false);
+ }
}
/**
- * Cancels task for scale up.
+ * Cancels task for scale up if it is not started yet and the delay of
this task is not immediate.
*/
synchronized void stopScaleUp() {
- if (scaleUpTask != null) {
+ if (scaleUpTask != null && scaleUpTaskDelay > 0) {
scaleUpTask.cancel(false);
}
}
/**
- * Cancels task for scale down.
+ * Cancels task for scale down if it is not started yet and the delay
of this task is not immediate.
*/
synchronized void stopScaleDown() {
- if (scaleDownTask != null) {
+ if (scaleDownTask != null && scaleDownTaskDelay > 0) {
scaleDownTask.cancel(false);
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 2046629c2a..32d3571683 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -40,6 +40,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.DistributionZoneNotFoundException;
@@ -532,4 +536,22 @@ public class DistributionZonesUtil {
.map(Node::nodeName)
.collect(toSet());
}
+
+ /**
+ * Creates an executor for the zone manager.
+ * A single-threaded executor is used to avoid concurrently executing several
tasks for the same zone.
+ * ScheduledThreadPoolExecutor guarantees that tasks scheduled for exactly
the same
+ * execution time are enabled in first-in-first-out (FIFO) order of
submission.
+ * // TODO: IGNITE-19783 Need to use a striped executor.
+ *
+ * @param namedThreadFactory Named thread factory.
+ * @return Executor.
+ */
+ static ScheduledExecutorService
createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) {
+ return new ScheduledThreadPoolExecutor(
+ 1,
+ namedThreadFactory,
+ new ThreadPoolExecutor.DiscardPolicy()
+ );
+ }
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index d6b47b81ea..53f853cad0 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -37,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -46,15 +45,10 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
-import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
-import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
-import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.network.NetworkAddress;
@@ -307,312 +301,6 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
assertZoneScaleDownChangeTriggerKey(null, ZONE_1_ID, keyValueStorage);
}
- @Test
- void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception {
- preparePrerequisites();
-
- NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
- zonesConfiguration.distributionZones();
-
- DistributionZoneView zoneView = zones.value().get(0);
-
- CountDownLatch in1 = new CountDownLatch(1);
- CountDownLatch in2 = new CountDownLatch(1);
- CountDownLatch out1 = new CountDownLatch(1);
- CountDownLatch out2 = new CountDownLatch(1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(D),
- Set.of(),
- prerequisiteRevision + 1,
- (zoneId, revision) -> {
- try {
- in1.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleUp(zoneId,
revision).thenRun(out1::countDown);
- },
- (t1, t2) -> null
- );
-
- // Assert that first task was run and event about adding node "D" with
revision {@code prerequisiteRevision + 1} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(E),
- Set.of(),
- prerequisiteRevision + 2,
- (zoneId, revision) -> {
- try {
- in2.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleUp(zoneId,
revision).thenRun(() -> {
- try {
- out2.await();
- } catch (InterruptedException e) {
- fail();
- }
- });
- },
- (t1, t2) -> null
- );
-
- // Assert that second task was run and event about adding node "E"
with revision {@code prerequisiteRevision + 2} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 2);
-
- //Second task is propagating data nodes first.
- in2.countDown();
-
- assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID,
keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E),
keyValueStorage);
-
- out2.countDown();
-
- in1.countDown();
-
- //Waiting for the first scheduler ends it work.
- out1.countDown();
-
- // Assert that nothing has been changed.
- assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID,
keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E),
keyValueStorage);
- }
-
- @Test
- void testTwoScaleDownTimersSecondTimerRunFirst() throws Exception {
- preparePrerequisites();
-
- NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
- zonesConfiguration.distributionZones();
-
- DistributionZoneView zoneView = zones.value().get(0);
-
- CountDownLatch in1 = new CountDownLatch(1);
- CountDownLatch in2 = new CountDownLatch(1);
- CountDownLatch out1 = new CountDownLatch(1);
- CountDownLatch out2 = new CountDownLatch(1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(),
- Set.of(B),
- prerequisiteRevision + 1,
- (t1, t2) -> null,
- (zoneId, revision) -> {
- try {
- in1.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleDown(zoneId,
revision).thenRun(out1::countDown);
- }
- );
-
- // Assert that first task was run and event about removing node "B"
with revision {@code prerequisiteRevision + 1} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(),
- Set.of(C),
- prerequisiteRevision + 2,
- (t1, t2) -> null,
- (zoneId, revision) -> {
- try {
- in2.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleDown(zoneId,
revision).thenRun(() -> {
- try {
- out2.await();
- } catch (InterruptedException e) {
- fail();
- }
- });
- }
- );
-
- // Assert that second task was run and event about removing node "C"
with revision {@code prerequisiteRevision + 2} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 2);
-
- //Second task is propagating data nodes first.
- in2.countDown();
-
- assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2,
ZONE_1_ID, keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A),
keyValueStorage);
-
- out2.countDown();
-
- in1.countDown();
-
- //Waiting for the first scheduler ends it work.
- out1.countDown();
-
- // Assert that nothing has been changed.
- assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2,
ZONE_1_ID, keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A),
keyValueStorage);
- }
-
- @Test
- void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception {
- preparePrerequisites();
-
- NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
- zonesConfiguration.distributionZones();
-
- DistributionZoneView zoneView = zones.value().get(0);
-
- CountDownLatch in1 = new CountDownLatch(1);
- CountDownLatch in2 = new CountDownLatch(1);
- CountDownLatch out1 = new CountDownLatch(1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(D),
- Set.of(),
- prerequisiteRevision + 1,
- (zoneId, revision) -> {
- in1.countDown();
-
- return testSaveDataNodesOnScaleUp(zoneId,
revision).thenRun(() -> {
- try {
- out1.await();
- } catch (InterruptedException e) {
- fail();
- }
- });
- },
- (t1, t2) -> null
- );
-
- // Waiting for the first task to be run. We have to do that to be sure
that watch events,
- // which we try to emulate, are handled sequentially.
- in1.await();
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(E),
- Set.of(),
- prerequisiteRevision + 2,
- (zoneId, revision) -> {
- try {
- in2.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleUp(zoneId, revision);
- },
- (t1, t2) -> null
- );
-
- // Assert that second task was run and event about adding node "E"
with revision {@code prerequisiteRevision + 2} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 2);
-
- assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID,
keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D),
keyValueStorage);
-
- // Second task is run and we await that data nodes will be changed
from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"]
- in2.countDown();
-
- assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID,
keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E),
keyValueStorage);
-
- out1.countDown();
- }
-
- @Test
- void testTwoScaleDownTimersFirstTimerRunFirst() throws Exception {
- preparePrerequisites();
-
- NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
- zonesConfiguration.distributionZones();
-
- DistributionZoneView zoneView = zones.value().get(0);
-
- CountDownLatch in1 = new CountDownLatch(1);
- CountDownLatch in2 = new CountDownLatch(1);
- CountDownLatch out1 = new CountDownLatch(1);
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(),
- Set.of(B),
- prerequisiteRevision + 1,
- (t1, t2) -> null,
- (zoneId, revision) -> {
- in1.countDown();
-
- return testSaveDataNodesOnScaleDown(zoneId,
revision).thenRun(() -> {
- try {
- out1.await();
- } catch (InterruptedException e) {
- fail();
- }
- });
- }
- );
-
- // Waiting for the first task to be run. We have to do that to be sure
that watch events,
- // which we try to emulate, are handled sequentially.
- in1.await();
-
- distributionZoneManager.scheduleTimers(
- zoneView,
- Set.of(),
- Set.of(C),
- prerequisiteRevision + 2,
- (t1, t2) -> null,
- (zoneId, revision) -> {
- try {
- in2.await();
- } catch (InterruptedException e) {
- fail();
- }
-
- return testSaveDataNodesOnScaleDown(zoneId, revision);
- }
- );
-
- // Assert that second task was run and event about removing node "C"
with revision {@code prerequisiteRevision + 2} was added
- // to the topologyAugmentationMap of the zone.
- assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID,
prerequisiteRevision + 2);
-
- assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1,
ZONE_1_ID, keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C),
keyValueStorage);
-
- // Second task is run and we await that data nodes will be changed
from ["A", "C"] to ["A"]
- in2.countDown();
-
- assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2,
ZONE_1_ID, keyValueStorage);
-
- assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A),
keyValueStorage);
-
- out1.countDown();
- }
-
@Test
void testEmptyDataNodesOnStart() throws Exception {
startDistributionZoneManager();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
index 2e14cc044e..0a6eb584e5 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java
@@ -17,21 +17,20 @@
package org.apache.ignite.internal.distributionzones;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.after;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
import
org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -45,10 +44,8 @@ import org.junit.jupiter.api.Test;
public class DistributionZonesSchedulersTest {
private static final IgniteLogger LOG =
Loggers.forClass(DistributionZonesSchedulersTest.class);
- private static final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(
- Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
- new NamedThreadFactory("test-dst-zones-scheduler", LOG),
- new ThreadPoolExecutor.DiscardPolicy()
+ private static ScheduledExecutorService executor =
createZoneManagerExecutor(
+ new NamedThreadFactory("test-dst-zones-scheduler", LOG)
);
@AfterAll
@@ -83,27 +80,118 @@ public class DistributionZonesSchedulersTest {
}
@Test
- void testScaleUpReScheduleNotStartedTask() {
+ void testScaleUpReScheduling() throws InterruptedException {
ZoneState state = new DistributionZoneManager.ZoneState(executor);
- testReScheduleNotStartedTask(state::rescheduleScaleUp);
+ testReScheduling(state::rescheduleScaleUp);
}
@Test
- void testScaleDownReScheduleNotStartedTask() {
+ void testScaleDownReScheduling() throws InterruptedException {
ZoneState state = new DistributionZoneManager.ZoneState(executor);
- testReScheduleNotStartedTask(state::rescheduleScaleDown);
+ testReScheduling(state::rescheduleScaleDown);
}
- private static void testReScheduleNotStartedTask(BiConsumer<Long,
Runnable> fn) {
- Runnable runnable = mock(Runnable.class);
+ /**
+ * Tests that scaleUp/scaleDown tasks with a zero delay will not be
canceled by other tasks.
+ * Tests that scaleUp/scaleDown tasks with a delay greater than zero will
be canceled by other tasks.
+ */
+ private static void testReScheduling(BiConsumer<Long, Runnable> fn) throws
InterruptedException {
+ AtomicInteger counter = new AtomicInteger();
- fn.accept(1L, runnable);
+ CountDownLatch firstTaskLatch = new CountDownLatch(1);
- fn.accept(0L, runnable);
+ CountDownLatch lastTaskLatch = new CountDownLatch(1);
- verify(runnable, after(1200).times(1)).run();
+ fn.accept(0L, () -> {
+ try {
+ assertTrue(firstTaskLatch.await(3, TimeUnit.SECONDS));
+
+ counter.incrementAndGet();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ fn.accept(1L, counter::incrementAndGet);
+
+ fn.accept(0L, counter::incrementAndGet);
+
+ fn.accept(0L, counter::incrementAndGet);
+
+ fn.accept(1L, counter::incrementAndGet);
+
+ fn.accept(1L, counter::incrementAndGet);
+
+ fn.accept(0L, counter::incrementAndGet);
+
+ fn.accept(0L, () -> {
+ counter.incrementAndGet();
+
+ lastTaskLatch.countDown();
+ });
+
+ firstTaskLatch.countDown();
+
+ assertTrue(lastTaskLatch.await(3, TimeUnit.SECONDS));
+
+ assertEquals(5, counter.get());
+ }
+
+ @Test
+ void testScaleUpOrdering() throws InterruptedException {
+ ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+ testOrdering(state::rescheduleScaleUp);
+ }
+
+ @Test
+ void testScaleDownOrdering() throws InterruptedException {
+ ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+ testOrdering(state::rescheduleScaleDown);
+ }
+
+ private static void testOrdering(BiConsumer<Long, Runnable> fn) throws
InterruptedException {
+ AtomicInteger counter = new AtomicInteger();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ AtomicBoolean sequentialOrder = new AtomicBoolean(true);
+
+ fn.accept(0L, () -> {
+ try {
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+ counter.incrementAndGet();
+
+ if (counter.get() != 1) {
+ sequentialOrder.set(false);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ for (int i = 2; i < 11; i++) {
+ int j = i;
+
+ fn.accept(0L, () -> {
+ counter.incrementAndGet();
+
+ if (counter.get() != j) {
+ sequentialOrder.set(false);
+ }
+ });
+ }
+
+ latch.countDown();
+
+ waitForCondition(() -> counter.get() == 10, 3000);
+ assertEquals(10, counter.get(), "Not all tasks were executed.");
+
+ assertTrue(sequentialOrder.get(), "The order of tasks execution is not
sequential.");
}
@Test
@@ -131,11 +219,156 @@ public class DistributionZonesSchedulersTest {
latch.await(1000, TimeUnit.MILLISECONDS);
- fn.accept(0L, () -> {
- flag.set(true);
- });
+ fn.accept(0L, () -> flag.set(true));
assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500));
assertTrue(waitForCondition(flag::get, 1500));
}
+
+ @Test
+ void testCancelScaleUpTaskOnStopScaleUp() {
+ ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+ testCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () ->
state.scaleUpTask().isCancelled());
+ }
+
+ @Test
+ void testCancelScaleDownTaskOnStopScaleDown() {
+ ZoneState state = new ZoneState(executor);
+
+ testCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () ->
state.scaleDownTask().isCancelled());
+ }
+
+ @Test
+ void testCancelScaleUpTasksOnStopTimers() {
+ ZoneState state = new ZoneState(executor);
+
+ testCancelTask(state::rescheduleScaleUp, state::stopTimers, () ->
state.scaleUpTask().isCancelled());
+ }
+
+ @Test
+ void testCancelScaleDownTasksOnStopTimers() {
+ ZoneState state = new ZoneState(executor);
+
+ testCancelTask(state::rescheduleScaleDown, state::stopTimers, () ->
state.scaleDownTask().isCancelled());
+ }
+
+ /**
+ * {@link ZoneState#stopScaleUp()}, {@link ZoneState#stopScaleDown()} and
{@link ZoneState#stopTimers()} cancel the task
+ * if it is not started and has a delay greater than zero.
+ */
+ private static void testCancelTask(
+ BiConsumer<Long, Runnable> fn,
+ Runnable stopTask,
+ Supplier<Boolean> isTaskCancelled
+ ) {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ fn.accept(0L, () -> {
+ try {
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ fn.accept(1L, () -> {});
+
+ assertFalse(isTaskCancelled.get());
+
+ stopTask.run();
+
+ assertTrue(isTaskCancelled.get());
+
+ latch.countDown();
+ }
+
+ @Test
+ void testNotCancelScaleUpTaskOnStopScaleUp() {
+ ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+ testNotCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () ->
state.scaleUpTask().isCancelled());
+ }
+
+ @Test
+ void testNotCancelScaleDownTaskOnStopScaleDown() {
+ ZoneState state = new ZoneState(executor);
+
+ testNotCancelTask(state::rescheduleScaleDown, state::stopScaleDown, ()
-> state.scaleDownTask().isCancelled());
+
+ }
+
+ /**
+ * {@link ZoneState#stopScaleUp()} and {@link ZoneState#stopScaleDown()}
do not cancel the task
+ * if it is not started and has a delay equal to zero.
+ */
+ private static void testNotCancelTask(
+ BiConsumer<Long, Runnable> fn,
+ Runnable stopTask,
+ Supplier<Boolean> isTaskCancelled
+ ) {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ fn.accept(0L, () -> {
+ try {
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ fn.accept(0L, () -> {});
+
+ assertFalse(isTaskCancelled.get());
+
+ stopTask.run();
+
+ assertFalse(isTaskCancelled.get());
+
+ latch.countDown();
+ }
+
+ /**
+ * {@link ZoneState#stopTimers()} cancels the task if it is not started and
has a delay equal to zero.
+ */
+ @Test
+ public void testCancelTasksOnStopTimersAndImmediateTimerValues() {
+ ZoneState state = new DistributionZoneManager.ZoneState(executor);
+
+ CountDownLatch scaleUpTaskLatch = new CountDownLatch(1);
+
+ state.rescheduleScaleUp(0L, () -> {
+ try {
+ assertTrue(scaleUpTaskLatch.await(3, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ state.rescheduleScaleUp(0L, () -> {});
+
+ assertFalse(state.scaleUpTask().isCancelled());
+
+ CountDownLatch scaleDownTaskLatch = new CountDownLatch(1);
+
+ state.rescheduleScaleDown(0L, () -> {
+ try {
+ assertTrue(scaleDownTaskLatch.await(3, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ state.rescheduleScaleDown(0L, () -> {});
+
+ assertFalse(state.scaleDownTask().isCancelled());
+
+ state.stopTimers();
+
+ assertTrue(state.scaleUpTask().isCancelled());
+ assertTrue(state.scaleDownTask().isCancelled());
+
+ scaleUpTaskLatch.countDown();
+ scaleDownTaskLatch.countDown();
+ }
}