This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4d4c9e8748 IGNITE-19778 Restore components state on metastorage
recovery (#2266)
4d4c9e8748 is described below
commit 4d4c9e8748bd16e079757cc4f4beac6b8977e3c0
Author: Semyon Danilov <[email protected]>
AuthorDate: Wed Jul 12 12:03:53 2023 +0400
IGNITE-19778 Restore components state on metastorage recovery (#2266)
---
.../internal/catalog/storage/UpdateLogImpl.java | 2 +-
.../configuration/ConfigurationChanger.java | 5 -
.../storage/ConfigurationStorage.java | 10 -
.../storage/TestConfigurationStorage.java | 5 -
.../java/org/apache/ignite/lang/ByteArray.java | 7 +
.../distributionzones/DistributionZoneManager.java | 2 +-
.../BaseDistributionZoneManagerTest.java | 2 +-
.../internal/metastorage/MetaStorageManager.java | 23 +-
.../impl/ItMetaStorageManagerImplTest.java | 71 ------
...MetaStorageSafeTimePropagationAbstractTest.java | 2 +-
.../metastorage/impl/ItMetaStorageWatchTest.java | 6 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 40 ++--
.../server/persistence/RocksDbKeyValueStorage.java | 6 +-
.../server/BasicOperationsKeyValueStorageTest.java | 6 +-
.../server/SimpleInMemoryKeyValueStorage.java | 6 +-
.../ignite/internal/BaseIgniteRestartTest.java | 66 +-----
.../ItDistributedConfigurationPropertiesTest.java | 4 +-
.../ItDistributedConfigurationStorageTest.java | 4 +-
.../storage/ItRebalanceDistributedTest.java | 68 +++---
.../zones/ItDistributionZonesFilterTest.java | 7 +
...niteDistributionZoneManagerNodeRestartTest.java | 5 +-
.../runner/app/ItIgniteNodeRestartTest.java | 9 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 37 +---
.../storage/DistributedConfigurationStorage.java | 134 +++++------
.../storage/LocalConfigurationStorage.java | 5 -
.../storage/LocalFileConfigurationStorage.java | 5 -
.../recovery/ConfigurationCatchUpListener.java | 117 ----------
.../recovery/RecoveryCompletionFutureFactory.java | 51 -----
.../DistributedConfigurationCatchUpTest.java | 244 ---------------------
.../DistributedConfigurationStorageTest.java | 8 +-
.../internal/table/distributed/TableManager.java | 12 +-
31 files changed, 189 insertions(+), 780 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index f627c51118..1b7994fabc 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -157,7 +157,7 @@ public class UpdateLogImpl implements UpdateLog {
// TODO: IGNITE-19790 Read range from metastore
while (true) {
ByteArray key = CatalogKey.update(ver++);
- Entry entry = metastore.getLocally(key.bytes(), appliedRevision);
+ Entry entry = metastore.getLocally(key, appliedRevision);
if (entry.empty() || entry.tombstone()) {
break;
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 5a01225087..6b115f5995 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -644,11 +644,6 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
rwLock.writeLock().unlock();
}
- // Save revisions for recovery.
- // We execute synchronously to avoid a race between
notifications about updating the Meta Storage and updating the revision
- // of the Meta Storage.
- storage.writeConfigurationRevision(oldStorageRoots.version,
newStorageRoots.version);
-
long notificationNumber =
notificationListenerCnt.incrementAndGet();
CompletableFuture<Void> notificationFuture;
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
index 7315dfd51b..d9663eb284 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
@@ -79,16 +79,6 @@ public interface ConfigurationStorage extends
ManuallyCloseable {
*/
CompletableFuture<Long> lastRevision();
- /**
- * Writes previous and current configuration's MetaStorage revision for
recovery.
- * We need previous and current for the fail-safety: in case if node fails
before changing master key on configuration update,
- * MetaStorage's applied revision will be lower than {@code
currentRevision} and we will be using previous revision.
- *
- * @param prevRevision Previous revision.
- * @param currentRevision Current revision.
- */
- void writeConfigurationRevision(long prevRevision, long currentRevision);
-
/**
* Closes the storage.
*/
diff --git
a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
index 94193714e4..4231915b31 100644
---
a/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
+++
b/modules/configuration/src/testFixtures/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
@@ -170,11 +170,6 @@ public class TestConfigurationStorage implements
ConfigurationStorage {
return CompletableFuture.completedFuture(version);
}
- @Override
- public void writeConfigurationRevision(long prevRevision, long
currentRevision) {
- // No-op.
- }
-
/**
* Increase the current revision of the storage.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
index 76129b6d55..93b1e14a99 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
@@ -77,6 +77,13 @@ public final class ByteArray implements
Comparable<ByteArray> {
return arr;
}
+ /**
+ * Returns the length of this byte array.
+ */
+ public int length() {
+ return arr.length;
+ }
+
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 03ce9cbaf0..d61f044560 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -828,7 +828,7 @@ public class DistributionZoneManager implements
IgniteComponent {
ZoneState zoneState = new ZoneState(executor,
topologyAugmentationMap);
- VaultEntry dataNodes =
vaultMgr.get(zoneDataNodesKey(zoneId)).join();
+ Entry dataNodes =
metaStorageManager.getLocally(zoneDataNodesKey(zoneId), revision);
if (dataNodes != null) {
String filter = zone.filter();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index a8df523fba..9c69f09930 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -93,7 +93,7 @@ public class BaseDistributionZoneManagerTest extends
BaseIgniteAbstractTest {
components.add(metaStorageManager);
- ConfigurationStorage cfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultMgr);
+ ConfigurationStorage cfgStorage = new
DistributedConfigurationStorage(metaStorageManager);
generator = new ConfigurationTreeGenerator(
List.of(DistributionZonesConfiguration.KEY),
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 0461d73924..fb37f7f1ab 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import
org.apache.ignite.internal.metastorage.exceptions.OperationTimeoutException;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
@@ -66,6 +67,9 @@ public interface MetaStorageManager extends IgniteComponent {
* Returns all entries corresponding to the given key and bounded by given
revisions.
* All these entries are ordered by revisions and have the same key.
* The lower bound and the upper bound are inclusive.
+ *
+ * <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
+ * up to user to wait for the appropriate time to call this method.
* TODO: IGNITE-19735 move this method to another interface for
interaction with local KeyValueStorage.
*
* @param key The key.
@@ -80,11 +84,28 @@ public interface MetaStorageManager extends IgniteComponent
{
* Returns an entry by the given key and bounded by the given revision.
The entry is obtained
* from the local storage.
*
+ * <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
+ * up to user to wait for the appropriate time to call this method.
+ *
* @param key The key.
* @param revUpperBound The upper bound of revision.
* @return Value corresponding to the given key.
*/
- Entry getLocally(byte[] key, long revUpperBound);
+ Entry getLocally(ByteArray key, long revUpperBound);
+
+ /**
+ * Returns cursor by entries which correspond to the given keys range and
bounded by revision number. The entries in the cursor
+ * are obtained from the local storage.
+ *
+ * <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
+ * up to user to wait for the appropriate time to call this method.
+ *
+ * @param startKey Start key of range (inclusive).
+ * @param endKey Last key of range (exclusive).
+ * @param revUpperBound Upper bound of revision.
+ * @return Cursor by entries which correspond to the given keys range.
+ */
+ Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long
revUpperBound);
/**
* Looks up a timestamp by a revision. This should only be invoked if it
is guaranteed that the
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index ff2a75c687..d2dbc13721 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -31,8 +30,6 @@ import static
org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
@@ -65,7 +62,6 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.ByteArray;
@@ -194,73 +190,6 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat(actualKeysFuture, will(contains(key1.bytes(), key2.bytes(),
key3.bytes())));
}
- /**
- * Tests that "watched" Meta Storage keys get persisted in the Vault.
- */
- @Test
- void testWatchEventsPersistence() throws InterruptedException {
- byte[] value = "value".getBytes(UTF_8);
-
- var key1 = new ByteArray("foo");
- var key2 = new ByteArray("bar");
-
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.notExists(new ByteArray("foo")),
- List.of(
- Operations.put(key1, value),
- Operations.put(key2, value)
- ),
- List.of(Operations.noop())
- );
-
- assertThat(invokeFuture, willBe(true));
-
- // No data should be persisted until any watches are registered.
- assertThat(vaultManager.get(key1), willBe(nullValue()));
- assertThat(vaultManager.get(key2), willBe(nullValue()));
-
- metaStorageManager.registerExactWatch(key1, new NoOpListener());
-
- invokeFuture = metaStorageManager.invoke(
- Conditions.exists(new ByteArray("foo")),
- List.of(
- Operations.put(key1, value),
- Operations.put(key2, value)
- ),
- List.of(Operations.noop())
- );
-
- assertThat(invokeFuture, willBe(true));
-
- assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
== 2, 10_000));
-
- // Expect that only the watched key is persisted.
- assertThat(vaultManager.get(key1).thenApply(VaultEntry::value),
willBe(value));
- assertThat(vaultManager.get(key2), willBe(nullValue()));
-
- metaStorageManager.registerExactWatch(key2, new NoOpListener());
-
- assertThat(metaStorageManager.appliedRevision(), is(2L));
-
- byte[] newValue = "newValue".getBytes(UTF_8);
-
- invokeFuture = metaStorageManager.invoke(
- Conditions.exists(new ByteArray("foo")),
- List.of(
- Operations.put(key1, newValue),
- Operations.put(key2, newValue)
- ),
- List.of(Operations.noop())
- );
-
- assertThat(invokeFuture, willBe(true));
-
- assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
== 3, 10_000));
-
- assertThat(vaultManager.get(key1).thenApply(VaultEntry::value),
willBe(newValue));
- assertThat(vaultManager.get(key2).thenApply(VaultEntry::value),
willBe(newValue));
- }
-
private static class NoOpListener implements WatchListener {
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 068b2b2721..47b8ad923a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -45,7 +45,7 @@ public abstract class
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
@BeforeEach
public void startWatches() {
- storage.startWatches(0, (e, t) -> {
+ storage.startWatches(1, (e, t) -> {
time.updateSafeTime(t);
return CompletableFuture.completedFuture(null);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index c0e2d93e9e..e919c6ccbd 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -218,6 +218,10 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
String name = nodes.get(0).name();
nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name),
"test");
+
+ for (Node node : nodes) {
+ assertThat(node.metaStorageManager.recoveryFinishedFuture(),
willCompleteSuccessfully());
+ }
}
@Test
@@ -312,7 +316,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
}
/**
- * Tests that metastorage missed metastorage events are replayed after
deploying watches.
+ * Tests that missed metastorage events are replayed after deploying
watches.
*/
@Test
void testReplayUpdates() throws Exception {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 5c9c72d8df..10ec576f1a 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
@@ -206,7 +207,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
assert targetRevision != null;
- listenForRecovery(recoveryFinishedFuture, targetRevision);
+ listenForRecovery(targetRevision);
});
return recoveryFinishedFuture;
@@ -215,10 +216,10 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
}
- private void listenForRecovery(CompletableFuture<Long> res, long
targetRevision) {
+ private void listenForRecovery(long targetRevision) {
storage.setRecoveryRevisionListener(storageRevision -> {
if (!busyLock.enterBusy()) {
- res.completeExceptionally(new NodeStoppingException());
+ recoveryFinishedFuture.completeExceptionally(new
NodeStoppingException());
return;
}
@@ -230,7 +231,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
storage.setRecoveryRevisionListener(null);
- if (res.complete(targetRevision)) {
+ if (recoveryFinishedFuture.complete(targetRevision)) {
LOG.info("Finished MetaStorage recovery");
}
} finally {
@@ -239,7 +240,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
});
if (!busyLock.enterBusy()) {
- res.completeExceptionally(new NodeStoppingException());
+ recoveryFinishedFuture.completeExceptionally(new
NodeStoppingException());
return;
}
@@ -249,7 +250,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
if (storage.revision() >= targetRevision) {
storage.setRecoveryRevisionListener(null);
- if (res.complete(targetRevision)) {
+ if (recoveryFinishedFuture.complete(targetRevision)) {
LOG.info("Finished MetaStorage recovery");
}
}
@@ -502,18 +503,23 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
@Override
- public Entry getLocally(byte[] key, long revUpperBound) {
+ public Entry getLocally(ByteArray key, long revUpperBound) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
try {
- return storage.get(key, revUpperBound);
+ return storage.get(key.bytes(), revUpperBound);
} finally {
busyLock.leaveBusy();
}
}
+ @Override
+ public Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long
revUpperBound) {
+ return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
+ }
+
@Override
public HybridTimestamp timestampByRevision(long revision) {
if (!busyLock.enterBusy()) {
@@ -824,19 +830,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
clusterTime.updateSafeTime(time);
try {
- CompletableFuture<Void> saveToVaultFuture;
-
- if (watchEvent.entryEvents().isEmpty()) {
- saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY,
longToBytes(watchEvent.revision()));
- } else {
- Map<ByteArray, byte[]> batch =
IgniteUtils.newHashMap(watchEvent.entryEvents().size() + 1);
-
- batch.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
-
- watchEvent.entryEvents().forEach(e -> batch.put(new
ByteArray(e.newEntry().key()), e.newEntry().value()));
-
- saveToVaultFuture = vaultMgr.putAll(batch);
- }
+ CompletableFuture<Void> saveToVaultFuture =
vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
return saveToVaultFuture.thenRun(() -> appliedRevision =
watchEvent.revision());
} finally {
@@ -906,7 +900,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
/** Explicitly notifies revision update listeners. */
- public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long
newRevision) {
- return storage.notifyRevisionUpdateListenerOnStart(newRevision);
+ public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
+ return
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index ee7f3628f5..abb2d85848 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -992,6 +992,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
@Override
public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
+ assert startRevision != 0 : "First meaningful revision is 1";
+
long currentRevision;
rwLock.readLock().lock();
@@ -1496,9 +1498,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
private void replayUpdates(long lowerRevision, long upperRevision) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be
Math.max, so we start from the revision that
- // components restored their state to (lowerRevision).
- long minWatchRevision = Math.min(lowerRevision,
watchProcessor.minWatchRevision().orElse(-1));
+ long minWatchRevision = Math.max(lowerRevision,
watchProcessor.minWatchRevision().orElse(-1));
if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
// No events to replay, we can start processing more recent events
from the event queue.
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index d0640a4d9c..198ade0395 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1969,7 +1969,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
long appliedRevision = storage.revision();
- storage.startWatches(0, (event, ts) -> completedFuture(null));
+ storage.startWatches(1, (event, ts) -> completedFuture(null));
CompletableFuture<byte[]> fut = new CompletableFuture<>();
@@ -2310,7 +2310,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
when(mockCallback.onRevisionApplied(any(),
any())).thenReturn(completedFuture(null));
- storage.startWatches(0, mockCallback);
+ storage.startWatches(1, mockCallback);
putToMs(key, value);
@@ -2505,7 +2505,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
}
});
- storage.startWatches(0, (event, ts) -> completedFuture(null));
+ storage.startWatches(1, (event, ts) -> completedFuture(null));
return resultFuture;
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index dbc4ea68b1..6760682702 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -483,6 +483,8 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
@Override
public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
+ assert startRevision != 0 : "First meaningful revision is 1";
+
synchronized (mux) {
areWatchesEnabled = true;
@@ -493,9 +495,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
private void replayUpdates(long startRevision) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be
Math.max, so we start from the revision that
- // components restored their state to (lowerRevision).
- long minWatchRevision = Math.min(startRevision,
watchProcessor.minWatchRevision().orElse(-1));
+ long minWatchRevision = Math.max(startRevision,
watchProcessor.minWatchRevision().orElse(-1));
if (minWatchRevision <= 0) {
return;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 54c6b5fe73..8604076f57 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal;
-import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.IOException;
import java.nio.file.Files;
@@ -29,7 +29,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.configuration.ConfigurationModule;
@@ -41,20 +40,16 @@ import
org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
-import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -239,34 +234,21 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
ConfigurationTreeGenerator distributedConfigurationGenerator,
ConfigurationRegistry clusterConfigRegistry
) {
- AtomicLong lastRevision = new AtomicLong();
-
- Consumer<Long> revisionCallback0 = rev -> {
- if (revisionCallback != null) {
- revisionCallback.accept(rev);
- }
-
- lastRevision.set(rev);
- };
-
- CompletableFuture<Void> configurationCatchUpFuture =
RecoveryCompletionFutureFactory.create(
- clusterCfgMgr,
- fut -> new TestConfigurationCatchUpListener(cfgStorage, fut,
revisionCallback0)
- );
-
CompletableFuture<?> startFuture = CompletableFuture.allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterConfigRegistry.notifyCurrentConfigurationListeners()
).thenCompose(unused ->
// Deploy all registered watches because all components are
ready and have registered their listeners.
- CompletableFuture.allOf(metaStorageMgr.deployWatches(),
configurationCatchUpFuture)
+ metaStorageMgr.deployWatches()
);
assertThat("Partial node was not started", startFuture,
willCompleteSuccessfully());
- log.info("Completed recovery on partially started node, last revision
applied: " + lastRevision.get()
- + ", acceptableDifference: " +
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
100)
- );
+ Long recoveryRevision =
metaStorageMgr.recoveryFinishedFuture().getNow(null);
+
+ assertNotNull(recoveryRevision);
+
+ log.info("Completed recovery on partially started node, MetaStorage
revision recovered to: " + recoveryRevision);
return new PartialNode(
components,
@@ -333,38 +315,4 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
return logicalTopology;
}
}
-
- /**
- * Configuration catch-up listener for test.
- */
- public static class TestConfigurationCatchUpListener extends
ConfigurationCatchUpListener {
- /** Callback called on revision update. */
- private final Consumer<Long> revisionCallback;
-
- /**
- * Constructor.
- *
- * @param cfgStorage Configuration storage.
- * @param catchUpFuture Catch-up future.
- */
- TestConfigurationCatchUpListener(
- ConfigurationStorage cfgStorage,
- CompletableFuture<Void> catchUpFuture,
- Consumer<Long> revisionCallback
- ) {
- super(cfgStorage, catchUpFuture, log);
-
- this.revisionCallback = revisionCallback;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<?> onUpdate(long appliedRevision) {
- if (revisionCallback != null) {
- revisionCallback.accept(appliedRevision);
- }
-
- return super.onUpdate(appliedRevision);
- }
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 9a98a90d9d..6d50b1e47f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -184,7 +184,7 @@ public class ItDistributedConfigurationPropertiesTest {
deployWatchesFut = metaStorageManager.deployWatches();
// create a custom storage implementation that is able to "lose"
some storage updates
- var distributedCfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager) {
+ var distributedCfgStorage = new
DistributedConfigurationStorage(metaStorageManager) {
/** {@inheritDoc} */
@Override
public synchronized void
registerConfigurationListener(ConfigurationStorageListener listener) {
@@ -228,7 +228,7 @@ public class ItDistributedConfigurationPropertiesTest {
Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager)
.forEach(IgniteComponent::start);
- distributedCfgManager.start();
+ CompletableFuture.runAsync(distributedCfgManager::start);
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 0a88f8fd22..9c8bac497c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -157,7 +157,7 @@ public class ItDistributedConfigurationStorageTest {
deployWatchesFut = metaStorageManager.deployWatches();
- cfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager);
+ cfgStorage = new
DistributedConfigurationStorage(metaStorageManager);
}
/**
@@ -232,8 +232,6 @@ public class ItDistributedConfigurationStorageTest {
assertThat(node.cfgStorage.write(data, 0), willBe(equalTo(true)));
- node.cfgStorage.writeConfigurationRevision(0, 1);
-
assertTrue(waitForCondition(
() -> node.metaStorageManager.appliedRevision() != 0,
3000
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 23fff6513d..77b022c0d4 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.configuration.storage;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZoneReplicas;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
@@ -56,6 +57,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -168,6 +170,7 @@ import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -178,6 +181,7 @@ import org.mockito.Mockito;
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
public class ItRebalanceDistributedTest {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -483,6 +487,7 @@ public class ItRebalanceDistributedTest {
checkInvokeDestroyedPartitionStorages(evictedNode, TABLE_1_NAME, 0);
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
@Test
@UseTestTxStateStorage
@UseRocksMetaStorage
@@ -711,7 +716,7 @@ public class ItRebalanceDistributedTest {
metaStorageConfiguration
);
- cfgStorage = new
DistributedConfigurationStorage(metaStorageManager, vaultManager);
+ cfgStorage = new
DistributedConfigurationStorage(metaStorageManager);
clusterCfgGenerator = new ConfigurationTreeGenerator(
List.of(
@@ -863,40 +868,51 @@ public class ItRebalanceDistributedTest {
* Starts the created components.
*/
void start() {
- nodeComponents = List.of(
+ nodeComponents = new CopyOnWriteArrayList<>();
+
+ List<IgniteComponent> firstComponents = List.of(
vaultManager,
nodeCfgMgr,
clusterService,
- raftManager,
- cmgManager,
- metaStorageManager,
- clusterCfgMgr,
- clockWaiter,
- catalogManager,
- distributionZoneManager,
- replicaManager,
- txManager,
- baselineMgr,
- dataStorageMgr,
- schemaManager,
- tableManager
+ raftManager
);
- nodeComponents.forEach(IgniteComponent::start);
+ firstComponents.forEach(IgniteComponent::start);
+
+ nodeComponents.addAll(firstComponents);
+
+ deployWatchesFut = CompletableFuture.supplyAsync(() -> {
+ List<IgniteComponent> secondComponents = List.of(
+ cmgManager,
+ metaStorageManager,
+ clusterCfgMgr,
+ clockWaiter,
+ catalogManager,
+ distributionZoneManager,
+ replicaManager,
+ txManager,
+ baselineMgr,
+ dataStorageMgr,
+ schemaManager,
+ tableManager
+ );
+
+ secondComponents.forEach(IgniteComponent::start);
+
+ nodeComponents.addAll(secondComponents);
- assertThat(
- allOf(
+ var configurationNotificationFut =
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
+ return allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
- // Why "-1"? I don't know, it just works like that.
- ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart(
- metaStorageManager.appliedRevision() - 1
- )
- ),
- willSucceedIn(1, TimeUnit.MINUTES)
- );
+ ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart()
+ );
+ });
+
+ assertThat(configurationNotificationFut, willSucceedIn(1,
TimeUnit.MINUTES));
- deployWatchesFut = metaStorageManager.deployWatches();
+ return metaStorageManager.deployWatches();
+ }).thenCompose(identity());
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
index 74198469b5..4f4743953c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -49,6 +49,7 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.sql.Session;
import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -95,6 +96,8 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
*
* @throws Exception If failed.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks
this.
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
@Test
void testFilteredDataNodesPropagatedToStable() throws Exception {
String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
@@ -172,6 +175,8 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
*
* @throws Exception If failed.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks
this.
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
@Test
void testAlteringFiltersPropagatedDataNodesToStableImmediately() throws
Exception {
String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
@@ -240,6 +245,8 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
*
* @throws Exception If failed.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19955 also blocks
this.
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19506")
@Test
void testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter() throws
Exception {
String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index d744178a3a..daff409fc2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -26,7 +26,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
-import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
@@ -80,7 +79,6 @@ import
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNode;
@@ -96,7 +94,6 @@ import org.junit.jupiter.params.provider.MethodSource;
/**
* Tests for checking {@link DistributionZoneManager} behavior after node's
restart.
*/
-@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
@ExtendWith(ConfigurationExtension.class)
public class ItIgniteDistributionZoneManagerNodeRestartTest extends
BaseIgniteRestartTest {
private static final LogicalNode A = new LogicalNode(
@@ -181,7 +178,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
new TestRocksDbKeyValueStorage(name,
workDir.resolve("metastorage"))
));
- var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vault);
+ var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
ConfigurationTreeGenerator distributedConfigurationGenerator = new
ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 5d3923ef7d..df0527853e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.runner.app;
-import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -116,7 +115,6 @@ import
org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -149,7 +147,6 @@ import org.junit.jupiter.params.provider.ValueSource;
/**
* These tests check node restart scenarios.
*/
-@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
@ExtendWith(ConfigurationExtension.class)
@Timeout(120)
public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
@@ -297,7 +294,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
metaStorageConfiguration
);
- var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vault);
+ var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
ConfigurationTreeGenerator distributedConfigurationGenerator = new
ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
@@ -779,7 +776,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
for (int i = 0; i < 10; i++) {
ByteArray key = ByteArray.fromString("some-test-key-" + i);
- byte[] value = restartedMs.getLocally(key.bytes(), 100).value();
+ byte[] value = restartedMs.getLocally(key, 100).value();
assertEquals(1, value.length);
assertEquals((byte) i, value[0]);
@@ -1020,7 +1017,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
@Test
- @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
value = "0")
public void testCfgGapWithoutData() throws InterruptedException {
List<IgniteImpl> nodes = startNodes(3);
@@ -1052,7 +1048,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
*/
@Test
@Disabled(value = "https://issues.apache.org/jira/browse/IGNITE-18919")
- @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
value = "0")
public void testMetastorageStop() throws InterruptedException {
int cfgGap = 4;
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5b2b53f0ba..dfd08d83ce 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -69,7 +69,6 @@ import
org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import
org.apache.ignite.internal.configuration.DistributedConfigurationUpdater;
import org.apache.ignite.internal.configuration.SecurityConfiguration;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
import org.apache.ignite.internal.configuration.presentation.HoconPresentation;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
@@ -103,8 +102,6 @@ import org.apache.ignite.internal.raft.Loza;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
-import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
-import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.rest.RestComponent;
@@ -430,7 +427,7 @@ public class IgniteImpl implements Ignite {
topologyAwareRaftGroupServiceFactory
);
- this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vaultMgr);
+ this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr);
clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
@@ -982,44 +979,16 @@ public class IgniteImpl implements Ignite {
* and deploying watches after that.
*/
private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService
startupExecutor) {
- // Recovery future must be created before configuration listeners are
triggered.
- CompletableFuture<?> recoveryFuture =
RecoveryCompletionFutureFactory.create(
- clusterCfgMgr,
- fut -> new ConfigurationCatchUpListener(cfgStorage, fut, LOG)
- );
-
- //TODO https://issues.apache.org/jira/browse/IGNITE-19778
- // The order of these two lines matter, the first method relies on the
second one not being called yet.
- // After the fix, the order will most likely have to be reversed.
- CompletableFuture<Void> startupRevisionUpdate =
notifyRevisionUpdateListenerOnStart();
CompletableFuture<Void> startupConfigurationUpdate =
notifyConfigurationListeners();
+ CompletableFuture<Void> startupRevisionUpdate =
metaStorageMgr.notifyRevisionUpdateListenerOnStart();
return CompletableFuture.allOf(startupConfigurationUpdate,
startupRevisionUpdate)
.thenComposeAsync(t -> {
// Deploy all registered watches because all components
are ready and have registered their listeners.
- return metaStorageMgr.deployWatches().thenCompose(unused
-> recoveryFuture);
+ return metaStorageMgr.deployWatches();
}, startupExecutor);
}
- private CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
- //TODO https://issues.apache.org/jira/browse/IGNITE-19778 Use
meta-storages revision after recovery,
- // it should match configuration revision.
- // Temporary workaround.
- // In order to avoid making a public getter for configuration
revision, I read it from the startup notification.
- // It should be removed once we start using up-to-date meta-storage
revision for node startup.
- var configurationRevisionFuture = new CompletableFuture<Void>();
-
- ConfigurationStorageRevisionListener revisionListener =
newStorageRevision ->
- ((MetaStorageManagerImpl)
metaStorageMgr).notifyRevisionUpdateListenerOnStart(newStorageRevision)
- .thenRun(() ->
configurationRevisionFuture.complete(null));
-
- clusterConfiguration().listenUpdateStorageRevision(revisionListener);
-
- return configurationRevisionFuture.thenRun(() ->
-
clusterConfiguration().stopListenUpdateStorageRevision(revisionListener)
- );
- }
-
/**
* Notify all listeners of current configurations.
*/
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index cbfe4bd99b..71e79ff9ab 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.configuration.storage;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
import java.io.Serializable;
import java.util.Arrays;
@@ -41,19 +44,15 @@ import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.ConditionType;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
-import org.jetbrains.annotations.Nullable;
/**
* Distributed configuration storage.
@@ -70,11 +69,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
*/
private static final ByteArray MASTER_KEY = new
ByteArray(DISTRIBUTED_PREFIX + "$master$key");
- /**
- * Vault's key for a value of previous and current configuration's
MetaStorage revision.
- */
- private static final ByteArray CONFIGURATION_REVISIONS_KEY = new
ByteArray("$revisions");
-
/**
* Prefix for all keys in the distributed storage. This key is expected to
be the first key in lexicographical order of distributed
* configuration keys.
@@ -91,9 +85,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
/** Meta storage manager. */
private final MetaStorageManager metaStorageMgr;
- /** Vault manager. */
- private final VaultManager vaultMgr;
-
/** Configuration changes listener. */
private volatile ConfigurationStorageListener lsnr;
@@ -105,10 +96,10 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
* <p>Given that {@link #MASTER_KEY} is updated on every configuration
change, one could assume that {@code changeId} matches the
* revision of {@link #MASTER_KEY}.
*
- * <p>This is true for all cases except for node restart. Key-specific
revision values are lost on local vault copy after restart, so
- * stored {@link MetaStorageManager#appliedRevision} value is used
instead. This fact has very important side effect: it's no longer
- * possible to use {@link ConditionType#REV_EQUAL} on {@link #MASTER_KEY}
in {@link DistributedConfigurationStorage#write(Map, long)}.
- * {@link ConditionType#REV_LESS_OR_EQUAL} must be used instead.
+ * <p>This is true for all cases except for node restart. We use latest
values after restart, so MetaStorage's local revision is used
+ * instead. This fact has very important side effect: it's no longer
possible to use {@link ConditionType#REV_EQUAL} on
+ * {@link #MASTER_KEY} in {@link
DistributedConfigurationStorage#write(Map, long)}. {@link
ConditionType#REV_LESS_OR_EQUAL} must be
+ * used instead.
*
* @see #MASTER_KEY
* @see #write(Map, long)
@@ -123,12 +114,9 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
* Constructor.
*
* @param metaStorageMgr Meta storage manager.
- * @param vaultMgr Vault manager.
*/
- public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr,
VaultManager vaultMgr) {
+ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
this.metaStorageMgr = metaStorageMgr;
-
- this.vaultMgr = vaultMgr;
}
@Override
@@ -206,60 +194,46 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
@Override
public CompletableFuture<Data> readDataOnRecovery() throws
StorageException {
- CompletableFuture<Data> future =
vaultMgr.get(CONFIGURATION_REVISIONS_KEY)
- .thenApplyAsync(entry -> {
- long revision =
resolveRevision(metaStorageMgr.appliedRevision(), entry);
-
- return readDataOnRecovery0(revision);
- }, threadPool);
+ CompletableFuture<Data> future =
metaStorageMgr.recoveryFinishedFuture()
+ .thenApplyAsync(this::readDataOnRecovery0, threadPool);
return registerFuture(future);
}
- /**
- * Resolves current configuration revision based on the saved in the Vault
revision of the metastorage and also previous and current
- * revisions of the configuration saved in the Vault.
- *
- * @param metaStorageRevision Meta Storage revision.
- * @param revisionsEntry Configuration revisions entry.
- * @return Configuration revision.
- */
- private static long resolveRevision(long metaStorageRevision, @Nullable
VaultEntry revisionsEntry) {
- if (revisionsEntry != null) {
- byte[] value = revisionsEntry.value();
- long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
- long curMasterKeyRevision = ByteUtils.bytesToLong(value,
Long.BYTES);
-
- // If current master key revision is higher than applied revision,
then node failed
- // before applied revision changed, so we have to use previous
master key revision
- return curMasterKeyRevision <= metaStorageRevision ?
curMasterKeyRevision : prevMasterKeyRevision;
- } else {
- // Configuration has not been updated yet, so it is safe to return
0 as the revision for the master key.
- return 0L;
- }
- }
-
private Data readDataOnRecovery0(long cfgRevision) {
var data = new HashMap<String, Serializable>();
- try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
- for (VaultEntry entry : entries) {
- ByteArray key = entry.key();
+ byte[] masterKey = MASTER_KEY.bytes();
+ boolean sawMasterKey = false;
+
+ try (Cursor<Entry> cursor =
metaStorageMgr.getLocally(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE,
cfgRevision)) {
+ for (Entry entry : cursor) {
+ if (entry.tombstone()) {
+ continue;
+ }
+
+ byte[] key = entry.key();
byte[] value = entry.value();
- // vault iterator should not return nulls as values
+ // MetaStorage iterator should not return nulls as values.
assert value != null;
- if (key.equals(MASTER_KEY)) {
+ if (!sawMasterKey && Arrays.equals(masterKey, key)) {
+ sawMasterKey = true;
+
continue;
}
- String dataKey =
key.toString().substring(DISTRIBUTED_PREFIX.length());
+ int startIdx = DST_KEYS_START_RANGE.length();
+
+ int keyLengthWithoutPrefix = key.length - startIdx;
+
+ var dataKey = new String(key, startIdx,
keyLengthWithoutPrefix, UTF_8);
data.put(dataKey,
ConfigurationSerializationUtil.fromBytes(value));
}
} catch (Exception e) {
- throw new StorageException("Exception when closing a Vault
cursor", e);
+ throw new StorageException("Exception reading data on recovery",
e);
}
assert data.isEmpty() || cfgRevision > 0;
@@ -296,9 +270,26 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
operations.add(Operations.put(MASTER_KEY,
ByteUtils.longToBytes(curChangeId)));
- SimpleCondition condition = curChangeId == 0L
- ? Conditions.notExists(MASTER_KEY)
- : Conditions.revision(MASTER_KEY).eq(curChangeId);
+ // Condition for a valid MetaStorage data update. Several
possibilities here:
+ // - First update ever, MASTER_KEY property must be absent from
MetaStorage.
+ // - Current node has already performed some updates or received them
from MetaStorage watch listener. In this
+ // case "curChangeId" must match the MASTER_KEY revision exactly.
+ // - Current node has been restarted and received updates from
MetaStorage watch listeners after that. Same as
+ // above, "curChangeId" must match the MASTER_KEY revision exactly.
+ // - Current node has been restarted and have not received any
updates from MetaStorage watch listeners yet.
+ // In this case "curChangeId" matches MetaStorage's local revision,
which may or may not match the MASTER_KEY revision. Two
+ // options here:
+ // - MASTER_KEY is missing in local MetaStorage copy. This means
that current node have not performed nor
+ // observed any configuration changes. Valid condition is
"MASTER_KEY does not exist".
+ // - MASTER_KEY is present in local MetaStorage copy. The
MASTER_KEY revision is unknown but is less than or
+ // equal to MetaStorage's local revision. Obviously, there have
been no updates from the future yet. It's also guaranteed
+ // that the next received configuration update will have the
MASTER_KEY revision strictly greater than
+ // current MetaStorage's local revision. This allows to conclude
that "MASTER_KEY revision <= curChangeId" is a valid
+ // condition for update.
+ // Joining all of the above, it's concluded that the following
condition must be used:
+ Condition condition = curChangeId == 0L
+ ? notExists(MASTER_KEY)
+ : or(notExists(MASTER_KEY),
revision(MASTER_KEY).le(curChangeId));
return metaStorageMgr.invoke(condition, operations,
Set.of(Operations.noop()));
}
@@ -374,29 +365,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
}
- @Override
- public void writeConfigurationRevision(long prevRevision, long
currentRevision) {
- byte[] value = new byte[Long.BYTES * 2];
-
- ByteUtils.putLongToBytes(prevRevision, value, 0);
- ByteUtils.putLongToBytes(currentRevision, value, Long.BYTES);
-
- vaultMgr.put(CONFIGURATION_REVISIONS_KEY, value).join();
- }
-
- /**
- * Method that returns all distributed configuration keys from the meta
storage that were stored in the vault filtered out by the
- * current applied revision as an upper bound. Applied revision is a
revision of the last successful vault update.
- *
- * <p>This is possible to distinguish cfg keys from meta storage because
we add a special prefix {@link
- * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all
configuration keys that we put to the meta storage.
- *
- * @return Iterator built upon all distributed configuration entries
stored in vault.
- */
- private Cursor<VaultEntry> storedDistributedConfigKeys() {
- return vaultMgr.range(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE);
- }
-
/**
* Increments the last character of the given string.
*/
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index b832ee4927..c026a6aa04 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -223,11 +223,6 @@ public class LocalConfigurationStorage implements
ConfigurationStorage {
.thenApply(entry -> entry == null ? 0 : (Long)
fromBytes(entry.value()));
}
- @Override
- public void writeConfigurationRevision(long prevRevision, long
currentRevision) {
- // No-op.
- }
-
/**
* Increments the last character of the given string.
*/
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
index e3d75d477b..742745c4cb 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java
@@ -219,11 +219,6 @@ public class LocalFileConfigurationStorage implements
ConfigurationStorage {
return CompletableFuture.completedFuture(lastRevision);
}
- @Override
- public void writeConfigurationRevision(long prevRevision, long
currentRevision) {
- // No-op.
- }
-
@Override
public void close() {
futureTracker.cancelInFlightFutures();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
deleted file mode 100644
index ef5cc3c4f0..0000000000
---
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.recovery;
-
-import java.util.concurrent.CompletableFuture;
-import
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
-
-/**
- * Configuration listener class that is intended to complete catch-up future
during recovery when configuration
- * is up-to-date.
- */
-public class ConfigurationCatchUpListener implements
ConfigurationStorageRevisionListener {
- /** Configuration catch-up difference property name. */
- public static final String CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY =
"CONFIGURATION_CATCH_UP_DIFFERENCE";
-
- /**
- * Difference between the local node applied revision and distributed data
storage revision on start.
- * TODO: IGNITE-16488 Make this property adjustable and remove system
property.
- */
- private final int configurationCatchUpDifference =
-
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
100);
-
- /** Revision to catch up. */
- private volatile long targetRevision = -1;
-
- /** Catch-up future. */
- private final CompletableFuture<Void> catchUpFuture;
-
- /** Configuration storage. */
- private final ConfigurationStorage cfgStorage;
-
- /** Mutex for updating target revision. */
- private final Object targetRevisionUpdateMutex = new Object();
-
- /** Logger. */
- private final IgniteLogger log;
-
- /**
- * Constructor.
- *
- * @param catchUpFuture Catch-up future.
- */
- public ConfigurationCatchUpListener(ConfigurationStorage cfgStorage,
CompletableFuture<Void> catchUpFuture, IgniteLogger log) {
- this.cfgStorage = cfgStorage;
- this.catchUpFuture = catchUpFuture;
- this.log = log;
- }
-
- /**
- * Checks the node up to date by distributed configuration.
- *
- * @param targetRevision Configuration revision.
- * @param appliedRevision Last applied node revision.
- * @return True when the applied revision is great enough for node
recovery to complete, false otherwise.
- */
- private boolean isConfigurationUpToDate(long targetRevision, long
appliedRevision) {
- return targetRevision - configurationCatchUpDifference <=
appliedRevision;
- }
-
- /**
- * Retrieve distribute configuration revision and check whether local
revision is great enough to complete the recovery.
- *
- * @param appliedRevision Applied revision.
- */
- private CompletableFuture<?> checkRevisionUpToDate(long appliedRevision) {
- return cfgStorage.lastRevision().thenAccept(rev -> {
- synchronized (targetRevisionUpdateMutex) {
- assert rev >= appliedRevision : IgniteStringFormatter.format(
- "Configuration revision must be greater than local node
applied revision [msRev={}, appliedRev={}",
- rev, appliedRevision);
-
- targetRevision = rev;
-
- log.info("Checking revision on recovery [targetRevision={},
appliedRevision={}, acceptableDifference={}]",
- targetRevision, appliedRevision,
configurationCatchUpDifference);
-
- if (isConfigurationUpToDate(targetRevision, appliedRevision)) {
- catchUpFuture.complete(null);
- }
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public CompletableFuture<?> onUpdate(long appliedRevision) {
- long targetRev = targetRevision;
-
- if (targetRev >= 0) {
- if (isConfigurationUpToDate(targetRev, appliedRevision)) {
- return checkRevisionUpToDate(appliedRevision);
- }
- } else {
- return checkRevisionUpToDate(appliedRevision);
- }
-
- return CompletableFuture.completedFuture(null);
- }
-}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
deleted file mode 100644
index 32cc5ac7f7..0000000000
---
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/RecoveryCompletionFutureFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.recovery;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import org.apache.ignite.internal.configuration.ConfigurationManager;
-import
org.apache.ignite.internal.configuration.notifications.ConfigurationStorageRevisionListener;
-
-/**
- * Creates a future that completes when local recovery is finished.
- */
-public class RecoveryCompletionFutureFactory {
- /**
- * Create recovery completion future.
- *
- * @param clusterCfgMgr Cluster configuration manager.
- * @param listenerProvider Provider of configuration listener.
- * @return Recovery completion future.
- */
- public static CompletableFuture<Void> create(
- ConfigurationManager clusterCfgMgr,
- Function<CompletableFuture<Void>,
ConfigurationStorageRevisionListener> listenerProvider
- ) {
- CompletableFuture<Void> configCatchUpFuture = new
CompletableFuture<>();
-
- ConfigurationStorageRevisionListener listener =
listenerProvider.apply(configCatchUpFuture);
-
- CompletableFuture<Void> recoveryCompletionFuture =
- configCatchUpFuture.thenRun(() ->
clusterCfgMgr.configurationRegistry().stopListenUpdateStorageRevision(listener));
-
-
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(listener);
-
- return recoveryCompletionFuture;
- }
-}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
deleted file mode 100644
index 4e47a634e6..0000000000
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.configuration.storage;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Consumer;
-import org.apache.ignite.configuration.RootKey;
-import org.apache.ignite.configuration.annotation.ConfigurationRoot;
-import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
-import org.apache.ignite.internal.configuration.TestConfigurationChanger;
-import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
-import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
-import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.EntryEvent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.WatchEvent;
-import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.metastorage.dsl.Operation;
-import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
-import org.apache.ignite.lang.ByteArray;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests for the {@link DistributedConfigurationStorage}.
- */
-public class DistributedConfigurationCatchUpTest {
- private final VaultManager vaultManager = new VaultManager(new
InMemoryVaultService());
-
- /**
- * Before each.
- */
- @BeforeEach
- void start() {
- vaultManager.start();
- }
-
- /**
- * After each.
- */
- @AfterEach
- void stop() throws Exception {
- vaultManager.stop();
- }
-
- /**
- * Dummy configuration.
- */
- @ConfigurationRoot(rootName = "someKey", type = DISTRIBUTED)
- public static class DistributedTestConfigurationSchema {
- @Value(hasDefault = true)
- public final int foobar = 0;
- }
-
- /**
- * Tests that distributed configuration storage correctly picks up latest
configuration MetaStorage revision during recovery process.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testMetaStorageRevisionDifferentFromConfigurationOnRestart()
throws Exception {
- RootKey<DistributedTestConfiguration, DistributedTestView> rootKey =
DistributedTestConfiguration.KEY;
-
- ConfigurationTreeGenerator generator = new
ConfigurationTreeGenerator(rootKey);
-
- MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
-
- DistributedConfigurationStorage storage = storage(wrapper);
-
- try {
- var validator = new ConfigurationValidatorImpl(generator,
Set.of());
- var changer = new TestConfigurationChanger(List.of(rootKey),
storage, generator, validator);
-
- try {
- changer.start();
-
- ConfigurationSource source = source(
- rootKey,
- (DistributedTestChange change) ->
change.changeFoobar(1)
- );
-
- CompletableFuture<Void> change = changer.change(source);
-
- assertThat(change, willCompleteSuccessfully());
- } finally {
- changer.stop();
- }
- } finally {
- storage.close();
- }
-
- // Put a value to the configuration, so we start on non-empty vault.
- vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2,
3, 4}).get();
-
- // This emulates a change in MetaStorage that is not related to the
configuration.
- when(wrapper.mock.appliedRevision()).thenReturn(2L);
-
- storage = storage(wrapper);
-
- try {
-
- var configurationValidator = new
ConfigurationValidatorImpl(generator, Set.of());
- var changer = new TestConfigurationChanger(List.of(rootKey),
storage, generator, configurationValidator);
-
- try {
- changer.start();
-
- // Should return last configuration change, not last
MetaStorage change.
- assertThat(storage.lastRevision(), willBe(1L));
- } finally {
- changer.stop();
- }
- } finally {
- storage.close();
- }
- }
-
- private DistributedConfigurationStorage storage(MetaStorageMockWrapper
wrapper) {
- return new
DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager);
- }
-
- /**
- * This class stores data for {@link MetaStorageManager}'s mock.
- */
- private static class MetaStorageMockWrapper {
- private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
-
- /**
- * This and previous field are copy-pasted intentionally, so in case
if something changes, this test should fail and be reviewed and
- * re-written.
- */
- private static final ByteArray MASTER_KEY = new
ByteArray(DISTRIBUTED_PREFIX + "$master$key");
-
- private static final ByteArray TEST_KEY = new
ByteArray(DISTRIBUTED_PREFIX + "someKey.foobar");
-
- /** MetaStorage mock. */
- private final MetaStorageManager mock;
-
- /** Captured MetaStorage listener. */
- private WatchListener lsnr;
-
- /** Current master key revision. */
- private final AtomicLong masterKeyRevision = new AtomicLong();
-
- private MetaStorageMockWrapper() {
- mock = mock(MetaStorageManager.class);
-
- setup();
- }
-
- private void setup() {
- // Returns current master key revision.
- when(mock.get(MASTER_KEY)).then(invocation -> {
- return completedFuture(new EntryImpl(MASTER_KEY.bytes(), null,
masterKeyRevision.get(), -1));
- });
-
- // On any invocation - trigger storage listener.
- when(mock.invoke(any(), anyCollection(), any()))
- .then(invocation -> triggerStorageListener());
-
- when(mock.invoke(any(), any(Operation.class), any()))
- .then(invocation -> triggerStorageListener());
-
- // This captures the listener.
- doAnswer(invocation -> {
- lsnr = invocation.getArgument(1);
-
- return null;
- }).when(mock).registerPrefixWatch(any(), any());
- }
-
- /**
- * Triggers MetaStorage listener incrementing master key revision.
- */
- private CompletableFuture<Boolean> triggerStorageListener() {
- return CompletableFuture.supplyAsync(() -> {
- long newRevision = masterKeyRevision.incrementAndGet();
-
- lsnr.onUpdate(new WatchEvent(List.of(
- new EntryEvent(null, new EntryImpl(MASTER_KEY.bytes(),
null, newRevision, -1)),
- // Add a mock entry to simulate a configuration update.
- new EntryEvent(null, new EntryImpl((DISTRIBUTED_PREFIX
+ "foobar").getBytes(UTF_8), null, newRevision, -1))
- ), newRevision, HybridTimestamp.MAX_VALUE));
-
- return true;
- });
- }
-
- private MetaStorageManager metaStorageManager() {
- return mock;
- }
- }
-
- private static <CHANGET> ConfigurationSource source(RootKey<?, ? super
CHANGET> rootKey, Consumer<CHANGET> changer) {
- return new ConfigurationSource() {
- @Override
- public void descend(ConstructableTreeNode node) {
- ConfigurationSource changerSrc = new ConfigurationSource() {
- @Override
- public void descend(ConstructableTreeNode node) {
- changer.accept((CHANGET) node);
- }
- };
-
- node.construct(rootKey.key(), changerSrc, true);
- }
- };
- }
-}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
index 17da06caf6..36d40052d8 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java
@@ -34,8 +34,6 @@ import
org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -44,8 +42,6 @@ import org.junit.jupiter.api.BeforeEach;
* Tests for the {@link DistributedConfigurationStorage}.
*/
public class DistributedConfigurationStorageTest extends
ConfigurationStorageTest {
- private final VaultManager vaultManager = new VaultManager(new
InMemoryVaultService());
-
private final KeyValueStorage metaStorage = new
SimpleInMemoryKeyValueStorage("test");
private final MetaStorageManager metaStorageManager =
mockMetaStorageManager();
@@ -55,7 +51,6 @@ public class DistributedConfigurationStorageTest extends
ConfigurationStorageTes
*/
@BeforeEach
void start() {
- vaultManager.start();
metaStorage.start();
metaStorageManager.start();
}
@@ -67,13 +62,12 @@ public class DistributedConfigurationStorageTest extends
ConfigurationStorageTes
void stop() throws Exception {
metaStorageManager.stop();
metaStorage.close();
- vaultManager.stop();
}
/** {@inheritDoc} */
@Override
public ConfigurationStorage getStorage() {
- return new DistributedConfigurationStorage(metaStorageManager,
vaultManager);
+ return new DistributedConfigurationStorage(metaStorageManager);
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 23ba896aa2..b534b2e831 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -633,6 +633,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
});
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506
Probably should be reworked so that
+ // the future is returned along with createTableFut. Right now it
will break some tests.
writeTableAssignmentsToMetastore(tableId, assignments);
return createTableFut;
@@ -641,7 +643,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
}
- private void writeTableAssignmentsToMetastore(int tableId,
List<Set<Assignment>> assignments) {
+ private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(int
tableId, List<Set<Assignment>> assignments) {
assert !assignments.isEmpty();
List<Operation> partitionAssignments = new
ArrayList<>(assignments.size());
@@ -655,7 +657,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
Condition condition = Conditions.notExists(new
ByteArray(partitionAssignments.get(0).key()));
- metaStorageMgr
+ return metaStorageMgr
.invoke(condition, partitionAssignments,
Collections.emptyList())
.exceptionally(e -> {
LOG.error("Couldn't write assignments to metastore", e);
@@ -1265,7 +1267,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
});
}));
- createTablePartitionsLocally(causalityToken, assignments,
zoneDescriptor.id(), table);
+ CompletableFuture<?> createPartsFut =
createTablePartitionsLocally(causalityToken, assignments, zoneDescriptor.id(),
table);
pendingTables.put(tableId, table);
startedTables.put(tableId, table);
@@ -1279,7 +1281,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// TODO should be reworked in IGNITE-16763
// We use the event notification future as the result so that
dependent components can complete the schema updates.
- return fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, tableId));
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
+ return allOf(createPartsFut, fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, tableId)));
}
/**