This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7de32e8c60 IGNITE-23292 Local startup of metastorage compaction on
node startup after recovery (#4663)
7de32e8c60 is described below
commit 7de32e8c60cf2bb31e3d950ec7b0c195e29a1b5c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 5 09:08:54 2024 +0300
IGNITE-23292 Local startup of metastorage compaction on node startup after
recovery (#4663)
---
.../internal/catalog/storage/UpdateLogImpl.java | 9 +-
.../catalog/storage/UpdateLogImplTest.java | 3 +-
.../distributionzones/DistributionZoneManager.java | 5 +-
.../rebalance/DistributionZoneRebalanceEngine.java | 15 ++-
.../DistributionZoneRebalanceEngineV2.java | 5 +-
.../DistributionZoneRebalanceEngineTest.java | 3 +-
.../internal/index/IndexBuildingManager.java | 5 +-
.../IndexAvailabilityControllerRestorerTest.java | 7 +-
.../index/IndexAvailabilityControllerTest.java | 2 +-
.../internal/index/TestIndexManagementUtils.java | 9 +-
.../internal/metastorage/MetaStorageManager.java | 4 +-
.../ignite/internal/metastorage/Revisions.java | 57 +++++++++
modules/metastorage/build.gradle | 2 +
.../metastorage/TestMetasStorageUtils.java | 67 +++++++++++
.../ItMetaStorageCompactionTriggerOneNodeTest.java | 128 +++++++++++++++++++++
.../impl/ItMetaStorageCompactionTriggerTest.java | 61 ++--------
...ommand.java => GetCurrentRevisionsCommand.java} | 11 +-
.../command/MetastorageCommandsMessageGroup.java | 4 +-
.../command/response/RevisionsInfo.java | 71 ++++++++++++
.../impl/MetaStorageCompactionTrigger.java | 48 +++++++-
.../metastorage/impl/MetaStorageManagerImpl.java | 116 +++++++------------
.../metastorage/impl/MetaStorageService.java | 7 +-
.../metastorage/impl/MetaStorageServiceImpl.java | 7 +-
.../impl/RecoveryRevisionsListenerImpl.java | 101 ++++++++++++++++
.../server/AbstractKeyValueStorage.java | 89 ++++++++++++--
.../metastorage/server/KeyValueStorage.java | 32 +++---
.../NotifyWatchProcessorEvent.java} | 20 +++-
.../RecoveryRevisionsListener.java} | 18 ++-
.../server/UpdateCompactionRevisionEvent.java | 51 ++++++++
.../metastorage/server/UpdateEntriesEvent.java | 54 +++++++++
.../server/persistence/RocksDbKeyValueStorage.java | 112 +++++++-----------
.../server/raft/MetaStorageListener.java | 10 +-
.../MetaStorageDeployWatchesCorrectnessTest.java | 6 +-
.../impl/MetaStorageManagerRecoveryTest.java | 9 +-
.../AbstractCompactionKeyValueStorageTest.java | 23 +++-
.../server/SimpleInMemoryKeyValueStorage.java | 60 +++++-----
.../PartitionReplicaLifecycleManager.java | 5 +-
.../internal/placementdriver/ActiveActorTest.java | 3 +-
.../placementdriver/AssignmentsTracker.java | 4 +-
.../placementdriver/PlacementDriverManager.java | 5 +-
.../internal/placementdriver/LeaseUpdaterTest.java | 3 +-
.../placementdriver/PlacementDriverTest.java | 5 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../storage/DistributedConfigurationStorage.java | 4 +-
.../ignite/internal/BaseIgniteRestartTest.java | 7 +-
.../ignite/internal/schema/SchemaManagerTest.java | 4 +-
.../internal/table/distributed/TableManager.java | 5 +-
.../table/distributed/index/IndexMetaStorage.java | 5 +-
.../table/distributed/TableManagerTest.java | 5 +-
50 files changed, 960 insertions(+), 331 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 3e083a5a64..681c08686b 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -113,7 +114,7 @@ public class UpdateLogImpl implements UpdateLog {
);
}
- recoveryStateFromMetastore(handler);
+ recoverStateFromMetastore(handler);
UpdateListener listener = new UpdateListener(handler, marshaller);
this.listener = listener;
@@ -236,12 +237,12 @@ public class UpdateLogImpl implements UpdateLog {
}
}
- private void recoveryStateFromMetastore(OnUpdateHandler handler) {
- CompletableFuture<Long> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
+ private void recoverStateFromMetastore(OnUpdateHandler handler) {
+ CompletableFuture<Revisions> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
assert recoveryFinishedFuture.isDone();
- long recoveryRevision = recoveryFinishedFuture.join();
+ long recoveryRevision = recoveryFinishedFuture.join().revision();
Entry earliestVersion =
metastore.getLocally(CatalogKey.snapshotVersion(), recoveryRevision);
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 7c111cda1f..b6d7998a88 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.catalog.storage.serialization.UpdateLogMarshal
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -231,7 +232,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
metastore = StandaloneMetaStorageManager.create(keyValueStorage,
readOperationForCompactionTracker);
assertThat(metastore.startAsync(componentContext),
willCompleteSuccessfully());
- assertThat(metastore.recoveryFinishedFuture(),
willBe(recoverRevision));
+
assertThat(metastore.recoveryFinishedFuture().thenApply(Revisions::revision),
willBe(recoverRevision));
}
@Test
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index bf285938f1..e7f4693a76 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -114,6 +114,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
@@ -271,12 +272,12 @@ public class DistributionZoneManager implements
IgniteComponent {
metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(),
topologyWatchListener);
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
// At the moment of the start of this manager, it is guaranteed
that Meta Storage has been recovered.
assert recoveryFinishFuture.isDone();
- long recoveryRevision = recoveryFinishFuture.join();
+ long recoveryRevision = recoveryFinishFuture.join().revision();
restoreGlobalStateFromLocalMetastorage(recoveryRevision);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 07405ac3f8..777afd919b 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -48,11 +48,13 @@ import
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventL
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.TestOnly;
/**
* Zone rebalance manager.
@@ -89,6 +91,11 @@ public class DistributionZoneRebalanceEngine {
// TODO IGNITE-22115 remove it
public static final boolean ENABLED = getBoolean(FEATURE_FLAG_NAME, false);
+ /** Special flag to skip rebalance on node recovery for tests. */
+ // TODO: IGNITE-23466 Remove it
+ @TestOnly
+ public static final String SKIP_REBALANCE_TRIGGERS_RECOVERY =
"IGNITE_SKIP_REBALANCE_TRIGGERS_RECOVERY";
+
/**
* Constructor.
*
@@ -132,12 +139,16 @@ public class DistributionZoneRebalanceEngine {
// TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
metaStorageManager.registerPrefixWatch(zoneDataNodesKey(),
dataNodesListener);
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
// At the moment of the start of this manager, it is guaranteed
that Meta Storage has been recovered.
assert recoveryFinishFuture.isDone();
- long recoveryRevision = recoveryFinishFuture.join();
+ long recoveryRevision = recoveryFinishFuture.join().revision();
+
+ if (getBoolean(SKIP_REBALANCE_TRIGGERS_RECOVERY, false)) {
+ return nullCompletedFuture();
+ }
if (ENABLED) {
return rebalanceTriggersRecovery(recoveryRevision,
catalogVersion)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
index 1fcd523022..26b7e14f9f 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineV2.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventL
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -113,12 +114,12 @@ public class DistributionZoneRebalanceEngineV2 {
// TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
metaStorageManager.registerPrefixWatch(zoneDataNodesKey(),
dataNodesListener);
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageManager.recoveryFinishedFuture();
// At the moment of the start of this manager, it is guaranteed
that Meta Storage has been recovered.
assert recoveryFinishFuture.isDone();
- long recoveryRevision = recoveryFinishFuture.join();
+ long recoveryRevision = recoveryFinishFuture.join().revision();
return rebalanceTriggersRecovery(recoveryRevision);
});
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index a1653e7927..02b788ac3a 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
@@ -173,7 +174,7 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
return null;
}).when(metaStorageManager).registerPrefixWatch(any(), any());
-
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(1, -1)));
AtomicLong raftIndex = new AtomicLong();
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
index 8c76986d75..a998694e95 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -134,11 +135,11 @@ public class IndexBuildingManager implements
IgniteComponent {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return inBusyLockAsync(busyLock, () -> {
- CompletableFuture<Long> recoveryFinishedFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishedFuture =
metaStorageManager.recoveryFinishedFuture();
assert recoveryFinishedFuture.isDone();
- long recoveryRevision = recoveryFinishedFuture.join();
+ long recoveryRevision = recoveryFinishedFuture.join().revision();
indexAvailabilityController.start(recoveryRevision);
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
index a99a3c48fe..6bd3231b5c 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -232,11 +233,11 @@ public class IndexAvailabilityControllerRestorerTest
extends BaseIgniteAbstractT
controller = new IndexAvailabilityController(catalogManager,
metaStorageManager, mock(IndexBuilder.class));
- CompletableFuture<Long> metastoreRecoveryFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> metastoreRecoveryFuture =
metaStorageManager.recoveryFinishedFuture();
- assertThat(metastoreRecoveryFuture, willBe(greaterThan(0L)));
+ assertThat(metastoreRecoveryFuture.thenApply(Revisions::revision),
willBe(greaterThan(0L)));
- controller.start(metastoreRecoveryFuture.join());
+ controller.start(metastoreRecoveryFuture.join().revision());
}
private void setLocalNodeToClusterService(ClusterNode clusterNode) {
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index c1ddbc2fd3..49933fd47b 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -102,7 +102,7 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
assertThat(metaStorageManager.recoveryFinishedFuture(),
willCompleteSuccessfully());
-
indexAvailabilityController.start(metaStorageManager.recoveryFinishedFuture().join());
+
indexAvailabilityController.start(metaStorageManager.recoveryFinishedFuture().join().revision());
assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
index cd5a7a10bf..dc1db53e93 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -110,12 +111,12 @@ class TestIndexManagementUtils {
static void
awaitTillGlobalMetastoreRevisionIsApplied(MetaStorageManagerImpl
metaStorageManager) throws Exception {
assertTrue(
waitForCondition(() -> {
- CompletableFuture<Long> currentRevisionFuture =
metaStorageManager.metaStorageService()
- .thenCompose(MetaStorageService::currentRevision);
+ CompletableFuture<RevisionsInfo> currentRevisionsFuture =
metaStorageManager.metaStorageService()
+ .thenCompose(MetaStorageService::currentRevisions);
- assertThat(currentRevisionFuture,
willCompleteSuccessfully());
+ assertThat(currentRevisionsFuture,
willCompleteSuccessfully());
- return currentRevisionFuture.join() ==
metaStorageManager.appliedRevision();
+ return currentRevisionsFuture.join().revision() ==
metaStorageManager.appliedRevision();
}, 1_000)
);
}
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index dd7ba5efa5..ddabb27d85 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -414,9 +414,9 @@ public interface MetaStorageManager extends IgniteComponent
{
/**
* Returns a future which completes when MetaStorage manager finished
local recovery.
- * The value of the future is the revision which must be used for state
recovery by other components.
+ * The value of the future contains the revisions which must be used for state
recovery by other components.
*/
- CompletableFuture<Long> recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishedFuture();
/** Registers a Meta Storage revision update listener. */
void registerRevisionUpdateListener(RevisionUpdateListener listener);
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
new file mode 100644
index 0000000000..595acbf89f
--- /dev/null
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/Revisions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage;
+
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.tostring.S;
+
+/** Information about metastorage revisions. */
+public class Revisions {
+ private final long revision;
+
+ private final long compactionRevision;
+
+ /**
+ * Constructor.
+ *
+ * @param revision Metastorage revision.
+ * @param compactionRevision Metastorage compaction revision.
+ */
+ public Revisions(long revision, long compactionRevision) {
+ this.revision = revision;
+ this.compactionRevision = compactionRevision;
+ }
+
+ /** Returns metastorage revision. */
+ public long revision() {
+ return revision;
+ }
+
+ /**
+ * Returns the metastorage compaction revision up to which (inclusive)
key versions will be deleted; trying to read them
+ * will cause a {@link CompactedException}.
+ */
+ public long compactionRevision() {
+ return compactionRevision;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index d0c167f8b0..a6da29290f 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -68,6 +68,8 @@ dependencies {
integrationTestImplementation project(':ignite-runner')
integrationTestImplementation project(':ignite-system-disaster-recovery')
integrationTestImplementation project(':ignite-configuration-system')
+ integrationTestImplementation project(':ignite-configuration-root')
+ integrationTestImplementation project(':ignite-distribution-zones')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-network'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
index bb55ddc9d5..67deee8707 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/TestMetasStorageUtils.java
@@ -17,12 +17,24 @@
package org.apache.ignite.internal.metastorage;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.jetbrains.annotations.Nullable;
/** Helper class for use in integration tests that may contain useful methods
and constants. */
@@ -30,6 +42,15 @@ public class TestMetasStorageUtils {
/** Special value representing any random timestamp. */
public static final HybridTimestamp ANY_TIMESTAMP = new
HybridTimestamp(1L, 0);
+ /** Foo key. */
+ public static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
+
+ /** Bar key. */
+ public static final ByteArray BAR_KEY = ByteArray.fromString("bar_key");
+
+ /** Random value. */
+ public static final byte[] VALUE = ByteArray.fromString("value").bytes();
+
/** Checks the metastore entry. */
public static void checkEntry(Entry actEntry, byte[] expKey, byte
@Nullable [] expValue, long expRevision) {
assertEquals(expRevision, actEntry.revision(), () -> "entry=" +
actEntry);
@@ -55,4 +76,50 @@ public class TestMetasStorageUtils {
return Arrays.equals(act.value(), exp.value());
}
+
+ /** Creates a cluster configuration with metastorage compaction
properties. */
+ public static String createClusterConfigWithCompactionProperties(long
interval, long dataAvailabilityTime) {
+ return String.format(
+ "ignite.system.properties: {"
+ + "%s.propertyValue= \"%s\", "
+ + "%s.propertyValue= \"%s\""
+ + "}",
+ INTERVAL_SYSTEM_PROPERTY_NAME, interval,
DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME, dataAvailabilityTime
+ );
+ }
+
+ /** Returns the latest revision for the key from the leader. */
+ public static long latestKeyRevision(MetaStorageManager
metaStorageManager, ByteArray key) {
+ CompletableFuture<Entry> latestEntryFuture =
metaStorageManager.get(key);
+ assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
+
+ return latestEntryFuture.join().revision();
+ }
+
+ /** Returns {@code true} if every running node locally holds exactly one
revision of the key and it equals {@code revision}. */
+ public static boolean allNodesContainSingleRevisionForKeyLocally(Cluster
cluster, ByteArray key, long revision) {
+ return cluster.runningNodes()
+ .map(TestWrappers::unwrapIgniteImpl)
+ .map(IgniteImpl::metaStorageManager)
+ .map(metaStorageManager ->
collectRevisionsLocally(metaStorageManager, key))
+ .allMatch(keyRevisions -> keyRevisions.size() == 1 &&
keyRevisions.contains(revision));
+ }
+
+ private static Set<Long> collectRevisionsLocally(MetaStorageManager
metaStorageManager, ByteArray key) {
+ var res = new HashSet<Long>();
+
+ for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
+ try {
+ Entry entry = metaStorageManager.getLocally(key, i);
+
+ if (!entry.empty()) {
+ res.add(entry.revision());
+ }
+ } catch (CompactedException ignore) {
+ // Do nothing.
+ }
+ }
+
+ return res;
+ }
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
new file mode 100644
index 0000000000..02dce5c715
--- /dev/null
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerOneNodeTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.BAR_KEY;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
+import
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for {@link MetaStorageCompactionTrigger} on a single-node cluster: checks that the compaction revision is restored
+ * and the compaction itself is carried out locally after the node is restarted.
+ */
+@WithSystemProperty(key =
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value =
"true")
+public class ItMetaStorageCompactionTriggerOneNodeTest extends
ClusterPerTestIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ super.customizeInitParameters(builder);
+
+
builder.clusterConfiguration(createClusterConfigWithCompactionProperties(10,
10));
+ }
+
+ /**
+ * Scenario: write two keys, wait for the compaction revision to reach the first write of {@code FOO_KEY}, start dropping
+ * {@link CompactionCommand}s, write {@code FOO_KEY} again, raise the compaction properties to {@link Long#MAX_VALUE} and restart the
+ * node. After the restart the compaction revision must equal the one observed before it, and {@code FOO_KEY} must eventually have
+ * only its latest revision readable locally.
+ */
+ @Test
+ void testCompactionAfterRestartNode() throws Exception {
+ IgniteImpl node = aliveNode();
+
+ MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+ assertThat(metaStorageManager.put(BAR_KEY, VALUE),
willCompleteSuccessfully());
+
+ // Let's wait until the compaction on revision of FOO_KEY creation
happens.
+ long fooRevision = latestKeyRevision(metaStorageManager, FOO_KEY);
+ assertTrue(waitForCondition(() ->
metaStorageManager.getCompactionRevisionLocally() >= fooRevision, 10, 1_000));
+
+ log.info("Latest revision for key: [key={}, revision={}]", FOO_KEY,
fooRevision);
+
+ // Let's cancel new compactions to create a new version for the key
and not compact it until we restart the node.
+ startDroppingCompactionCommand(node);
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+
+ long latestFooRevision = latestKeyRevision(metaStorageManager,
FOO_KEY);
+
+ long latestCompactionRevision =
metaStorageManager.getCompactionRevisionLocally();
+ // Let's change the properties before restarting so that a new
scheduled compaction does not start after the node starts.
+ changeCompactionProperties(node, Long.MAX_VALUE, Long.MAX_VALUE);
+
+ IgniteImpl restartedNode = restartNode();
+
+ MetaStorageManager restartedMetaStorageManager =
restartedNode.metaStorageManager();
+
+ // Let's make sure that after the restart the correct revision of the
compaction is restored and the compaction itself will be at
+ // the latest compaction revision.
+ assertEquals(latestCompactionRevision,
restartedMetaStorageManager.getCompactionRevisionLocally());
+ assertTrue(waitForCondition(() ->
allNodesContainSingleRevisionForKeyLocally(cluster, FOO_KEY,
latestFooRevision), 10, 1_000));
+ }
+
+ /** Returns the unwrapped implementation of the node with index {@code 0}. */
+ private IgniteImpl aliveNode() {
+ return unwrapIgniteImpl(node(0));
+ }
+
+ /** Restarts the node with index {@code 0} and returns its unwrapped implementation. */
+ private IgniteImpl restartNode() {
+ return unwrapIgniteImpl(restartNode(0));
+ }
+
+ /** Makes the node drop every {@link WriteActionRequest} whose deserialized command is a {@link CompactionCommand}. */
+ private static void startDroppingCompactionCommand(IgniteImpl node) {
+ node.dropMessages((s, message) -> message instanceof WriteActionRequest
+ && ((WriteActionRequest) message).deserializedCommand()
instanceof CompactionCommand);
+ }
+
+ /**
+ * Updates the compaction interval and data availability time system properties in the cluster configuration and asserts that the
+ * change completes successfully.
+ *
+ * @param node Node through which the cluster configuration is changed.
+ * @param interval New value of the compaction interval property.
+ * @param dataAvailabilityTime New value of the data availability time property.
+ */
+ private static void changeCompactionProperties(IgniteImpl node, long
interval, long dataAvailabilityTime) {
+ CompletableFuture<Void> changeFuture = node
+ .clusterConfiguration()
+ .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+ .system()
+ .properties()
+ .change(systemPropertyViews -> systemPropertyViews
+ .update(
+ INTERVAL_SYSTEM_PROPERTY_NAME,
+ systemPropertyChange ->
systemPropertyChange.changePropertyValue(Long.toString(interval))
+ ).update(
+ DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME,
+ systemPropertyChange ->
systemPropertyChange.changePropertyValue(Long.toString(dataAvailabilityTime))
+ )
+ );
+
+ assertThat(changeFuture, willCompleteSuccessfully());
+ }
+}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index 4700050e23..3b4e5860c5 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME;
-import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTriggerConfiguration.INTERVAL_SYSTEM_PROPERTY_NAME;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.FOO_KEY;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.VALUE;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.allNodesContainSingleRevisionForKeyLocally;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.createClusterConfigWithCompactionProperties;
+import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.latestKeyRevision;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,20 +32,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.junit.jupiter.params.ParameterizedTest;
@@ -51,10 +48,6 @@ import org.junit.jupiter.params.provider.ValueSource;
/** Integration test for {@link MetaStorageCompactionTrigger}. */
public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrationTest {
- private static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
-
- private static final byte[] VALUE = ByteArray.fromString("value").bytes();
-
@Override
protected int initialNodes() {
return 2;
@@ -69,12 +62,7 @@ public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrati
@Override
protected void configureInitParameters(InitParametersBuilder builder) {
- String clusterConfig = "ignite.system.properties: {"
- + INTERVAL_SYSTEM_PROPERTY_NAME + ".propertyValue= \"10\", "
- + DATA_AVAILABILITY_TIME_SYSTEM_PROPERTY_NAME +
".propertyValue= \"10\""
- + "}";
-
- builder.clusterConfiguration(clusterConfig);
+
builder.clusterConfiguration(createClusterConfigWithCompactionProperties(10,
10));
}
@ParameterizedTest
@@ -97,21 +85,15 @@ public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrati
long latestFooEntryRevision = latestKeyRevision(metaStorageManager,
FOO_KEY);
- assertTrue(waitForCondition(() ->
allNodesContainsSingleRevisionForKeyLocally(FOO_KEY, latestFooEntryRevision),
10, 1_000));
+ assertTrue(
+ waitForCondition(() ->
allNodesContainSingleRevisionForKeyLocally(CLUSTER, FOO_KEY,
latestFooEntryRevision), 10, 1_000)
+ );
}
private static IgniteImpl aliveNode() {
return unwrapIgniteImpl(CLUSTER.aliveNode());
}
- private static boolean
allNodesContainsSingleRevisionForKeyLocally(ByteArray key, long revision) {
- return CLUSTER.runningNodes()
- .map(TestWrappers::unwrapIgniteImpl)
- .map(IgniteImpl::metaStorageManager)
- .map(metaStorageManager ->
collectRevisionsLocally(metaStorageManager, key))
- .allMatch(keyRevisions -> keyRevisions.size() == 1 &&
keyRevisions.contains(revision));
- }
-
private static void watchExact(MetaStorageManager metaStorageManager,
ByteArray key, CountDownLatch latch) {
metaStorageManager.registerExactWatch(key, new WatchListener() {
@Override
@@ -127,31 +109,6 @@ public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrati
});
}
- private static long latestKeyRevision(MetaStorageManager
metaStorageManager, ByteArray key) {
- CompletableFuture<Entry> latestEntryFuture =
metaStorageManager.get(key);
- assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
-
- return latestEntryFuture.join().revision();
- }
-
- private static Set<Long> collectRevisionsLocally(MetaStorageManager
metaStorageManager, ByteArray key) {
- var res = new HashSet<Long>();
-
- for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
- try {
- Entry entry = metaStorageManager.getLocally(key, i);
-
- if (!entry.empty()) {
- res.add(entry.revision());
- }
- } catch (CompactedException ignore) {
- // Do nothing.
- }
- }
-
- return res;
- }
-
private void transferMetastorageLeadershipToAnotherNode() throws Exception
{
RaftGroupService raftGroupService =
CLUSTER.leaderServiceFor(MetastorageGroupId.INSTANCE);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
similarity index 73%
copy from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
copy to
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
index 8c72249c39..7084056c28 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionsCommand.java
@@ -17,10 +17,15 @@
package org.apache.ignite.internal.metastorage.command;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.raft.ReadCommand;
-/** Get command for MetaStorageCommandListener that retrieves current
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/**
+ * Get command for {@link MetaStorageListener} that retrieves the {@link
RevisionsInfo current metastorage revisions} from the
+ * leader.
+ */
+@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISIONS)
+public interface GetCurrentRevisionsCommand extends ReadCommand {
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 86f87e7846..ebeff9d5dc 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -38,8 +38,8 @@ public interface MetastorageCommandsMessageGroup {
/** Message type for {@link GetAllCommand}. */
short GET_ALL = 30;
- /** Message type for {@link GetCurrentRevisionCommand}. */
- short GET_CURRENT_REVISION = 33;
+ /** Message type for {@link GetCurrentRevisionsCommand}. */
+ short GET_CURRENT_REVISIONS = 33;
/** Message type for {@link GetChecksumCommand}. */
short GET_CHECKSUM = 34;
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
new file mode 100644
index 0000000000..c8f4214c9b
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/response/RevisionsInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command.response;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Serializable holder of a pair of metastorage revisions: the current revision and the compaction revision. Can be converted to and
+ * from {@link Revisions}.
+ */
+public class RevisionsInfo implements Serializable {
+ private static final long serialVersionUID = -1479528194130161192L;
+
+ private final long revision;
+
+ private final long compactionRevision;
+
+ /**
+ * Constructor.
+ *
+ * @param revision Metastorage revision.
+ * @param compactionRevision Metastorage compaction revision.
+ */
+ public RevisionsInfo(long revision, long compactionRevision) {
+ this.revision = revision;
+ this.compactionRevision = compactionRevision;
+ }
+
+ /** Returns metastorage revision. */
+ public long revision() {
+ return revision;
+ }
+
+ /**
+ * Returns the metastorage compaction revision: key versions up to it (inclusive) will be deleted, and an attempt to read them will
+ * result in a {@link CompactedException}.
+ */
+ public long compactionRevision() {
+ return compactionRevision;
+ }
+
+ /** Converts to {@link Revisions}. */
+ public Revisions toRevisions() {
+ return new Revisions(revision, compactionRevision);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
+ /**
+ * Creates a {@link RevisionsInfo} holding the same revision and compaction revision as the given {@link Revisions}.
+ *
+ * @param currentRevisions Revisions to copy the values from.
+ */
+ public static RevisionsInfo of(Revisions currentRevisions) {
+ return new RevisionsInfo(currentRevisions.revision(),
currentRevisions.compactionRevision());
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
index 1a247f9501..8731697a14 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -41,6 +42,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.command.CompactionCommand;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -65,6 +68,14 @@ import org.jetbrains.annotations.Nullable;
* <li>Metastorage leader locally gets notification of the completion of
the local compaction for the new revision.</li>
* <li>Metastorage leader locally schedules a new start of compaction.</li>
* </ol>
+ *
+ * <p>About recovery:</p>
+ * <ul>
+ * <li>At the start of the component, we will start a local compaction for
the compaction revision from
+ * {@link MetaStorageManager#recoveryFinishedFuture}.</li>
+ * <li>{@link CompactionCommand}s that were received before the {@link
MetaStorageManager#deployWatches} will start a local compaction
+ * in the {@link MetaStorageManager#deployWatches}.</li>
+ * </ul>
*/
// TODO: IGNITE-23280 Turn on compaction
public class MetaStorageCompactionTrigger implements IgniteComponent {
@@ -140,10 +151,12 @@ public class MetaStorageCompactionTrigger implements
IgniteComponent {
lock.lock();
try {
- started = true;
-
config.init();
+ startCompactionOnRecoveryInBackground();
+
+ started = true;
+
scheduleNextCompactionBusy();
return nullCompletedFuture();
@@ -319,4 +332,35 @@ public class MetaStorageCompactionTrigger implements
IgniteComponent {
lock.unlock();
}
}
+
+ /**
+ * Starts a local compaction for the compaction revision obtained from {@link MetaStorageManager#recoveryFinishedFuture}.
+ *
+ * <p>The recovery future is expected to be already completed when this method is called (checked with an {@code assert}). If the
+ * recovered compaction revision is {@code -1}, nothing is started (presumably this value means that there is no compaction revision
+ * yet; TODO confirm). Otherwise {@code storage.compact} is run asynchronously on the compaction executor under the busy lock; the
+ * outcome is only logged, and {@link NodeStoppingException} is not reported as an error.
+ */
+ private void startCompactionOnRecoveryInBackground() {
+ CompletableFuture<Revisions> recoveryFuture =
metaStorageManager.recoveryFinishedFuture();
+
+ assert recoveryFuture.isDone();
+
+ long recoveredCompactionRevision =
recoveryFuture.join().compactionRevision();
+
+ if (recoveredCompactionRevision != -1) {
+ runAsync(() -> inBusyLockSafe(busyLock, () ->
storage.compact(recoveredCompactionRevision)), compactionExecutor)
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ Throwable cause = unwrapCause(throwable);
+
+ if (!(cause instanceof NodeStoppingException)) {
+ LOG.error(
+ "Unknown error during metastore
compaction launched on node recovery: [compactionRevision={}]",
+ cause,
+ recoveredCompactionRevision
+ );
+ }
+ } else {
+ LOG.info(
+ "Metastorage compaction launched during
node recovery has been successfully completed: "
+ + "[compactionRevision={}]",
+ recoveredCompactionRevision
+ );
+ }
+ });
+ }
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 50880831a4..7fb0f640d6 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
-import java.util.function.LongConsumer;
import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -63,6 +62,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.command.CompactionCommand;
import
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
@@ -139,10 +139,10 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
private final AtomicBoolean isStopped = new AtomicBoolean();
/**
- * Future which completes when MetaStorage manager finished local
recovery. The value of the future is the revision which must be used
+ * Future which completes when MetaStorage manager finished local
recovery. The value of the future is the revisions which must be used
* for state recovery by other components.
*/
- private final CompletableFuture<Long> recoveryFinishedFuture = new
CompletableFuture<>();
+ private final CompletableFuture<Revisions> recoveryFinishedFuture = new
CompletableFuture<>();
/**
* Future that gets completed after {@link #deployWatches} method has been
called.
@@ -196,6 +196,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
private final MetastorageDivergencyValidator divergencyValidator = new
MetastorageDivergencyValidator();
+ private final RecoveryRevisionsListenerImpl recoveryRevisionsListener;
+
/**
* The constructor.
*
@@ -238,6 +240,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
this.readOperationFromLeaderForCompactionTracker =
readOperationForCompactionTracker;
learnerManager = new MetaStorageLearnerManager(busyLock,
logicalTopologyService, metaStorageSvcFut);
+
+ recoveryRevisionsListener = new
RecoveryRevisionsListenerImpl(busyLock, recoveryFinishedFuture, storage);
+ storage.setRecoveryRevisionsListener(recoveryRevisionsListener);
}
/**
@@ -311,75 +316,39 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
electionListeners.add(listener);
}
- private CompletableFuture<Long> recover(MetaStorageService service) {
- if (!busyLock.enterBusy()) {
- return failedFuture(new NodeStoppingException());
- }
-
- try {
- service.currentRevision().whenComplete((targetRevision, throwable)
-> {
- if (throwable != null) {
- recoveryFinishedFuture.completeExceptionally(throwable);
-
- return;
- }
-
- LOG.info("Performing MetaStorage recovery from revision {} to
{}", storage.revision(), targetRevision);
-
- assert targetRevision != null;
-
- listenForRecovery(targetRevision);
- }).whenComplete((res, ex) -> {
- if (ex != null) {
- LOG.info("Recovery failed", ex);
-
- recoveryFinishedFuture.completeExceptionally(ex);
- }
- });
-
- return recoveryFinishedFuture;
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- private void listenForRecovery(long targetRevision) {
- LongConsumer listener = storageRevision -> {
- if (!busyLock.enterBusy()) {
- recoveryFinishedFuture.completeExceptionally(new
NodeStoppingException());
-
- return;
- }
-
- try {
- if (storageRevision < targetRevision) {
- return;
- }
-
- storage.setRecoveryRevisionListener(null);
-
- finishRecovery(targetRevision);
- } finally {
- busyLock.leaveBusy();
- }
- };
+ private CompletableFuture<?> recover(MetaStorageService service) {
+ return inBusyLockAsync(busyLock, () -> {
+ service.currentRevisions()
+ .thenAccept(targetRevisions -> {
+ assert targetRevisions != null;
- storage.setRecoveryRevisionListener(listener);
+ LOG.info("Performing MetaStorage recovery: [from={},
to={}]", storage.revisions(), targetRevisions);
- // Storage might be already up-to-date, so check here manually after
setting the listener.
- listener.accept(storage.revision());
- }
+
recoveryRevisionsListener.setTargetRevisions(targetRevisions.toRevisions());
+ }).whenComplete((res, throwable) -> {
+ if (throwable != null) {
+
recoveryFinishedFuture.completeExceptionally(throwable);
+ }
+ });
- private void finishRecovery(long targetRevision) {
- appliedRevision = targetRevision;
+ return recoveryFinishedFuture
+ .thenAccept(revisions -> {
+ long recoveryRevision = revisions.revision();
- if (targetRevision > 0) {
-
clusterTime.updateSafeTime(storage.timestampByRevision(targetRevision));
- }
+ appliedRevision = recoveryRevision;
- if (recoveryFinishedFuture.complete(targetRevision)) {
- LOG.info("Finished MetaStorage recovery");
- }
+ if (recoveryRevision > 0) {
+
clusterTime.updateSafeTime(storage.timestampByRevision(recoveryRevision));
+ }
+ })
+ .whenComplete((revisions, throwable) -> {
+ if (throwable != null) {
+ LOG.info("Recovery failed", throwable);
+ } else {
+ LOG.info("Finished MetaStorage recovery");
+ }
+ });
+ });
}
private CompletableFuture<MetaStorageServiceImpl>
reenterIfNeededAndInitializeMetaStorage(
@@ -694,6 +663,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
storage.start();
+ // Safe because we haven't started raft nodes yet and so no one has to
update storage locally.
+ recoveryRevisionsListener.onUpdate(storage.revisions());
+
cmgMgr.metaStorageInfo()
.thenCombine(cmgMgr.clusterState(),
MetaStorageInfoAndClusterState::new)
.thenCompose(infoAndState -> {
@@ -811,8 +783,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
try {
return recoveryFinishedFuture
- .thenAccept(revision -> inBusyLock(busyLock, () -> {
- storage.startWatches(revision + 1, new
WatchEventHandlingCallback() {
+ .thenAccept(revisions -> inBusyLock(busyLock, () -> {
+ storage.startWatches(revisions.revision() + 1, new
WatchEventHandlingCallback() {
@Override
public void onSafeTimeAdvanced(HybridTimestamp
newSafeTime) {
MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
@@ -1065,7 +1037,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
@Override
- public CompletableFuture<Long> recoveryFinishedFuture() {
+ public CompletableFuture<Revisions> recoveryFinishedFuture() {
return recoveryFinishedFuture;
}
@@ -1167,9 +1139,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
storage.unregisterRevisionUpdateListener(listener);
}
- /** Explicitly notifies revision update listeners. */
+ /** Explicitly notifies revisions update listeners. */
public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
- return
recoveryFinishedFuture.thenCompose(storage::notifyRevisionUpdateListenerOnStart);
+ return
recoveryFinishedFuture.thenApply(Revisions::revision).thenCompose(storage::notifyRevisionUpdateListenerOnStart);
}
/**
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 577c3b6319..b36d754185 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -249,10 +250,8 @@ public interface MetaStorageService extends
ManuallyCloseable {
*/
Publisher<Entry> prefix(ByteArray prefix, long revUpperBound);
- /**
- * Returns a future which will hold current revision of the metastorage
leader.
- */
- CompletableFuture<Long> currentRevision();
+ /** Returns a future which will hold {@link RevisionsInfo current
revisions} of the metastorage leader. */
+ CompletableFuture<RevisionsInfo> currentRevisions();
/**
* Returns information about a revision checksum on the leader.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 5f867370ff..e7c5a5b7d6 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -41,7 +41,7 @@ import
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCac
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
import org.apache.ignite.internal.metastorage.command.GetCommand;
-import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
import
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.command.RemoveCommand;
import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -260,8 +261,8 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
}
@Override
- public CompletableFuture<Long> currentRevision() {
- GetCurrentRevisionCommand cmd =
context.commandsFactory().getCurrentRevisionCommand().build();
+ public CompletableFuture<RevisionsInfo> currentRevisions() {
+ GetCurrentRevisionsCommand cmd =
context.commandsFactory().getCurrentRevisionsCommand().build();
return context.raftService().run(cmd);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
new file mode 100644
index 0000000000..6498fd502b
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/RecoveryRevisionsListenerImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.Revisions;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.RecoveryRevisionsListener;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Implementation of {@link RecoveryRevisionsListener} that completes the recovery future once the locally applied revisions have reached
+ * the target revisions.
+ *
+ * <p>Current revisions arrive through {@link #onUpdate} and target revisions through {@link #setTargetRevisions}; either may arrive first,
+ * and the comparison is re-evaluated under {@link #lock} each time one of them changes.</p>
+ */
+class RecoveryRevisionsListenerImpl implements RecoveryRevisionsListener {
+    /** Busy lock that must be entered before the completion check is performed. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /**
+     * Completed with the current revisions once they are not less than the target ones, or exceptionally if the busy lock cannot be
+     * entered or the check itself fails.
+     */
+    private final CompletableFuture<Revisions> recoveryFinishFuture;
+
+    /** Storage from which this listener is unregistered when recovery finishes. */
+    private final KeyValueStorage storage;
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Revisions to be reached, {@code null} until {@link #setTargetRevisions} is called. Guarded by {@link #lock}. */
+    private Revisions targetRevisions;
+
+    /** Last revisions reported through {@link #onUpdate}, {@code null} until the first report. Guarded by {@link #lock}. */
+    private Revisions currentRevisions;
+
+    RecoveryRevisionsListenerImpl(
+            IgniteSpinBusyLock busyLock,
+            CompletableFuture<Revisions> recoveryFinishFuture,
+            KeyValueStorage storage
+    ) {
+        this.busyLock = busyLock;
+        this.recoveryFinishFuture = recoveryFinishFuture;
+        this.storage = storage;
+    }
+
+    @Override
+    public void onUpdate(Revisions currentRevisions) {
+        lock.lock();
+
+        try {
+            this.currentRevisions = currentRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Sets the revisions that the local storage has to reach for recovery to be considered finished and immediately re-checks whether
+     * they have already been reached.
+     *
+     * <p>NOTE(review): on this path {@link KeyValueStorage#setRecoveryRevisionsListener} may be invoked while {@link #lock} is held,
+     * whereas {@link #onUpdate} is presumably invoked by the storage while it holds its own lock - confirm that this lock ordering
+     * cannot deadlock.</p>
+     */
+    void setTargetRevisions(Revisions targetRevisions) {
+        lock.lock();
+
+        try {
+            this.targetRevisions = targetRevisions;
+
+            completeRecoveryFinishFutureIfPossible();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Completes {@link #recoveryFinishFuture} if both revisions are known and the current ones are not less than the target ones.
+     *
+     * <p>Must be called under {@link #lock}.</p>
+     */
+    private void completeRecoveryFinishFutureIfPossible() {
+        if (!busyLock.enterBusy()) {
+            recoveryFinishFuture.completeExceptionally(new NodeStoppingException());
+
+            // The busy lock was not entered, so we must neither proceed with the check nor reach the leaveBusy() call below.
+            return;
+        }
+
+        try {
+            if (targetRevisions == null
+                    || currentRevisions == null
+                    || currentRevisions.revision() < targetRevisions.revision()
+                    || currentRevisions.compactionRevision() < targetRevisions.compactionRevision()) {
+                return;
+            }
+
+            // Recovery is finished: unregister the listener before publishing the result.
+            storage.setRecoveryRevisionsListener(null);
+
+            recoveryFinishFuture.complete(currentRevisions);
+        } catch (Throwable t) {
+            recoveryFinishFuture.completeExceptionally(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index f3fc6cc479..9a77c26a8c 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CompactionRevisionUpdateListener;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
@@ -61,11 +61,12 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
protected final WatchProcessor watchProcessor;
/**
- * Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of revision update.
+ * Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of current revisions update, {@code null} if recovery
+ * is complete.
*
* <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
- private @Nullable LongConsumer recoveryRevisionListener;
+ private @Nullable RecoveryRevisionsListener recoveryRevisionListener;
/**
* Revision. Will be incremented for each single-entry or multi-entry
update operation.
@@ -88,6 +89,15 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
/** Tracks only cursors, since reading a single entry or a batch is done
entirely under {@link #rwLock}. */
protected final ReadOperationForCompactionTracker
readOperationForCompactionTracker;
+ /**
+ * Events for notification of the {@link WatchProcessor} that were created
before the {@link #startWatches start of watches}; after the
+ * watches have started this field is {@code null}. Events are sorted by
{@link NotifyWatchProcessorEvent#timestamp} and are expected to
+ * have no duplicates.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ protected @Nullable TreeSet<NotifyWatchProcessorEvent>
notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet<>();
+
/**
* Constructor.
*
@@ -115,11 +125,20 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
protected abstract Value valueForOperation(byte[] key, long revision);
/**
- * Returns {@code true} if the storage is in the recovery state.
+ * Returns {@code true} if the metastorage is in the recovery state.
+ *
+ * <p>Method is expected to be invoked under {@link #rwLock}.</p>
+ */
+ private boolean isInRecoveryState() {
+ return recoveryRevisionListener != null;
+ }
+
+ /**
+ * Returns {@code true} if the watches have {@link #startWatches started}.
*
* <p>Method is expected to be invoked under {@link #rwLock}.</p>
*/
- protected abstract boolean isInRecoveryState();
+ protected abstract boolean areWatchesStarted();
@Override
public Entry get(byte[] key) {
@@ -214,6 +233,8 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
assertCompactionRevisionLessThanCurrent(revision, rev);
compactionRevision = revision;
+
+ notifyRevisionsUpdate();
} finally {
rwLock.writeLock().unlock();
}
@@ -243,8 +264,12 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
if (isInRecoveryState()) {
setCompactionRevision(compactionRevision);
- } else {
+ } else if (areWatchesStarted()) {
watchProcessor.updateCompactionRevision(compactionRevision,
context.timestamp);
+ } else {
+ var notifyWatchesEvent = new
UpdateCompactionRevisionEvent(compactionRevision, context.timestamp);
+
+
addToNotifyWatchProcessorEventsBeforeStartingWatches(notifyWatchesEvent);
}
} finally {
rwLock.writeLock().unlock();
@@ -289,7 +314,7 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
@Override
- public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
+ public void setRecoveryRevisionsListener(@Nullable
RecoveryRevisionsListener listener) {
rwLock.writeLock().lock();
try {
@@ -339,10 +364,10 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
/** Notifies of revision update. Must be called under the {@link #rwLock}.
*/
- protected void notifyRevisionUpdate() {
+ protected void notifyRevisionsUpdate() {
if (recoveryRevisionListener != null) {
// Listener must be invoked only on recovery, after recovery
listener must be null.
- recoveryRevisionListener.accept(rev);
+ recoveryRevisionListener.onUpdate(createCurrentRevisions());
}
}
@@ -430,4 +455,50 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
return res;
}
+
+ @Override
+ public void advanceSafeTime(KeyValueUpdateContext context) {
+ rwLock.writeLock().lock();
+
+ try {
+ setIndexAndTerm(context.index, context.term); // The index and term from the context are recorded unconditionally.
+
+ if (areWatchesStarted()) { // The watch processor's safe time is advanced only once the watches have been started.
+ watchProcessor.advanceSafeTime(context.timestamp);
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public Revisions revisions() {
+ rwLock.readLock().lock(); // Both revisions are read under a single read-lock acquisition.
+
+ try {
+ return createCurrentRevisions();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ private Revisions createCurrentRevisions() { // Snapshot of the current revision and compaction revision fields.
+ return new Revisions(rev, compactionRevision);
+ }
+
+ protected void
addToNotifyWatchProcessorEventsBeforeStartingWatches(NotifyWatchProcessorEvent
event) {
+ assert !areWatchesStarted(); // Events are buffered only until the watches start.
+
+ boolean added =
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
+
+ assert added : event; // A false result means the set already contained an element equal to this event.
+ }
+
+ protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
+ assert !areWatchesStarted(); // Draining is expected to happen before the watches are reported as started.
+
+ notifyWatchProcessorEventsBeforeStartingWatches.forEach(event ->
event.notify(watchProcessor));
+
+ notifyWatchProcessorEventsBeforeStartingWatches = null; // The buffer is discarded after all buffered events were delivered.
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3e74e4e7d9..ec580af62d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -21,13 +21,13 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.function.LongConsumer;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.CompactionRevisionUpdateListener;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -50,11 +50,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
*/
void start();
- /**
- * Returns storage revision.
- *
- * @return Storage revision.
- */
+ /** Returns storage revision, {@code 0} if there have been no storage
update operations yet. */
long revision();
/**
@@ -477,12 +473,10 @@ public interface KeyValueStorage extends
ManuallyCloseable {
long revisionByTimestamp(HybridTimestamp timestamp);
/**
- * Sets the revision listener. This is needed only for the recovery, after
that listener must be set to {@code null}.
- * {@code null} means that we no longer must be notified of revision
updates for recovery, because recovery is finished.
- *
- * @param listener Revision listener.
+ * Sets the revisions listener. This is needed only for the recovery,
after that listener must be set to {@code null}.
+ * {@code null} means that we no longer must be notified of revisions
updates for recovery, because recovery is finished.
*/
- void setRecoveryRevisionListener(@Nullable LongConsumer listener);
+ void setRecoveryRevisionsListener(@Nullable RecoveryRevisionsListener
listener);
/** Registers a Meta Storage revision update listener. */
void registerRevisionUpdateListener(RevisionUpdateListener listener);
@@ -548,13 +542,15 @@ public interface KeyValueStorage extends
ManuallyCloseable {
* Updates the metastorage compaction revision.
*
* <p>Algorithm:</p>
- * <ul>
+ * <ol>
* <li>Invokes {@link #saveCompactionRevision}.</li>
- * <li>If the storage is in a recovery state ({@link #startWatches all
registered watches not started}), then
+ * <li>If the metastorage is in a recovery state (listener set via
{@link #setRecoveryRevisionsListener}), then
* {@link #setCompactionRevision} is invoked and the current method is
completed.</li>
+ * <li>If the watches have <b>not</b> {@link #startWatches started},
then the execution of step 4 is postponed until the
+ * watches start, and the current method is completed.</li>
* <li>Otherwise, a new task (A) is added to the WatchEvent queue and
the current method is completed.</li>
* <li>Task (A) invokes {@link #setCompactionRevision} and invokes
{@link CompactionRevisionUpdateListener#onUpdate}.</li>
- * </ul>
+ * </ol>
*
* <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
*
@@ -585,4 +581,12 @@ public interface KeyValueStorage extends ManuallyCloseable
{
* Clears the content of the storage. Should only be called when no one
else uses this storage.
*/
void clear();
+
+ /**
+ * Returns current metastorage revisions.
+ *
+ * @see #revision()
+ * @see #getCompactionRevision()
+ */
+ Revisions revisions();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
similarity index 59%
copy from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
copy to
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
index 8c72249c39..2373cc3284 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/NotifyWatchProcessorEvent.java
@@ -15,12 +15,20 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.server;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
-/** Get command for MetaStorageCommandListener that retrieves current
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/**
+ * {@link WatchProcessor} notification event.
+ *
+ * <p>Events are ordered by {@link #timestamp()} only, so two events with equal timestamps compare as equal regardless of their type or
+ * payload.</p>
+ */
+public interface NotifyWatchProcessorEvent extends
Comparable<NotifyWatchProcessorEvent> {
+ /** Returns the timestamp of the event, which is also the key used by {@link #compareTo}. */
+ HybridTimestamp timestamp();
+
+ /** Delivers this event to the given {@link WatchProcessor}. */
+ void notify(WatchProcessor watchProcessor);
+
+ @Override
+ default int compareTo(NotifyWatchProcessorEvent o) {
+ return timestamp().compareTo(o.timestamp());
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
similarity index 56%
rename from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
rename to
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
index 8c72249c39..55d1519cb3 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/RecoveryRevisionsListener.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.command;
+package org.apache.ignite.internal.metastorage.server;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.metastorage.Revisions;
-/** Get command for MetaStorageCommandListener that retrieves current
revision. */
-@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
-public interface GetCurrentRevisionCommand extends ReadCommand {
+/** Listener for updates of the {@link Revisions current metastorage revisions}, needed
only for metastorage recovery. */
+@FunctionalInterface
+public interface RecoveryRevisionsListener {
+ /**
+ * Invoked when one of the {@link Revisions current metastorage revisions}
is updated.
+ *
+ * <p>Until the method completes its execution, no update or compaction of
metastorage will occur, so the method should complete its
+ * execution as soon as possible.</p>
+ */
+ void onUpdate(Revisions currentRevisions);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
new file mode 100644
index 0000000000..6d7f61e302
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateCompactionRevisionEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Event that notifies the {@link WatchProcessor} about an update of the metastorage compaction revision.
+ *
+ * <p>Holds the compaction revision together with its timestamp and passes both to
+ * {@link WatchProcessor#updateCompactionRevision} when {@link #notify} is invoked.</p>
+ */
+public class UpdateCompactionRevisionEvent implements
NotifyWatchProcessorEvent {
+ private final long compactionRevision;
+
+ @IgniteToStringInclude
+ private final HybridTimestamp timestamp;
+
+ /** Creates an event for the given compaction revision and timestamp. */
+ public UpdateCompactionRevisionEvent(long revision, HybridTimestamp
timestamp) {
+ compactionRevision = revision;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void notify(WatchProcessor watchProcessor) {
+ watchProcessor.updateCompactionRevision(compactionRevision, timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
new file mode 100644
index 0000000000..0c2944e3f3
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateEntriesEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.List;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Event that notifies the {@link WatchProcessor} about updated metastorage {@link Entry
entries}.
+ *
+ * <p>Holds the updated entries together with their timestamp and passes both to {@link WatchProcessor#notifyWatches} when
+ * {@link #notify} is invoked. The entry list is stored by reference, without copying.</p>
+ */
+public class UpdateEntriesEvent implements NotifyWatchProcessorEvent {
+ @IgniteToStringInclude
+ private final List<Entry> updatedEntries;
+
+ @IgniteToStringInclude
+ private final HybridTimestamp timestamp;
+
+ /** Creates an event for the given updated entries and timestamp. */
+ public UpdateEntriesEvent(List<Entry> updatedEntries, HybridTimestamp
timestamp) {
+ this.updatedEntries = updatedEntries;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void notify(WatchProcessor watchProcessor) {
+ watchProcessor.notifyWatches(updatedEntries, timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 67a88affc9..d6a30e2dd8 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -63,6 +63,8 @@ import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -87,8 +89,10 @@ import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
+import org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.Statement;
+import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
import org.apache.ignite.internal.metastorage.server.Value;
import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -228,15 +232,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
*/
private final AtomicReference<RecoveryStatus> recoveryStatus = new
AtomicReference<>(RecoveryStatus.INITIAL);
- /**
- * Buffer used to cache new events while an event replay is in progress.
After replay finishes, the cache gets drained and is never
- * used again.
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
- */
- @Nullable
- private List<UpdatedEntries> eventCache;
-
/**
* Current list of updated entries.
*
@@ -491,7 +486,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
compactionRevision = bytesToLong(compactionRevisionBytes);
}
- notifyRevisionUpdate();
+ notifyRevisionsUpdate();
} catch (MetaStorageException e) {
throw e;
} catch (Exception e) {
@@ -652,7 +647,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
queueWatchEvent();
- notifyRevisionUpdate();
+ notifyRevisionsUpdate();
}
private boolean validateNoChecksumConflict(long newRev, long newChecksum)
throws RocksDBException {
@@ -926,7 +921,19 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
if (currentRevision != 0) {
- replayUpdates(startRevision, currentRevision);
+ Set<NotifyWatchProcessorEvent> fromStorage =
collectNotifyWatchProcessorEventsFromStorage(startRevision, currentRevision);
+
+ rwLock.writeLock().lock();
+
+ try {
+
notifyWatchProcessorEventsBeforeStartingWatches.addAll(fromStorage);
+
+ drainNotifyWatchProcessorEventsBeforeStartingWatches();
+
+ recoveryStatus.set(RecoveryStatus.DONE);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
}
}
@@ -1090,17 +1097,14 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
updatedEntries.clear();
break;
-
case IN_PROGRESS:
- // Buffer the event while event replay is still in progress.
- if (eventCache == null) {
- eventCache = new ArrayList<>();
- }
+ UpdatedEntries copy = updatedEntries.transfer();
- eventCache.add(updatedEntries.transfer());
+ var event = new UpdateEntriesEvent(copy.updatedEntries,
copy.ts);
- break;
+ addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
+ break;
default:
notifyWatches();
@@ -1115,19 +1119,18 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
}
- private void replayUpdates(long lowerRevision, long upperRevision) {
+ private Set<NotifyWatchProcessorEvent>
collectNotifyWatchProcessorEventsFromStorage(long lowerRevision, long
upperRevision) {
long minWatchRevision = Math.max(lowerRevision,
watchProcessor.minWatchRevision().orElse(-1));
- if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
- // No events to replay, we can start processing more recent events
from the event queue.
- finishReplay();
-
- return;
+ if (minWatchRevision > upperRevision) {
+ return Set.of();
}
var updatedEntries = new ArrayList<Entry>();
HybridTimestamp ts = null;
+ var events = new TreeSet<NotifyWatchProcessorEvent>();
+
try (
var upperBound = new Slice(longToBytes(upperRevision + 1));
var options = new
ReadOptions().setIterateUpperBound(upperBound);
@@ -1149,7 +1152,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
assert ts != null : revision;
- watchProcessor.notifyWatches(updatedEntriesCopy, ts);
+ var event = new UpdateEntriesEvent(updatedEntriesCopy,
ts);
+
+ boolean added = events.add(event);
+
+ assert added : event;
updatedEntries.clear();
@@ -1173,15 +1180,19 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
}
- // Notify about the events left after finishing the loop above.
+ // Adds event left after finishing the loop above.
if (!updatedEntries.isEmpty()) {
assert ts != null;
- watchProcessor.notifyWatches(updatedEntries, ts);
+ var event = new UpdateEntriesEvent(updatedEntries, ts);
+
+ boolean added = events.add(event);
+
+ assert added : event;
}
}
- finishReplay();
+ return events;
}
@Override
@@ -1229,28 +1240,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
}
- private void finishReplay() {
- // Take the lock to drain the event cache and prevent new events from
being cached. Since event notification is asynchronous,
- // this lock shouldn't be held for long.
- rwLock.writeLock().lock();
-
- try {
- if (eventCache != null) {
- eventCache.forEach(entries -> {
- assert entries.ts != null;
-
- watchProcessor.notifyWatches(entries.updatedEntries,
entries.ts);
- });
-
- eventCache = null;
- }
-
- recoveryStatus.set(RecoveryStatus.DONE);
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
@TestOnly
public Path getDbPath() {
return dbPath;
@@ -1296,21 +1285,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
}
- @Override
- public void advanceSafeTime(KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
- try {
- setIndexAndTerm(context.index, context.term);
-
- if (!isInRecoveryState()) {
- watchProcessor.advanceSafeTime(context.timestamp);
- }
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
@Override
protected void saveCompactionRevision(long revision, KeyValueUpdateContext
context, boolean advanceSafeTime) {
try (WriteBatch batch = new WriteBatch()) {
@@ -1320,7 +1294,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
db.write(defaultWriteOptions, batch);
- if (advanceSafeTime && !isInRecoveryState()) {
+ if (advanceSafeTime && areWatchesStarted()) {
watchProcessor.advanceSafeTime(context.timestamp);
}
} catch (Throwable t) {
@@ -1512,8 +1486,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- protected boolean isInRecoveryState() {
- return recoveryStatus.get() != RecoveryStatus.DONE;
+ protected boolean areWatchesStarted() {
+ return recoveryStatus.get() == RecoveryStatus.DONE;
}
private @Nullable Value getValueForOperationNullable(byte[] key, long
revision) {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index f299d8ac93..6d4c82a788 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -29,15 +29,17 @@ import java.util.List;
import java.util.function.Consumer;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.command.GetChecksumCommand;
import org.apache.ignite.internal.metastorage.command.GetCommand;
-import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
import org.apache.ignite.internal.metastorage.command.GetPrefixCommand;
import org.apache.ignite.internal.metastorage.command.GetRangeCommand;
import org.apache.ignite.internal.metastorage.command.PaginationCommand;
import org.apache.ignite.internal.metastorage.command.response.BatchResponse;
import org.apache.ignite.internal.metastorage.command.response.ChecksumInfo;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import org.apache.ignite.internal.metastorage.server.ChecksumAndRevisions;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
@@ -148,10 +150,10 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
byte[] keyTo = storage.nextKey(prefix);
clo.result(handlePaginationCommand(keyFrom, keyTo,
prefixCmd));
- } else if (command instanceof GetCurrentRevisionCommand) {
- long revision = storage.revision();
+ } else if (command instanceof GetCurrentRevisionsCommand) {
+ Revisions currentRevisions = storage.revisions();
- clo.result(revision);
+ clo.result(RevisionsInfo.of(currentRevisions));
} else if (command instanceof GetChecksumCommand) {
ChecksumAndRevisions checksumInfo =
storage.checksumAndRevisions(((GetChecksumCommand) command).revision());
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index 7ef5a979a4..f766997ae4 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -37,7 +37,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -82,7 +83,8 @@ public class MetaStorageDeployWatchesCorrectnessTest extends
IgniteAbstractTest
when(clusterService.nodeName()).thenReturn(mcNodeName);
when(raftManager.startRaftGroupNodeAndWaitNodeReady(any(), any(),
any(), any(), any(), any(), any()))
.thenReturn(raftGroupService);
-
when(raftGroupService.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation
-> completedFuture(0L));
+ when(raftGroupService.run(any(GetCurrentRevisionsCommand.class)))
+ .thenAnswer(invocation -> completedFuture(new RevisionsInfo(0,
-1)));
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
index 77ad87ff8f..1ec9a9f37c 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -41,7 +41,8 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.ComponentContext;
-import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionsCommand;
+import org.apache.ignite.internal.metastorage.command.response.RevisionsInfo;
import
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -103,13 +104,13 @@ public class MetaStorageManagerRecoveryTest extends
BaseIgniteAbstractTest {
);
}
- private RaftManager raftManager(long remoteRevision) throws Exception {
+ private static RaftManager raftManager(long remoteRevision) throws
Exception {
RaftManager raft = mock(RaftManager.class);
RaftGroupService service = mock(TopologyAwareRaftGroupService.class);
- when(service.run(any(GetCurrentRevisionCommand.class)))
- .thenAnswer(invocation -> completedFuture(remoteRevision));
+ when(service.run(any(GetCurrentRevisionsCommand.class)))
+ .thenAnswer(invocation -> completedFuture(new
RevisionsInfo(remoteRevision, -1)));
when(raft.startRaftGroupNodeAndWaitNodeReady(any(), any(), any(),
any(), any(), any(), any()))
.thenAnswer(invocation -> service);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 5c47a00d69..d88479b249 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.server;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
@@ -1021,10 +1022,30 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
/** Tests {@link KeyValueStorage#updateCompactionRevision} case from
method description when storage is in recovery state. */
@Test
- void testUpdateCompactionRevisionWithoutStartWatches() {
+ void testUpdateCompactionRevisionAndStorageInRecoveryState() {
+ storage.setRecoveryRevisionsListener(currentRevisions -> {});
storage.updateCompactionRevision(1, kvContext(clock.now()));
assertEquals(1, storage.getCompactionRevision());
+ storage.setRecoveryRevisionsListener(null);
+ storage.updateCompactionRevision(2, kvContext(clock.now()));
+ assertEquals(1, storage.getCompactionRevision());
+
+ assertThat(
+ allOf(
+ updateCompactionRevisionInWatchEvenQueue.waitFor(1L),
+ updateCompactionRevisionInWatchEvenQueue.waitFor(2L)
+ ),
+ willTimeoutFast()
+ );
+ }
+
+ /** Tests {@link KeyValueStorage#updateCompactionRevision} case from
method description when watches are not started and storage is not in recovery state. */
+ @Test
+ void testUpdateCompactionRevisionWithoutStartWatches() {
+ storage.updateCompactionRevision(1, kvContext(clock.now()));
+ assertEquals(-1, storage.getCompactionRevision());
+
assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L),
willTimeoutFast());
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index e7d525223b..78955c6d81 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -226,7 +226,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
notifyWatches();
- notifyRevisionUpdate();
+ notifyRevisionsUpdate();
}
@Override
@@ -462,23 +462,21 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
rwLock.readLock().lock();
try {
- areWatchesEnabled = true;
-
watchProcessor.setWatchEventHandlingCallback(callback);
- replayUpdates(startRevision);
+ fillNotifyWatchProcessorEventsFromStorage(startRevision);
+
+ drainNotifyWatchProcessorEventsBeforeStartingWatches();
+
+ areWatchesEnabled = true;
} finally {
rwLock.readLock().unlock();
}
}
- private void replayUpdates(long startRevision) {
+ private void fillNotifyWatchProcessorEventsFromStorage(long startRevision)
{
long minWatchRevision = Math.max(startRevision,
watchProcessor.minWatchRevision().orElse(-1));
- if (minWatchRevision <= 0) {
- return;
- }
-
revsIdx.tailMap(minWatchRevision)
.forEach((revision, entries) -> {
entries.forEach((key, value) -> {
@@ -487,12 +485,29 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
updatedEntries.add(entry);
});
- notifyWatches();
+ fillNotifyWatchProcessorEventsFromUpdatedEntries();
});
}
+ private void fillNotifyWatchProcessorEventsFromUpdatedEntries() {
+ if (updatedEntries.isEmpty()) {
+ return;
+ }
+
+ long revision = updatedEntries.get(0).revision();
+
+ HybridTimestamp ts = revToTsMap.get(revision);
+ assert ts != null : revision;
+
+ var event = new UpdateEntriesEvent(List.copyOf(updatedEntries), ts);
+
+ addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
+
+ updatedEntries.clear();
+ }
+
private void notifyWatches() {
- if (isInRecoveryState() || updatedEntries.isEmpty()) {
+ if (!areWatchesStarted() || updatedEntries.isEmpty()) {
updatedEntries.clear();
return;
@@ -632,6 +647,8 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
savedCompactionRevision = snapshot.savedCompactionRevision;
term = snapshot.term;
index = snapshot.index;
+
+ notifyRevisionsUpdate();
} catch (Throwable t) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
} finally {
@@ -745,28 +762,13 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
updateRevision(curRev, context);
}
- @Override
- public void advanceSafeTime(KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
- try {
- setIndexAndTerm(context.index, context.term);
-
- if (!isInRecoveryState()) {
- watchProcessor.advanceSafeTime(context.timestamp);
- }
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
@Override
public void saveCompactionRevision(long revision, KeyValueUpdateContext
context, boolean advanceSafeTime) {
savedCompactionRevision = revision;
setIndexAndTerm(context.index, context.term);
- if (advanceSafeTime && !isInRecoveryState()) {
+ if (advanceSafeTime && areWatchesStarted()) {
watchProcessor.advanceSafeTime(context.timestamp);
}
}
@@ -845,8 +847,8 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- protected boolean isInRecoveryState() {
- return !areWatchesEnabled;
+ protected boolean areWatchesStarted() {
+ return areWatchesEnabled;
}
private @Nullable Value getValueNullable(byte[] key, long revision) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f359355777..2836a9a508 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -256,11 +257,11 @@ public class PartitionReplicaLifecycleManager extends
return nullCompletedFuture();
}
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
assert recoveryFinishFuture.isDone();
- long recoveryRevision = recoveryFinishFuture.join();
+ long recoveryRevision = recoveryFinishFuture.join().revision();
cleanUpResourcesForDroppedZonesOnRecovery();
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 670ef7292a..67adde0cca 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.raft.Loza;
@@ -76,7 +77,7 @@ public class ActiveActorTest extends
AbstractTopologyAwareGroupServiceTest {
@BeforeEach
public void setUp() {
- when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(0, -1)));
when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(trueCompletedFuture());
when(msm.getLocally(any(), anyLong())).then(invocation ->
emptyMetastoreEntry());
when(msm.getLocally(any(), any(), anyLong())).then(invocation ->
Cursor.fromIterable(List.of()));
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 5f85110c7c..14a08087e3 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -87,9 +87,9 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
public void startTrack() {
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
assignmentsListener);
- msManager.recoveryFinishedFuture().thenAccept(recoveryRevision -> {
+ msManager.recoveryFinishedFuture().thenAccept(recoveryRevisions -> {
try (Cursor<Entry> cursor =
msManager.getLocally(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
-
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX)),
recoveryRevision);
+
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX)),
recoveryRevisions.revision());
) {
for (Entry entry : cursor) {
if (entry.tombstone()) {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index ea3e7e4e88..058247af6e 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -266,11 +267,11 @@ public class PlacementDriverManager implements
IgniteComponent {
}
private void recoverInternalComponentsBusy() {
- CompletableFuture<Long> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
assert recoveryFinishedFuture.isDone();
- long recoveryRevision = recoveryFinishedFuture.join();
+ long recoveryRevision = recoveryFinishedFuture.join().revision();
leaseTracker.startTrack(recoveryRevision);
}
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index 9a7cd175c4..813e54a53a 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.OperationImpl;
@@ -127,7 +128,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
lenient().when(leaseTracker.leasesCurrent()).thenReturn(leases);
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i ->
Lease.emptyLease(i.getArgument(0)));
-
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(1, -1)));
when(metaStorageManager.getLocally(any(ByteArray.class),
any(ByteArray.class), anyLong())).thenReturn(mcEntriesCursor);
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, List.of(node))));
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 0563fbea01..5ca6e19d4b 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -183,11 +184,11 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
assertThat(metastore.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- CompletableFuture<Long> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
assertThat(recoveryFinishedFuture, willCompleteSuccessfully());
- leasePlacementDriver.startTrack(recoveryFinishedFuture.join());
+
leasePlacementDriver.startTrack(recoveryFinishedFuture.join().revision());
assignmentsPlacementDriver.startTrack();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 57b44c78e5..2da26a44a1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -1553,7 +1553,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
TableImpl table =
unwrapTableImpl(restartedNode.tables().table(TABLE_NAME));
- long recoveryRevision =
restartedNode.metaStorageManager().recoveryFinishedFuture().join();
+ long recoveryRevision =
restartedNode.metaStorageManager().recoveryFinishedFuture().join().revision();
PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(nodes.stream().map(IgniteImpl::name)
.collect(toSet()), Set.of());
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a19fcca3e9..ad7c8a2d88 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1737,9 +1737,8 @@ public class IgniteImpl implements Ignite {
CompletableFuture<Void> startupRevisionUpdate =
metaStorageMgr.notifyRevisionUpdateListenerOnStart();
return CompletableFuture.allOf(startupConfigurationUpdate,
startupRevisionUpdate, startFuture)
- .thenComposeAsync(t -> {
+ .thenComposeAsync(unused -> {
// Deploy all registered watches because all components
are ready and have registered their listeners.
- // TODO: IGNITE-23292 Run local metastore compaction after
start watches for the latest compacted revision
return metaStorageMgr.deployWatches();
}, startupExecutor);
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index bdee06027f..d7b790d217 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -179,7 +179,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
@Override
public CompletableFuture<Data> readDataOnRecovery() throws
StorageException {
CompletableFuture<Data> future =
metaStorageMgr.recoveryFinishedFuture()
- .thenApplyAsync(this::readDataOnRecovery0, threadPool);
+ .thenApplyAsync(revisions ->
readDataOnRecovery0(revisions.revision()), threadPool);
return registerFuture(future);
}
@@ -329,7 +329,7 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
@Override
public CompletableFuture<Long> localRevision() {
return metaStorageMgr.recoveryFinishedFuture()
- .thenApply(rev -> metaStorageMgr.getLocally(MASTER_KEY,
rev).revision());
+ .thenApply(revisions -> metaStorageMgr.getLocally(MASTER_KEY,
revisions.revision()).revision());
}
private <T> CompletableFuture<T> registerFuture(CompletableFuture<T>
future) {
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 2ae7bbf4eb..0050fe6604 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -288,11 +289,11 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
assertThat("Partial node was not started", startFuture,
willCompleteSuccessfully());
- Long recoveryRevision =
metaStorageMgr.recoveryFinishedFuture().getNow(null);
+ Revisions recoveryRevisions =
metaStorageMgr.recoveryFinishedFuture().getNow(null);
- assertNotNull(recoveryRevision);
+ assertNotNull(recoveryRevisions);
- log.info("Completed recovery on partially started node, MetaStorage
revision recovered to: " + recoveryRevision);
+ log.info("Completed recovery on partially started node, MetaStorage
revision recovered to: " + recoveryRevisions);
return new PartialNode(
name,
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index b3c040a0ab..683b6d2361 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
@@ -51,6 +52,7 @@ import
org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.sql.ColumnType;
@@ -271,7 +273,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
when(catalogService.latestCatalogVersion()).thenReturn(2);
when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
-
doReturn(CompletableFuture.completedFuture(CAUSALITY_TOKEN_2)).when(metaStorageManager).recoveryFinishedFuture();
+ doReturn(completedFuture(new Revisions(CAUSALITY_TOKEN_2,
-1))).when(metaStorageManager).recoveryFinishedFuture();
schemaManager = new SchemaManager(registry, catalogService);
assertThat(schemaManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 51ae06fa02..73a2d5a79a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -129,6 +129,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -610,11 +611,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
fullStateTransferIndexChooser.start();
- CompletableFuture<Long> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishFuture =
metaStorageMgr.recoveryFinishedFuture();
assert recoveryFinishFuture.isDone();
- long recoveryRevision = recoveryFinishFuture.join();
+ long recoveryRevision = recoveryFinishFuture.join().revision();
cleanUpResourcesForDroppedTablesOnRecoveryBusy();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index afaf721fef..c779af32af 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
@@ -398,11 +399,11 @@ public class IndexMetaStorage implements IgniteComponent {
}
private Map<Integer, IndexMeta> readAllFromMetastoreOnRecovery() {
- CompletableFuture<Long> recoveryFinishedFuture =
metaStorageManager.recoveryFinishedFuture();
+ CompletableFuture<Revisions> recoveryFinishedFuture =
metaStorageManager.recoveryFinishedFuture();
assert recoveryFinishedFuture.isDone();
- long recoveryRevision = recoveryFinishedFuture.join();
+ long recoveryRevision = recoveryFinishedFuture.join().revision();
try (Cursor<Entry> cursor =
metaStorageManager.prefixLocally(ByteArray.fromString(INDEX_META_VALUE_KEY_PREFIX),
recoveryRevision)) {
return cursor.stream()
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1e5bf4ee0d..7e2f408648 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -96,6 +96,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -680,7 +681,7 @@ public class TableManagerTest extends IgniteAbstractTest {
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
when(txStateStorage.clear()).thenReturn(nullCompletedFuture());
- when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(2, -1)));
// For some reason, "when(something).thenReturn" does not work on
spies, but this notation works.
createTableManager(tblManagerFut, (mvTableStorage) -> {
@@ -722,7 +723,7 @@ public class TableManagerTest extends IgniteAbstractTest {
when(msm.invoke(any(), anyList(),
anyList())).thenReturn(trueCompletedFuture());
when(msm.get(any())).thenReturn(nullCompletedFuture());
- when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(2L));
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(new
Revisions(2, -1)));
when(msm.prefixLocally(any(),
anyLong())).thenReturn(CursorUtils.emptyCursor());
}