This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f5f2c65827 IGNITE-19606 Linearize metaStorageManager.deployWatches and
metaStorageManager.start() (#2183)
f5f2c65827 is described below
commit f5f2c658273efaf98b5df156690f3b1afe10876d
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Jun 15 18:01:15 2023 +0400
IGNITE-19606 Linearize metaStorageManager.deployWatches and
metaStorageManager.start() (#2183)
---
.../internal/catalog/CatalogServiceSelfTest.java | 7 +-
.../catalog/storage/UpdateLogImplTest.java | 5 +-
.../metastore/DeploymentUnitStoreImplTest.java | 6 +-
.../DistributionZonesTestUtil.java | 11 +-
.../internal/metastorage/MetaStorageManager.java | 4 +-
.../impl/ItMetaStorageManagerImplTest.java | 6 +-
.../ItMetaStorageMultipleNodesAbstractTest.java | 28 ++++-
.../metastorage/impl/ItMetaStorageWatchTest.java | 11 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 11 +-
.../MetaStorageDeployWatchesCorrectnessTest.java | 122 +++++++++++++++++++++
.../MultiActorPlacementDriverTest.java | 7 +-
.../PlacementDriverManagerTest.java | 6 +-
.../placementdriver/PlacementDriverTest.java | 5 +-
.../ItDistributedConfigurationPropertiesTest.java | 23 ++--
.../ItDistributedConfigurationStorageTest.java | 17 ++-
.../storage/ItRebalanceDistributedTest.java | 19 +++-
.../runner/app/ItIgniteNodeRestartTest.java | 21 +---
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
18 files changed, 241 insertions(+), 76 deletions(-)
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 3c6affd572..d4f21d0222 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -100,7 +100,6 @@ import
org.apache.ignite.lang.DistributionZoneNotFoundException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.sql.ColumnType;
@@ -138,7 +137,7 @@ public class CatalogServiceSelfTest {
private HybridClock clock;
@BeforeEach
- void setUp() throws NodeStoppingException {
+ void setUp() {
vault = new VaultManager(new InMemoryVaultService());
metastore = StandaloneMetaStorageManager.create(
@@ -152,7 +151,7 @@ public class CatalogServiceSelfTest {
metastore.start();
service.start();
- metastore.deployWatches();
+ assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
}
@AfterEach
@@ -1128,7 +1127,7 @@ public class CatalogServiceSelfTest {
metaStorageManager.start();
service.start();
- metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
try {
CreateTableParams params = CreateTableParams.builder()
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index e41ba4eb53..dd9bf36ee5 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.catalog.storage;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -79,7 +80,7 @@ class UpdateLogImplTest {
updateLog.registerUpdateHandler(update -> {/* no-op */});
updateLog.start();
- metastore.deployWatches();
+ assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
List<VersionedUpdate> expectedLog = List.of(
new VersionedUpdate(1, 1L, List.of(new
TestUpdateEntry("foo"))),
@@ -138,7 +139,7 @@ class UpdateLogImplTest {
long revisionBefore = metastore.appliedRevision();
- metastore.deployWatches();
+ assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
// first update should always be successful
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion))));
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 5474c10613..2cc191f3d9 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.OB
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.REMOVING;
import static
org.apache.ignite.internal.rest.api.deployment.DeploymentStatus.UPLOADING;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -46,7 +47,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.apache.ignite.lang.NodeStoppingException;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -76,7 +76,7 @@ public class DeploymentUnitStoreImplTest {
private Path workDir;
@BeforeEach
- public void setup() throws NodeStoppingException {
+ public void setup() {
history.clear();
KeyValueStorage storage = new RocksDbKeyValueStorage("test", workDir);
@@ -87,7 +87,7 @@ public class DeploymentUnitStoreImplTest {
vaultManager.start();
metaStorageManager.start();
- metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
}
@Test
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 132795d09f..16997064c5 100644
---
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -57,7 +57,6 @@ import
org.apache.ignite.internal.schema.configuration.storage.DataStorageChange
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
@@ -306,24 +305,22 @@ public class DistributionZonesTestUtil {
* TODO: IGNITE-19403 Watch listeners must be deployed after the zone
manager starts.
*
* @param metaStorageManager Meta storage manager.
- * @throws NodeStoppingException If node is stopping.
* @throws InterruptedException If thread was interrupted.
*/
- public static void
deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager)
- throws NodeStoppingException, InterruptedException {
+ public static void
deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager
metaStorageManager) throws InterruptedException {
// Watches are deployed before distributionZoneManager start in order
to update Meta Storage revision before
// distributionZoneManager's recovery.
- metaStorageManager.deployWatches();
+ CompletableFuture<Void> deployWatchesFut =
metaStorageManager.deployWatches();
// Bump Meta Storage applied revision by modifying a fake key.
DistributionZoneManager breaks on start if Vault is not empty, but
// Meta Storage revision is equal to 0.
var fakeKey = new ByteArray("foobar");
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
+ CompletableFuture<Boolean> invokeFuture =
deployWatchesFut.thenCompose(unused -> metaStorageManager.invoke(
Conditions.notExists(fakeKey),
Operations.put(fakeKey, fakeKey.bytes()),
Operations.noop()
- );
+ ));
assertThat(invokeFuture, willBe(true));
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index efca5855ed..5d1d411f70 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -170,8 +170,10 @@ public interface MetaStorageManager extends
IgniteComponent {
* Starts all registered watches.
*
* <p>Should be called after all Ignite components have registered
required watches and they are ready to process Meta Storage events.
+ *
+ * @return Future which completes when Meta storage manager is started and
deploying watches is finished.
*/
- void deployWatches() throws NodeStoppingException;
+ CompletableFuture<Void> deployWatches();
/**
* Returns cluster time with a hybrid clock instance and access to safe
time.
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 7b1ef8573d..04dec2a164 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscr
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -61,7 +62,6 @@ import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
@@ -87,7 +87,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
private MetaStorageManagerImpl metaStorageManager;
@BeforeEach
- void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration
raftConfiguration) throws NodeStoppingException {
+ void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration
raftConfiguration) {
var addr = new NetworkAddress("localhost", 10_000);
clusterService = clusterService(testInfo, addr.port(), new
StaticNodeFinder(List.of(addr)));
@@ -118,7 +118,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
raftManager.start();
metaStorageManager.start();
- metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
}
@AfterEach
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 53ef8f6d2a..9103686e64 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -115,6 +115,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
private final MetaStorageManagerImpl metaStorageManager;
+ /** The future must complete after the node starts and all Meta
storage watches are deployed. */
+ private final CompletableFuture<Void> deployWatchesFut;
+
Node(ClusterService clusterService, Path dataPath) {
this.clusterService = clusterService;
@@ -151,6 +154,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
createStorage(name(), basePath),
clock
);
+
+ deployWatchesFut = metaStorageManager.deployWatches();
}
void start() throws NodeStoppingException {
@@ -164,8 +169,13 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
);
components.forEach(IgniteComponent::start);
+ }
- metaStorageManager.deployWatches();
+ /**
+ * Waits for the watches to be deployed.
+ */
+ void waitWatches() {
+ assertThat("Watches were not deployed", deployWatchesFut,
willCompleteSuccessfully());
}
String name() {
@@ -228,6 +238,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
firstNode.cmgManager.initCluster(List.of(firstNode.name()),
List.of(firstNode.name()), "test");
+ firstNode.waitWatches();
+
var key = new ByteArray("foo");
byte[] value = "bar".getBytes(StandardCharsets.UTF_8);
@@ -237,6 +249,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
Node secondNode = startNode(testInfo);
+ secondNode.waitWatches();
+
// Check that reading remote data works correctly.
assertThat(secondNode.metaStorageManager.get(key).thenApply(Entry::value),
willBe(value));
@@ -288,6 +302,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
firstNode.cmgManager.initCluster(List.of(firstNode.name()),
List.of(firstNode.name()), "test");
+ firstNode.waitWatches();
+ secondNode.waitWatches();
+
// Try reading some data to make sure that Raft has been configured
correctly.
assertThat(secondNode.metaStorageManager.get(new
ByteArray("test")).thenApply(Entry::value), willBe(nullValue()));
@@ -314,6 +331,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
assertThat(allOf(firstNode.cmgManager.onJoinReady(),
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+ firstNode.waitWatches();
+ secondNode.waitWatches();
+
CompletableFuture<Set<String>> logicalTopologyNodes =
firstNode.cmgManager
.logicalTopology()
.thenApply(logicalTopology ->
logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
@@ -357,6 +377,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
assertThat(allOf(firstNode.cmgManager.onJoinReady(),
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+ firstNode.waitWatches();
+ secondNode.waitWatches();
+
CompletableFuture<Void> watchCompletedFuture = new
CompletableFuture<>();
CountDownLatch watchCalledLatch = new CountDownLatch(1);
@@ -438,6 +461,9 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
assertThat(allOf(firstNode.cmgManager.onJoinReady(),
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+ firstNode.waitWatches();
+ secondNode.waitWatches();
+
assertThat(
firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new
byte[]{0, 1, 2, 3}),
willCompleteSuccessfully()
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 1ea7959294..eb7701110f 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -273,7 +274,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
for (Node node : nodes) {
registerWatchAction.accept(node, latch);
- node.metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
node.metaStorageManager.deployWatches(), willCompleteSuccessfully());
}
var key = new ByteArray("foo");
@@ -364,13 +365,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
assertThat(invokeFuture, willBe(true));
- nodes.forEach(node -> {
- try {
- node.metaStorageManager.deployWatches();
- } catch (NodeStoppingException e) {
- throw new RuntimeException(e);
- }
- });
+ nodes.forEach(node -> assertThat("Watches were not deployed",
node.metaStorageManager.deployWatches(), willCompleteSuccessfully()));
assertTrue(exactLatch.await(10, TimeUnit.SECONDS));
assertTrue(prefixLatch.await(10, TimeUnit.SECONDS));
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 143b30acab..082249a8fa 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
import java.util.Collection;
@@ -277,14 +278,16 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
@Override
- public void deployWatches() throws NodeStoppingException {
+ public CompletableFuture<Void> deployWatches() {
if (!busyLock.enterBusy()) {
- throw new NodeStoppingException();
+ return CompletableFuture.failedFuture(new NodeStoppingException());
}
try {
- // Meta Storage contract states that all updated entries under a
particular revision must be stored in the Vault.
- storage.startWatches(this::onRevisionApplied);
+ return metaStorageSvcFut.thenRun(() -> inBusyLock(busyLock, () -> {
+ // Meta Storage contract states that all updated entries under
a particular revision must be stored in the Vault.
+ storage.startWatches(this::onRevisionApplied);
+ }));
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
new file mode 100644
index 0000000000..9f68f87de2
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterService;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests that check the correctness of invoking {@link
MetaStorageManager#deployWatches()}.
+ */
+public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest {
+ /** Vault manager. */
+ private static VaultManager vaultManager;
+
+ @BeforeAll
+ public static void init() {
+ vaultManager = new VaultManager(new InMemoryVaultService());
+
+ vaultManager.start();
+ }
+
+ @AfterAll
+ public static void deInit() {
+ vaultManager.beforeNodeStop();
+
+ vaultManager.stop();
+ }
+
+ /**
+ * Returns a stream with test arguments.
+ *
+ * @return Stream of different types of Meta storages to check.
+ * @throws Exception If failed.
+ */
+ private static Stream<MetaStorageManager> metaStorageProvider() throws
Exception {
+ HybridClock clock = new HybridClockImpl();
+ String mcNodeName = "mc-node-1";
+
+ ClusterManagementGroupManager cmgManager =
mock(ClusterManagementGroupManager.class);
+ ClusterService clusterService = mock(ClusterService.class);
+ LogicalTopologyService logicalTopologyService =
mock(LogicalTopologyService.class);
+ RaftManager raftManager = mock(RaftManager.class);
+
+
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(mcNodeName)));
+ when(clusterService.nodeName()).thenReturn(mcNodeName);
+ when(raftManager.startRaftGroupNodeAndWaitNodeReadyFuture(any(),
any(), any(), any(), any())).thenReturn(completedFuture(mock(
+ RaftGroupService.class)));
+
+ return Stream.of(
+ new MetaStorageManagerImpl(
+ vaultManager,
+ clusterService,
+ cmgManager,
+ logicalTopologyService,
+ raftManager,
+ new SimpleInMemoryKeyValueStorage(mcNodeName),
+ clock
+ ),
+ StandaloneMetaStorageManager.create(vaultManager)
+ );
+ }
+
+ /**
+ * Invokes {@link MetaStorageManager#deployWatches()} and checks result.
+ *
+ * @param metastore Meta storage.
+ */
+ @ParameterizedTest
+ @MethodSource("metaStorageProvider")
+ public void testCheckCorrectness(MetaStorageManager metastore) throws
Exception {
+ var deployWatchesFut = metastore.deployWatches();
+
+ assertFalse(deployWatchesFut.isDone());
+
+ metastore.start();
+
+ assertThat(deployWatchesFut, willCompleteSuccessfully());
+
+ metastore.beforeNodeStop();
+
+ metastore.stop();
+ }
+}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 006cfcd436..6e6f7145aa 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -23,8 +23,10 @@ import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.
import static
org.apache.ignite.internal.placementdriver.leases.Lease.fromBytes;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.lang.ByteArray.fromString;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -230,12 +232,11 @@ public class MultiActorPlacementDriverTest extends
IgniteAbstractTest {
* @param services Cluster services.
* @param logicalTopManagers The list to update in the method. The list
might be used for driving of the logical topology.
* @return List of closures to stop the services.
- * @throws Exception If something goes wrong.
*/
public List<Closeable> startPlacementDriver(
Map<String, ClusterService> services,
List<LogicalTopologyServiceTestImpl> logicalTopManagers
- ) throws Exception {
+ ) {
var res = new ArrayList<Closeable>(placementDriverNodeNames.size());
for (String nodeName : placementDriverNodeNames) {
@@ -305,7 +306,7 @@ public class MultiActorPlacementDriverTest extends
IgniteAbstractTest {
metaStorageManager.start();
placementDriverManager.start();
- metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
res.add(() -> {
try {
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index 6cceabe662..592a2e67b9 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -25,9 +25,11 @@ import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.
import static
org.apache.ignite.internal.placementdriver.leases.Lease.fromBytes;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.lang.ByteArray.fromString;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -149,7 +151,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
startPlacementDriverManager();
}
- private void startPlacementDriverManager() throws NodeStoppingException {
+ private void startPlacementDriverManager() {
vaultManager = new VaultManager(new
PersistentVaultService(testNodeName(testInfo, PORT), workDir.resolve("vault")));
var nodeFinder = new StaticNodeFinder(Collections.singletonList(new
NetworkAddress("localhost", PORT)));
@@ -221,7 +223,7 @@ public class PlacementDriverManagerTest extends
IgniteAbstractTest {
metaStorageManager.start();
placementDriverManager.start();
- metaStorageManager.deployWatches();
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
}
/**
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 50497b06d2..a6453191fd 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -84,7 +85,7 @@ public class PlacementDriverTest {
private LeaseTracker placementDriver;
@BeforeEach
- void setUp() throws Exception {
+ void setUp() {
vault = new VaultManager(new InMemoryVaultService());
metastore = StandaloneMetaStorageManager.create(vault);
@@ -98,7 +99,7 @@ public class PlacementDriverTest {
metastore.start();
placementDriver.startTrack();
- metastore.deployWatches();
+ assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
}
@AfterEach
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index d9d10ced6f..54881775cb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.configuration;
import static java.util.stream.Collectors.toUnmodifiableList;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -60,7 +61,6 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
@@ -114,6 +114,9 @@ public class ItDistributedConfigurationPropertiesTest {
private final ConfigurationManager distributedCfgManager;
+ /** The future has to be completed after the node starts and all Meta
storage watches are deployed. */
+ private final CompletableFuture<Void> deployWatchesFut;
+
/** Flag that disables storage updates. */
private volatile boolean receivesUpdates = true;
@@ -161,6 +164,8 @@ public class ItDistributedConfigurationPropertiesTest {
clock
);
+ deployWatchesFut = metaStorageManager.deployWatches();
+
// create a custom storage implementation that is able to "lose"
some storage updates
var distributedCfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager) {
/** {@inheritDoc} */
@@ -206,16 +211,16 @@ public class ItDistributedConfigurationPropertiesTest {
Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager)
.forEach(IgniteComponent::start);
- // deploy watches to propagate data from the metastore into the
vault
- try {
- metaStorageManager.deployWatches();
- } catch (NodeStoppingException e) {
- throw new RuntimeException(e);
- }
-
distributedCfgManager.start();
}
+ /**
+ * Waits for the watches to be deployed.
+ */
+ void waitWatches() {
+ assertThat("Watches were not deployed", deployWatchesFut,
willCompleteSuccessfully());
+ }
+
/**
* Stops the created components.
*/
@@ -286,6 +291,8 @@ public class ItDistributedConfigurationPropertiesTest {
Stream.of(firstNode, secondNode).parallel().forEach(Node::start);
firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(),
"cluster");
+
+ Stream.of(firstNode, secondNode).parallel().forEach(Node::waitWatches);
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index ffa58c6682..89dfa81caa 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -92,6 +92,9 @@ public class ItDistributedConfigurationStorageTest {
private final DistributedConfigurationStorage cfgStorage;
+ /** The future has to be completed after the node starts and all Meta
storage watches are deployed. */
+ private final CompletableFuture<Void> deployWatchesFut;
+
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -133,6 +136,8 @@ public class ItDistributedConfigurationStorageTest {
clock
);
+ deployWatchesFut = metaStorageManager.deployWatches();
+
cfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager);
}
@@ -157,9 +162,13 @@ public class ItDistributedConfigurationStorageTest {
return completedFuture(null);
}
});
+ }
- // deploy watches to propagate data from the metastore into the
vault
- metaStorageManager.deployWatches();
+ /**
+ * Waits for the watches to be deployed.
+ */
+ void waitWatches() {
+ assertThat("Watches were not deployed", deployWatchesFut,
willCompleteSuccessfully());
}
/**
@@ -200,6 +209,8 @@ public class ItDistributedConfigurationStorageTest {
node.cmgManager.initCluster(List.of(node.name()), List.of(),
"cluster");
+ node.waitWatches();
+
assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
assertThat(node.cfgStorage.writeConfigurationRevision(0, 1),
willCompleteSuccessfully());
@@ -216,6 +227,8 @@ public class ItDistributedConfigurationStorageTest {
try {
node2.start();
+ node2.waitWatches();
+
CompletableFuture<Data> storageData =
node2.cfgStorage.readDataOnRecovery();
assertThat(storageData.thenApply(Data::values),
willBe(equalTo(data)));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a7d41e2b00..1594f8f319 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -237,6 +237,8 @@ public class ItRebalanceDistributedTest {
allOf(nodes.get(0).cmgManager.onJoinReady(),
nodes.get(1).cmgManager.onJoinReady(), nodes.get(2).cmgManager.onJoinReady()),
willCompleteSuccessfully()
);
+
+ nodes.stream().forEach(Node::waitWatches);
}
@AfterEach
@@ -514,6 +516,8 @@ public class ItRebalanceDistributedTest {
newNode.start();
+ newNode.waitWatches();
+
nodes.set(evictedNodeIndex, newNode);
// Let's make sure that we will destroy the partition again.
@@ -600,6 +604,9 @@ public class ItRebalanceDistributedTest {
private final NetworkAddress networkAddress;
+ /** The future has to be completed after the node starts and all Meta
storage watches are deployed. */
+ private CompletableFuture<Void> deployWatchesFut;
+
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -828,7 +835,7 @@ public class ItRebalanceDistributedTest {
/**
* Starts the created components.
*/
- void start() throws Exception {
+ void start() {
nodeComponents = List.of(
vaultManager,
nodeCfgMgr,
@@ -857,8 +864,14 @@ public class ItRebalanceDistributedTest {
willSucceedIn(1, TimeUnit.MINUTES)
);
- // deploy watches to propagate data from the metastore into the
vault
- metaStorageManager.deployWatches();
+ deployWatchesFut = metaStorageManager.deployWatches();
+ }
+
+ /**
+ * Waits for the watches to be deployed.
+ */
+ void waitWatches() {
+ assertThat("Watches were not deployed", deployWatchesFut,
willCompleteSuccessfully());
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 20261f8cee..76d9c698dc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -45,7 +45,6 @@ import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntFunction;
@@ -124,7 +123,6 @@ import
org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteSystemProperties;
-import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -494,24 +492,15 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
fut -> new TestConfigurationCatchUpListener(cfgStorage, fut,
revisionCallback0)
);
- CompletableFuture<?> notificationFuture = CompletableFuture.allOf(
+ CompletableFuture<?> startFuture = CompletableFuture.allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners()
+ ).thenCompose(unused ->
+ // Deploy all registered watches because all components are
ready and have registered their listeners.
+ CompletableFuture.allOf(metaStorageMgr.deployWatches(),
configurationCatchUpFuture)
);
- CompletableFuture<?> startFuture = notificationFuture
- .thenCompose(v -> {
- // Deploy all registered watches because all components
are ready and have registered their listeners.
- try {
- metaStorageMgr.deployWatches();
- } catch (NodeStoppingException e) {
- throw new CompletionException(e);
- }
-
- return configurationCatchUpFuture;
- });
-
- assertThat(startFuture, willCompleteSuccessfully());
+ assertThat("Partial node was not started", startFuture,
willCompleteSuccessfully());
log.info("Completed recovery on partially started node, last revision
applied: " + lastRevision.get()
+ ", acceptableDifference: " +
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
100)
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index dfa0fd35a1..91c329dcac 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -729,13 +729,7 @@ public class IgniteImpl implements Ignite {
return notifyConfigurationListeners()
.thenComposeAsync(t -> {
// Deploy all registered watches because
all components are ready and have registered their listeners.
- try {
- metaStorageMgr.deployWatches();
- } catch (NodeStoppingException e) {
- throw new CompletionException(e);
- }
-
- return recoveryFuture;
+ return
metaStorageMgr.deployWatches().thenCompose(unused -> recoveryFuture);
}, startupExecutor);
}, startupExecutor)
.thenRunAsync(() -> {