This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new da7bcdd9d9 IGNITE-20332 Fix flaky test
testScaleUpTriggeredByFilterUpdateIsRestoredAfterRestart (#2554)
da7bcdd9d9 is described below
commit da7bcdd9d96f8a69ccd9ebe66b346753f64924e8
Author: Sergey Uttsel <[email protected]>
AuthorDate: Tue Sep 12 12:36:11 2023 +0300
IGNITE-20332 Fix flaky test
testScaleUpTriggeredByFilterUpdateIsRestoredAfterRestart (#2554)
---
.../distributionzones/DistributionZonesUtil.java | 13 ++++++
...niteDistributionZoneManagerNodeRestartTest.java | 53 ++++++++++++++++------
2 files changed, 52 insertions(+), 14 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index e04e9970e2..79c24eaacf 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Util class for Distribution Zones flow.
@@ -601,4 +602,16 @@ public class DistributionZonesUtil {
new ThreadPoolExecutor.DiscardPolicy()
);
}
+
+ /** Key prefix for zone's scale up change trigger key. */
+ @TestOnly
+ public static ByteArray zoneScaleUpChangeTriggerKeyPrefix() {
+ return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX);
+ }
+
+ /** Key prefix for zone's scale down change trigger key. */
+ @TestOnly
+ public static ByteArray zoneScaleDownChangeTriggerKeyPrefix() {
+ return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX);
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index c0d289fd47..ac282dd9ea 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -26,18 +26,19 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZoneManag
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -46,7 +47,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -87,12 +87,12 @@ import
org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@@ -122,6 +122,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
private MetaStorageManager metastore;
+ private volatile boolean startScaleUpBlocking;
+
+ private volatile boolean startScaleDownBlocking;
+
/**
* Start some of Ignite components that are able to serve as Ignite node
for test purposes.
*
@@ -185,6 +189,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
new TestRocksDbKeyValueStorage(name,
workDir.resolve("metastorage"))
));
+ blockScaleUpAndScaleDownUpdates(metastore);
+
Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater =
(LongFunction<CompletableFuture<?>> function) ->
metastore.registerRevisionUpdateListener(function::apply);
@@ -268,6 +274,13 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
return partialNode;
}
+ @AfterEach
+ public void afterTest() {
+ startScaleUpBlocking = false;
+
+ startScaleDownBlocking = false;
+ }
+
@Test
public void testNodeAttributesRestoredAfterRestart() throws Exception {
PartialNode node = startPartialNode(0);
@@ -401,7 +414,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
);
// Block scale up
- blockUpdate(metastore, zoneScaleUpChangeTriggerKey(zoneId));
+ startScaleUpBlocking = true;
node.logicalTopology().putNode(C);
node.logicalTopology().removeNodes(Set.of(B));
@@ -416,6 +429,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
node.stop();
+ startScaleUpBlocking = false;
+
node = startPartialNode(0);
distributionZoneManager = getDistributionZoneManager(node);
@@ -440,10 +455,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
createZoneOrAlterDefaultZone(node, zoneName, IMMEDIATE_TIMER_VALUE,
IMMEDIATE_TIMER_VALUE);
- node.logicalTopology().putNode(A);
-
int zoneId = getZoneId(node, zoneName);
+ node.logicalTopology().putNode(A);
+
DistributionZoneManager distributionZoneManager =
getDistributionZoneManager(node);
assertDataNodesFromManager(distributionZoneManager,
metastore::appliedRevision, zoneId, Set.of(A), TIMEOUT_MILLIS);
@@ -454,7 +469,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertDataNodesFromManager(distributionZoneManager,
metastore::appliedRevision, zoneId, Set.of(A), TIMEOUT_MILLIS);
- blockUpdate(metastore, zoneScaleUpChangeTriggerKey(zoneId));
+ startScaleUpBlocking = true;
// Only Node B passes the filter
String filter = "$[?(@.dataRegionSize > 10)]";
@@ -471,6 +486,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
node.stop();
+ startScaleUpBlocking = false;
+
node = startPartialNode(0);
distributionZoneManager = getDistributionZoneManager(node);
@@ -493,7 +510,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertDataNodesFromManager(distributionZoneManager,
metastore::appliedRevision, zoneId, Set.of(A), TIMEOUT_MILLIS);
- blockUpdate(metastore, zoneScaleUpChangeTriggerKey(zoneId));
+ startScaleUpBlocking = true;
alterZone(node, zoneName, 100, null, null);
@@ -520,6 +537,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
node.stop();
+ startScaleUpBlocking = false;
+
node = startPartialNode(0);
distributionZoneManager = getDistributionZoneManager(node);
@@ -557,7 +576,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
int zoneId = getZoneId(node, zoneName);
// Block scale down
- blockUpdate(metastore, zoneScaleDownChangeTriggerKey(zoneId));
+ startScaleDownBlocking = true;
node.logicalTopology().putNode(A);
node.logicalTopology().putNode(B);
@@ -574,6 +593,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
node.stop();
+ startScaleDownBlocking = false;
+
node = startPartialNode(0);
assertDataNodesFromManager(
@@ -625,14 +646,18 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
}
}
- private static void blockUpdate(MetaStorageManager metaStorageManager,
ByteArray key) {
- when(metaStorageManager.invoke(argThat(iif -> {
+ private void blockScaleUpAndScaleDownUpdates(MetaStorageManager
metaStorageManager) {
+ doThrow(new
RuntimeException("Expected")).when(metaStorageManager).invoke(argThat(iif -> {
If iif1 = MetaStorageWriteHandler.toIf(iif);
- byte[] keyScaleUp = key.bytes();
+ byte[] keyScaleUpBytes =
DistributionZonesUtil.zoneScaleUpChangeTriggerKeyPrefix().bytes();
+ byte[] keyScaleDownBytes =
DistributionZonesUtil.zoneScaleDownChangeTriggerKeyPrefix().bytes();
+
+ boolean isScaleUpKey =
iif1.andThen().update().operations().stream().anyMatch(op ->
startsWith(op.key(), keyScaleUpBytes));
+ boolean isScaleDownKey =
iif1.andThen().update().operations().stream().anyMatch(op ->
startsWith(op.key(), keyScaleDownBytes));
- return iif1.andThen().update().operations().stream().anyMatch(op
-> Arrays.equals(keyScaleUp, op.key()));
- }))).thenThrow(new RuntimeException("Expected"));
+ return isScaleUpKey && startScaleUpBlocking || isScaleDownKey &&
startScaleDownBlocking;
+ }));
}
private static <T extends IgniteComponent> T
getStartedComponent(PartialNode node, Class<T> componentClass) {