This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d8db84df26 IGNITE-23293 Implement the algorithm of Metastorage
Compaction Trigger (#4632)
d8db84df26 is described below
commit d8db84df260948b6dcc5f54fed34ebdc043a2904
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Oct 25 19:57:10 2024 +0300
IGNITE-23293 Implement the algorithm of Metastorage Compaction Trigger
(#4632)
---
modules/metastorage/build.gradle | 1 +
.../impl/ItMetaStorageCompactionTriggerTest.java | 162 +++++++++++
...MetaStorageSafeTimePropagationAbstractTest.java | 9 +-
.../CompactionCommand.java} | 34 +--
.../command/MetastorageCommandsMessageGroup.java | 3 +
.../impl/MetaStorageCompactionTrigger.java | 298 +++++++++++++++++++++
.../metastorage/impl/MetaStorageManagerImpl.java | 22 +-
.../metastorage/impl/MetaStorageServiceImpl.java | 16 ++
.../server/AbstractKeyValueStorage.java | 105 +++-----
.../metastorage/server/KeyValueStorage.java | 53 ++--
.../metastorage/server/KeyValueUpdateContext.java | 10 +
...llback.java => WatchEventHandlingCallback.java} | 24 +-
.../metastorage/server/WatchProcessor.java | 40 +--
.../server/persistence/RocksDbKeyValueStorage.java | 27 +-
.../server/raft/MetaStorageListener.java | 12 +-
.../server/raft/MetaStorageWriteHandler.java | 11 +-
.../server/raft/MetastorageGroupId.java | 3 +-
.../AbstractCompactionKeyValueStorageTest.java | 83 ++----
.../server/BasicOperationsKeyValueStorageTest.java | 16 +-
.../metastorage/server/WatchProcessorTest.java | 12 +-
.../server/SimpleInMemoryKeyValueStorage.java | 31 +--
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
22 files changed, 683 insertions(+), 292 deletions(-)
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 7fa001c281..2badd58d49 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -75,6 +75,7 @@ dependencies {
integrationTestImplementation
testFixtures(project(':ignite-cluster-management'))
integrationTestImplementation
testFixtures(project(':ignite-failure-handler'))
integrationTestImplementation testFixtures(project(':ignite-metrics:'))
+ integrationTestImplementation testFixtures(project(':ignite-runner:'))
testFixturesImplementation project(':ignite-cluster-management')
testFixturesImplementation project(':ignite-core')
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
new file mode 100644
index 0000000000..4bfdf58a38
--- /dev/null
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTrigger.COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY;
+import static
org.apache.ignite.internal.metastorage.impl.MetaStorageCompactionTrigger.COMPACTION_INTERVAL_PROPERTY;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Integration test for {@link MetaStorageCompactionTrigger}. */
+@WithSystemProperty(key = COMPACTION_INTERVAL_PROPERTY, value = "10")
+@WithSystemProperty(key = COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY, value =
"10")
+public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrationTest {
+ private static final ByteArray FOO_KEY = ByteArray.fromString("foo_key");
+
+ private static final byte[] VALUE = ByteArray.fromString("value").bytes();
+
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ assertEquals(2, initialNodes());
+
+ return new int[] {0, 1};
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testMetastorageCompactionOccursOnAllNodes(boolean
changeMetastorageLeader) throws Exception {
+ MetaStorageManager metaStorageManager =
aliveNode().metaStorageManager();
+
+ var updateFooKeyLatch = new CountDownLatch(2);
+ watchExact(metaStorageManager, FOO_KEY, updateFooKeyLatch);
+
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+
+ if (changeMetastorageLeader) {
+ transferMetastorageLeadershipToAnotherNode();
+ }
+
+ assertThat(metaStorageManager.put(FOO_KEY, VALUE),
willCompleteSuccessfully());
+
+ assertTrue(updateFooKeyLatch.await(1, SECONDS));
+
+ long latestFooEntryRevision = latestKeyRevision(metaStorageManager,
FOO_KEY);
+
+ assertTrue(waitForCondition(() ->
allNodesContainsSingleRevisionForKeyLocally(FOO_KEY, latestFooEntryRevision),
10, 1_000));
+ }
+
+ private static IgniteImpl aliveNode() {
+ return unwrapIgniteImpl(CLUSTER.aliveNode());
+ }
+
+ private static boolean
allNodesContainsSingleRevisionForKeyLocally(ByteArray key, long revision) {
+ return CLUSTER.runningNodes()
+ .map(TestWrappers::unwrapIgniteImpl)
+ .map(IgniteImpl::metaStorageManager)
+ .map(metaStorageManager ->
collectRevisionsLocally(metaStorageManager, key))
+ .allMatch(keyRevisions -> keyRevisions.size() == 1 &&
keyRevisions.contains(revision));
+ }
+
+ private static void watchExact(MetaStorageManager metaStorageManager,
ByteArray key, CountDownLatch latch) {
+ metaStorageManager.registerExactWatch(key, new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ latch.countDown();
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ }
+ });
+ }
+
+ private static long latestKeyRevision(MetaStorageManager
metaStorageManager, ByteArray key) {
+ CompletableFuture<Entry> latestEntryFuture =
metaStorageManager.get(key);
+ assertThat(latestEntryFuture.thenApply(Entry::empty), willBe(false));
+
+ return latestEntryFuture.join().revision();
+ }
+
+ private static Set<Long> collectRevisionsLocally(MetaStorageManager
metaStorageManager, ByteArray key) {
+ var res = new HashSet<Long>();
+
+ for (int i = 0; i <= metaStorageManager.appliedRevision(); i++) {
+ try {
+ Entry entry = metaStorageManager.getLocally(key, i);
+
+ if (!entry.empty()) {
+ res.add(entry.revision());
+ }
+ } catch (CompactedException ignore) {
+ // Do nothing.
+ }
+ }
+
+ return res;
+ }
+
+ private void transferMetastorageLeadershipToAnotherNode() throws Exception
{
+ RaftGroupService raftGroupService =
CLUSTER.leaderServiceFor(MetastorageGroupId.INSTANCE);
+
+ String leaderConsistentId =
raftGroupService.getRaftNode().getLeaderId().getConsistentId();
+
+ for (int i = 0; i < initialNodes(); i++) {
+ if (!CLUSTER.node(i).name().equals(leaderConsistentId)) {
+ CLUSTER.transferLeadershipTo(i, MetastorageGroupId.INSTANCE);
+
+ return;
+ }
+ }
+
+ fail("Failed to change metastorage leader: " + leaderConsistentId);
+ }
+}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 2cf7549f56..fea3a10818 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import
org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
+import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.junit.jupiter.api.AfterEach;
@@ -47,16 +47,11 @@ public abstract class
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
@BeforeEach
public void startWatches() {
- storage.startWatches(1, new OnRevisionAppliedCallback() {
+ storage.startWatches(1, new WatchEventHandlingCallback() {
@Override
public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
time.updateSafeTime(newSafeTime);
}
-
- @Override
- public void onRevisionApplied(long revision) {
- // No-op.
- }
});
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
similarity index 56%
copy from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
copy to
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
index 0739f719f5..f35c3820df 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/CompactionCommand.java
@@ -15,32 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server.raft;
+package org.apache.ignite.internal.metastorage.command;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.network.annotations.Transferable;
/**
- * Meta storage replication group id.
+ * Command to start the metastore compaction locally.
+ *
+ * <p>Invokes {@link KeyValueStorage#updateCompactionRevision} on command
processing.</p>
*/
-public enum MetastorageGroupId implements ReplicationGroupId {
- /** Meta storage group id. */
- INSTANCE("metastorage_group");
-
- /** Group id string representation. */
- private String name;
-
- /**
- * The constructor.
- *
- * @param name The string representation of the enum.
- */
- MetastorageGroupId(String name) {
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return name;
- }
+@Transferable(MetastorageCommandsMessageGroup.COMPACTION)
+public interface CompactionCommand extends MetaStorageWriteCommand {
+ /** New metastorage compaction revision. */
+ long compactionRevision();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index 819d02717d..040db1118d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -64,4 +64,7 @@ public interface MetastorageCommandsMessageGroup {
/** Message type for {@link EvictIdempotentCommandsCacheCommand}. */
short EVICT_IDEMPOTENT_COMMAND_CACHE = 71;
+
+ /** Message type for {@link CompactionCommand}. */
+ short COMPACTION = 72;
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
new file mode 100644
index 0000000000..58011d242b
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageCompactionTrigger.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metastorage compaction trigger.
+ *
+ * <p>Algorithm:</p>
+ * <ol>
+ * <li>Metastorage leader waits locally for the start of scheduled
compaction.</li>
+ * <li>Metastorage leader locally calculates revision for compaction: it
takes the current safe time and subtracts the data
+ * availability time and uses that timestamp to get the revision.</li>
+ * <li>If the revision is less than or equal to the last compacted
revision, then go to point 6.</li>
+ * <li>Metastorage leader creates and sends a {@link CompactionCommand}
(see the command description for what each node will do) with a new
+ * revision for compaction.</li>
+ * <li>Metastorage leader locally gets notification of the completion of
the local compaction for the new revision.</li>
+ * <li>Metastorage leader locally schedules a new start of compaction.</li>
+ * </ol>
+ */
+// TODO: IGNITE-23280 Turn on compaction
+class MetaStorageCompactionTrigger implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(MetaStorageCompactionTrigger.class);
+
+ /** System property that defines compaction start interval (in
milliseconds). Default value is {@link Long#MAX_VALUE}. */
+ public static final String COMPACTION_INTERVAL_PROPERTY =
"IGNITE_COMPACTION_INTERVAL";
+
+ /** System property that defines data availability time (in milliseconds).
Default value is {@link Long#MAX_VALUE}. */
+ public static final String COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY =
"IGNITE_COMPACTION_DATA_AVAILABILITY_TIME";
+
+ private final String localNodeName;
+
+ private final KeyValueStorage storage;
+
+ private final CompletableFuture<MetaStorageServiceImpl>
metastorageServiceFuture;
+
+ private final ClusterTime clusterTime;
+
+ private final ReadOperationForCompactionTracker
readOperationForCompactionTracker;
+
+ private final ScheduledExecutorService compactionExecutor;
+
+ /** Guarded by {@link #lock}. */
+ private @Nullable ScheduledFuture<?> lastScheduledFuture;
+
+ /** Guarded by {@link #lock}. */
+ private boolean isLocalNodeLeader;
+
+ private final Lock lock = new ReentrantLock();
+
+ /** Compaction start interval (in milliseconds). */
+ // TODO: IGNITE-23279 Change configuration
+ private final long startInterval =
IgniteSystemProperties.getLong(COMPACTION_INTERVAL_PROPERTY, Long.MAX_VALUE);
+
+ /** Data availability time (in milliseconds). */
+ // TODO: IGNITE-23279 Change configuration
+ private final long dataAvailabilityTime =
IgniteSystemProperties.getLong(COMPACTION_DATA_AVAILABILITY_TIME_PROPERTY,
Long.MAX_VALUE);
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param localNodeName Local node name.
+ * @param storage Storage.
+ * @param metastorageServiceFuture Metastorage service future.
+ * @param clusterTime Cluster time.
+ * @param readOperationForCompactionTracker Tracker of read operations,
both local and from the leader.
+ */
+ MetaStorageCompactionTrigger(
+ String localNodeName,
+ KeyValueStorage storage,
+ CompletableFuture<MetaStorageServiceImpl> metastorageServiceFuture,
+ ClusterTime clusterTime,
+ ReadOperationForCompactionTracker readOperationForCompactionTracker
+ ) {
+ this.localNodeName = localNodeName;
+ this.storage = storage;
+ this.metastorageServiceFuture = metastorageServiceFuture;
+ this.clusterTime = clusterTime;
+ this.readOperationForCompactionTracker =
readOperationForCompactionTracker;
+
+ compactionExecutor = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(localNodeName,
"metastorage-compaction-executor", LOG)
+ );
+ }
+
+ @Override
+ public void close() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ cancelLastScheduledFutureBusy();
+
+ IgniteUtils.shutdownAndAwaitTermination(compactionExecutor, 10,
TimeUnit.SECONDS);
+ }
+
+ private void doCompactionBusy() {
+ lock.lock();
+
+ try {
+ if (!isLocalNodeLeader) {
+ return;
+ }
+
+ HybridTimestamp candidateCompactionRevisionTimestamp =
createCandidateCompactionRevisionTimestampBusy();
+
+ Long newCompactionRevision =
calculateCandidateCompactionRevisionBusy(candidateCompactionRevisionTimestamp);
+
+ if (newCompactionRevision == null) {
+ scheduleNextCompactionBusy();
+ } else {
+ metastorageServiceFuture.thenCompose(svc ->
svc.sendCompactionCommand(newCompactionRevision))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ Throwable cause = unwrapCause(throwable);
+
+ if (!(cause instanceof NodeStoppingException))
{
+ LOG.error(
+ "Unknown error occurred while
sending the metastorage compaction command: "
+ +
"[newCompactionRevision={}]",
+ cause,
+ newCompactionRevision
+ );
+
+ inBusyLockSafe(busyLock,
this::scheduleNextCompactionBusy);
+ }
+ }
+ });
+ }
+ } catch (Throwable t) {
+ LOG.error("Unknown error on new metastorage compaction revision
scheduling", t);
+
+ inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private HybridTimestamp createCandidateCompactionRevisionTimestampBusy() {
+ HybridTimestamp safeTime = clusterTime.currentSafeTime();
+
+ return safeTime.getPhysical() <= dataAvailabilityTime
+ ? HybridTimestamp.MIN_VALUE
+ : safeTime.subtractPhysicalTime(dataAvailabilityTime);
+ }
+
+ /** Returns {@code null} if there is no need to compact yet. */
+ private @Nullable Long
calculateCandidateCompactionRevisionBusy(HybridTimestamp candidateTimestamp) {
+ try {
+ long candidateCompactionRevision =
storage.revisionByTimestamp(candidateTimestamp);
+ long currentStorageRevision = storage.revision();
+
+ if (candidateCompactionRevision >= currentStorageRevision) {
+ candidateCompactionRevision = currentStorageRevision - 1;
+ }
+
+ return candidateCompactionRevision <=
storage.getCompactionRevision() ? null : candidateCompactionRevision;
+ } catch (CompactedException exception) {
+ // Revision has already been compacted; we need to schedule the next
compaction.
+ return null;
+ }
+ }
+
+ private void scheduleNextCompactionBusy() {
+ lock.lock();
+
+ try {
+ if (isLocalNodeLeader) {
+ lastScheduledFuture = compactionExecutor.schedule(
+ () -> inBusyLock(busyLock, this::doCompactionBusy),
+ startInterval,
+ MILLISECONDS
+ );
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** Invoked when the metastorage compaction revision is updated. */
+ void onCompactionRevisionUpdate(long compactionRevision) {
+ inBusyLockSafe(busyLock, () ->
onCompactionRevisionUpdateBusy(compactionRevision));
+ }
+
+ private void onCompactionRevisionUpdateBusy(long compactionRevision) {
+ supplyAsync(() ->
readOperationForCompactionTracker.collect(compactionRevision),
compactionExecutor)
+ .thenComposeAsync(Function.identity(), compactionExecutor)
+ .thenRunAsync(() -> storage.compact(compactionRevision),
compactionExecutor)
+ .whenComplete((unused, throwable) -> {
+ if (throwable == null) {
+ LOG.info("Metastore compaction completed successfully:
[compactionRevision={}]", compactionRevision);
+ } else {
+ Throwable cause = unwrapCause(throwable);
+
+ if (!(cause instanceof NodeStoppingException)) {
+ LOG.error(
+ "Unknown error on new metastorage
compaction revision: {}",
+ cause,
+ compactionRevision
+ );
+ }
+ }
+
+ inBusyLockSafe(busyLock, this::scheduleNextCompactionBusy);
+ });
+ }
+
+ /** Invoked when a new leader is elected. */
+ void onLeaderElected(ClusterNode newLeader) {
+ inBusyLockSafe(busyLock, () -> onLeaderElectedBusy(newLeader));
+ }
+
+ private void onLeaderElectedBusy(ClusterNode newLeader) {
+ lock.lock();
+
+ try {
+ if (localNodeName.equals(newLeader.name())) {
+ isLocalNodeLeader = true;
+
+ scheduleNextCompactionBusy();
+ } else {
+ isLocalNodeLeader = false;
+
+ cancelLastScheduledFutureBusy();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void cancelLastScheduledFutureBusy() {
+ lock.lock();
+
+ try {
+ ScheduledFuture<?> lastScheduledFuture = this.lastScheduledFuture;
+
+ if (lastScheduledFuture != null) {
+ lastScheduledFuture.cancel(true);
+ }
+
+ this.lastScheduledFuture = null;
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 000e9bdea0..42006ec065 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -71,8 +71,8 @@ import
org.apache.ignite.internal.metastorage.dsl.StatementResult;
import
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStorageFactory;
import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -192,6 +192,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
/** Tracks only reads from the leader, local reads are tracked by the
storage itself. */
private final ReadOperationForCompactionTracker
readOperationFromLeaderForCompactionTracker;
+ private final MetaStorageCompactionTrigger metaStorageCompactionTrigger;
+
/**
* The constructor.
*
@@ -234,6 +236,16 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
this.readOperationFromLeaderForCompactionTracker =
readOperationForCompactionTracker;
learnerManager = new MetaStorageLearnerManager(busyLock,
logicalTopologyService, metaStorageSvcFut);
+
+ metaStorageCompactionTrigger = new MetaStorageCompactionTrigger(
+ clusterService.nodeName(),
+ storage,
+ metaStorageSvcFut,
+ clusterTime,
+ readOperationForCompactionTracker
+ );
+
+ electionListeners.add(metaStorageCompactionTrigger::onLeaderElected);
}
/**
@@ -707,6 +719,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
try {
IgniteUtils.closeAllManually(
+ metaStorageCompactionTrigger,
() ->
metricManager.unregisterSource(metaStorageMetricSource),
clusterTime,
() -> cancelOrConsume(metaStorageSvcFut,
MetaStorageServiceImpl::close),
@@ -754,7 +767,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
try {
return recoveryFinishedFuture
.thenAccept(revision -> inBusyLock(busyLock, () -> {
- storage.startWatches(revision + 1, new
OnRevisionAppliedCallback() {
+ storage.startWatches(revision + 1, new
WatchEventHandlingCallback() {
@Override
public void onSafeTimeAdvanced(HybridTimestamp
newSafeTime) {
MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
@@ -764,6 +777,11 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
public void onRevisionApplied(long revision) {
MetaStorageManagerImpl.this.onRevisionApplied(revision);
}
+
+ @Override
+ public void onCompactionRevisionUpdated(long
compactionRevision) {
+
metaStorageCompactionTrigger.onCompactionRevisionUpdate(compactionRevision);
+ }
});
}))
.whenComplete((v, e) -> {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 90703ee62d..dbd4e3f1a7 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
import
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.command.GetCommand;
@@ -358,4 +359,19 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
) {
return
commandsFactory.evictIdempotentCommandsCacheCommand().evictionTimestamp(evictionTimestamp).initiatorTime(ts).build();
}
+
+ /**
+ * Sends command {@link CompactionCommand} to the leader.
+ *
+ * @param compactionRevision New metastorage compaction revision.
+ * @return Operation future.
+ */
+ CompletableFuture<Void> sendCompactionCommand(long compactionRevision) {
+ CompactionCommand command =
context.commandsFactory().compactionCommand()
+ .compactionRevision(compactionRevision)
+ .initiatorTime(clusterTime.now())
+ .build();
+
+ return context.raftService().run(command);
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index f24850613e..c2865b1e30 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -17,14 +17,12 @@
package org.apache.ignite.internal.metastorage.server;
-import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,16 +31,13 @@ import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
-import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
-import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -93,29 +88,22 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
/** Tracks only cursors, since reading a single entry or a batch is done
entirely under {@link #rwLock}. */
protected final ReadOperationForCompactionTracker
readOperationForCompactionTracker;
- protected final ExecutorService compactionExecutor;
-
- private final List<CompactionListener> compactionListeners = new
CopyOnWriteArrayList<>();
-
/**
* Constructor.
*
* @param nodeName Node name.
* @param failureManager Failure processor that is used to handle critical
errors.
* @param readOperationForCompactionTracker Read operation tracker for
metastorage compaction.
- * @param compactionExecutor Metastorage compaction executor.
*/
protected AbstractKeyValueStorage(
String nodeName,
FailureManager failureManager,
- ReadOperationForCompactionTracker
readOperationForCompactionTracker,
- ExecutorService compactionExecutor
+ ReadOperationForCompactionTracker readOperationForCompactionTracker
) {
this.failureManager = failureManager;
this.readOperationForCompactionTracker =
readOperationForCompactionTracker;
- this.compactionExecutor = compactionExecutor;
- this.watchProcessor = new WatchProcessor(nodeName, this::get,
failureManager);
+ watchProcessor = new WatchProcessor(nodeName, this::get,
failureManager);
}
/** Returns the key revisions for operation, an empty array if not found.
*/
@@ -197,6 +185,23 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
}
+ @Override
+ public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
+ assert revision >= 0 : revision;
+
+ rwLock.writeLock().lock();
+
+ try {
+ assertCompactionRevisionLessThanCurrent(revision, rev);
+
+ saveCompactionRevision(revision, context, true);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ protected abstract void saveCompactionRevision(long compactionRevision,
KeyValueUpdateContext context, boolean advanceSafeTime);
+
@Override
public void setCompactionRevision(long revision) {
assert revision >= 0 : revision;
@@ -224,7 +229,7 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
@Override
- public void startCompaction(long compactionRevision) {
+ public void updateCompactionRevision(long compactionRevision,
KeyValueUpdateContext context) {
assert compactionRevision >= 0 : compactionRevision;
rwLock.writeLock().lock();
@@ -232,26 +237,12 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
try {
assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+ saveCompactionRevision(compactionRevision, context, false);
+
if (isInRecoveryState()) {
- this.compactionRevision = compactionRevision;
+ setCompactionRevision(compactionRevision);
} else {
- watchProcessor
- .addTaskToWatchEventQueue(() ->
setCompactionRevision(compactionRevision))
- .thenComposeAsync(unused ->
readOperationsFuture(compactionRevision), compactionExecutor)
- .thenRunAsync(() -> compact(compactionRevision),
compactionExecutor)
- .whenCompleteAsync((unused, throwable) -> {
- if (throwable == null) {
- log.info("Metastore compaction completed
successfully: [compactionRevision={}]", compactionRevision);
- } else {
- log.error(
- "Metastore compaction failed:
[compactionRevision={}]",
- unwrapCause(throwable),
- compactionRevision
- );
- }
-
-
notifyCompleteCompactionLocally(this.compactionRevision, throwable);
- }, compactionExecutor);
+ watchProcessor.updateCompactionRevision(compactionRevision,
context.timestamp);
}
} finally {
rwLock.writeLock().unlock();
@@ -333,21 +324,6 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
}
- @Override
- public CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded) {
- return
readOperationForCompactionTracker.collect(compactionRevisionExcluded);
- }
-
- @Override
- public void registerCompactionListener(CompactionListener listener) {
- compactionListeners.add(listener);
- }
-
- @Override
- public void unregisterCompactionListener(CompactionListener listener) {
- compactionListeners.remove(listener);
- }
-
/** Notifies of revision update. Must be called under the {@link #rwLock}.
*/
protected void notifyRevisionUpdate() {
if (recoveryRevisionListener != null) {
@@ -441,25 +417,24 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
return res;
}
- private void notifyCompleteCompactionLocally(long compactionRevision,
@Nullable Throwable throwable) {
- if (throwable != null) {
- doCriticalErrorIfNotNodeStoppingException(throwable);
- }
+ protected WatchEventHandlingCallback
createWrapper(WatchEventHandlingCallback callback) {
+ return new WatchEventHandlingCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ callback.onSafeTimeAdvanced(newSafeTime);
+ }
- for (CompactionListener listener : compactionListeners) {
- try {
- listener.onCompactionCompleteLocally(compactionRevision);
- } catch (Throwable t) {
- doCriticalErrorIfNotNodeStoppingException(t);
+ @Override
+ public void onRevisionApplied(long revision) {
+ callback.onRevisionApplied(revision);
}
- }
- }
- private void doCriticalErrorIfNotNodeStoppingException(Throwable
throwable) {
- throwable = unwrapCause(throwable);
+ @Override
+ public void onCompactionRevisionUpdated(long compactionRevision) {
+ setCompactionRevision(compactionRevision);
- if (!(throwable instanceof NodeStoppingException)) {
- failureManager.process(new FailureContext(CRITICAL_ERROR,
throwable));
- }
+ callback.onCompactionRevisionUpdated(compactionRevision);
+ }
+ };
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 268cf73028..e278e74850 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -378,10 +378,9 @@ public interface KeyValueStorage extends ManuallyCloseable
{
* <p>Before calling this method, watches will not receive any updates.</p>
*
* @param startRevision Revision to start processing updates from.
- * @param revisionCallback Callback that will be invoked after all watches
of a particular revision are processed, with the
- * revision and modified entries (processed by at least one watch)
as its argument.
+ * @param callback Watch event handling callback.
*/
- void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback);
+ void startWatches(long startRevision, WatchEventHandlingCallback callback);
/**
* Unregisters a watch listener.
@@ -424,28 +423,6 @@ public interface KeyValueStorage extends ManuallyCloseable
{
*/
void compact(long revision);
- /**
- * Starts local compaction of metastorage.
- *
- * <p>Algorithm:</p>
- * <ul>
- * <li>If the storage is in a recovery state ({@link #startWatches all
registered watches not started}), then
- * {@link #setCompactionRevision} is invoked and the current method is
completed.</li>
- * <li>Otherwise, a new task (A) is added to the WatchEvent queue and
the current method is completed.</li>
- * <li>Task (A) invokes {@link #setCompactionRevision} and adds a new
task (B) to the compaction thread pool and completes.</li>
- * <li>Task (B) collects all read operations from metastorage (local
and from the leader {@link #readOperationsFuture}) and starts
- * asynchronously waiting for their completion.</li>
- * <li>Then {@link #compact} is invoked at the compaction thread
pool.</li>
- * <li>Upon completion there will be a notification via {@link
CompactionListener#onCompactionCompleteLocally} for
- * {@link #registerCompactionListener registered} listeners.</li>
- * </ul>
- *
- * <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
- *
- * @param compactionRevision Compaction revision.
- */
- void startCompaction(long compactionRevision);
-
/**
* Signals the need to stop local metastorage compaction as soon as
possible. For example, due to a node stopping.
*
@@ -561,21 +538,25 @@ public interface KeyValueStorage extends
ManuallyCloseable {
long getCompactionRevision();
/**
- * Returns a future that will complete when all read operations that were
started before {@code compactionRevisionExcluded}.
+ * Updates the metastorage compaction revision.
*
- * <p>Current method is expected to be invoked after {@link
#setCompactionRevision} on the same revision.</p>
+ * <p>Algorithm:</p>
+ * <ul>
+ * <li>Invokes {@link #saveCompactionRevision}.</li>
+ * <li>If the storage is in a recovery state ({@link #startWatches all
registered watches not started}), then
+ * {@link #setCompactionRevision} is invoked and the current method is
completed.</li>
+ * <li>Otherwise, a new task (A) is added to the WatchEvent queue and
the current method is completed.</li>
+ * <li>Task (A) invokes {@link #setCompactionRevision} and invokes
+ * {@link WatchEventHandlingCallback#onCompactionRevisionUpdated}.</li>
+ * </ul>
*
- * <p>Future completes without exception.</p>
+ * <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
*
- * @param compactionRevisionExcluded Compaction revision of interest.
+ * @param revision Compaction revision to update.
+ * @param context Operation's context.
+ * @throws MetaStorageException If there is an error while saving a
compaction revision.
*/
- CompletableFuture<Void> readOperationsFuture(long
compactionRevisionExcluded);
-
- /** Adds a metastore compaction listener. */
- void registerCompactionListener(CompactionListener listener);
-
- /** Removes a metastore compaction listener. */
- void unregisterCompactionListener(CompactionListener listener);
+ void updateCompactionRevision(long revision, KeyValueUpdateContext
context);
/**
* Returns checksum corresponding to the revision.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
index 9e49da4c8b..2dd7dfe3e7 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueUpdateContext.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.metastorage.server;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.TestOnly;
/**
@@ -26,7 +28,10 @@ import org.jetbrains.annotations.TestOnly;
*/
public class KeyValueUpdateContext {
public final long index;
+
public final long term;
+
+ @IgniteToStringInclude
public final HybridTimestamp timestamp;
/**
@@ -44,6 +49,11 @@ public class KeyValueUpdateContext {
this.timestamp = timestamp;
}
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
/**
* Returns a context instance with {@code 0} index and term values.
*/
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
similarity index 71%
rename from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
rename to
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
index 48a07367bf..8c46d9b87c 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEventHandlingCallback.java
@@ -19,23 +19,33 @@ package org.apache.ignite.internal.metastorage.server;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-/**
- * Interface for declaring callbacks that get called after all Meta Storage
watches have been notified of a particular revision
- * and/or when SafeTime gets advanced.
- */
-public interface OnRevisionAppliedCallback {
+/** Watch event handling callback. */
+public interface WatchEventHandlingCallback {
/**
* Invoked whenever MetaStorage Safe Time gets advanced (either because a
write command is applied,
* together with all watches that process it, or because idle safe time
mechanism advanced Safe Time).
*
* @param newSafeTime New safe time value.
*/
- void onSafeTimeAdvanced(HybridTimestamp newSafeTime);
+ default void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ // No-op.
+ }
/**
* Notifies of completion of processing of Meta Storage watches for a
particular revision.
*
* @param revision Latest applied meta-storage revision.
*/
- void onRevisionApplied(long revision);
+ default void onRevisionApplied(long revision) {
+ // No-op.
+ }
+
+ /**
+ * Invoked when the metastorage compaction revision has been updated in
the WatchEvent queue.
+ *
+ * @param compactionRevision New metastorage compaction revision.
+ */
+ default void onCompactionRevisionUpdated(long compactionRevision) {
+ // No-op.
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 68e6e57d5d..c717b9420b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -94,8 +94,7 @@ public class WatchProcessor implements ManuallyCloseable {
private final EntryReader entryReader;
- /** Callback that gets notified after a {@link WatchEvent} has been
processed by a registered watch. */
- private volatile OnRevisionAppliedCallback revisionCallback;
+ private volatile WatchEventHandlingCallback watchEventHandlingCallback;
/** Executor for processing watch events. */
private final ExecutorService watchExecutor;
@@ -141,13 +140,11 @@ public class WatchProcessor implements ManuallyCloseable {
.min();
}
- /**
- * Sets the callback that will be executed every time after watches have
been notified of a particular revision.
- */
- public void setRevisionCallback(OnRevisionAppliedCallback
revisionCallback) {
- assert this.revisionCallback == null;
+ /** Sets the watch event handling callback. */
+ public void setWatchEventHandlingCallback(WatchEventHandlingCallback
callback) {
+ assert this.watchEventHandlingCallback == null;
- this.revisionCallback = revisionCallback;
+ this.watchEventHandlingCallback = callback;
}
/**
@@ -327,9 +324,9 @@ public class WatchProcessor implements ManuallyCloseable {
}
private void invokeOnRevisionCallback(long revision, HybridTimestamp time)
{
- revisionCallback.onSafeTimeAdvanced(time);
+ watchEventHandlingCallback.onSafeTimeAdvanced(time);
- revisionCallback.onRevisionApplied(revision);
+ watchEventHandlingCallback.onRevisionApplied(revision);
}
/**
@@ -341,7 +338,7 @@ public class WatchProcessor implements ManuallyCloseable {
assert time != null;
notificationFuture = notificationFuture
- .thenRunAsync(() -> revisionCallback.onSafeTimeAdvanced(time),
watchExecutor)
+ .thenRunAsync(() ->
watchEventHandlingCallback.onSafeTimeAdvanced(time), watchExecutor)
.whenComplete((ignored, e) -> {
if (e != null) {
failureManager.process(new
FailureContext(CRITICAL_ERROR, e));
@@ -383,15 +380,24 @@ public class WatchProcessor implements ManuallyCloseable {
}
/**
- * Returns a future that will complete when the task in the WatchEvent
queue is complete.
+ * Updates the metastorage compaction revision in the WatchEvent queue.
*
* <p>This method is not thread-safe and must be performed under an
exclusive lock in concurrent scenarios.</p>
+ *
+ * @param compactionRevision New metastorage compaction revision.
+ * @param time Metastorage compaction revision update timestamp.
*/
- public CompletableFuture<Void> addTaskToWatchEventQueue(Runnable task) {
- CompletableFuture<Void> future = notificationFuture.thenRunAsync(task,
watchExecutor);
-
- notificationFuture = future;
+ public void updateCompactionRevision(long compactionRevision,
HybridTimestamp time) {
+ notificationFuture = notificationFuture
+ .thenRunAsync(() -> {
+
watchEventHandlingCallback.onCompactionRevisionUpdated(compactionRevision);
- return future;
+ watchEventHandlingCallback.onSafeTimeAdvanced(time);
+ }, watchExecutor)
+ .whenComplete((ignored, e) -> {
+ if (e != null) {
+ failureManager.process(new
FailureContext(CRITICAL_ERROR, e));
+ }
+ });
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index dcee785092..6e42e4ba7b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -71,7 +71,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -88,10 +87,10 @@ import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
-import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.Value;
+import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
@@ -266,12 +265,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
super(
nodeName,
failureManager,
- readOperationForCompactionTracker,
- Executors.newSingleThreadExecutor(NamedThreadFactory.create(
- nodeName,
- "metastorage-compaction-executor",
- Loggers.forClass(RocksDbKeyValueStorage.class)
- ))
+ readOperationForCompactionTracker
);
this.dbPath = dbPath;
@@ -421,7 +415,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
watchProcessor.close();
IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10,
TimeUnit.SECONDS);
- IgniteUtils.shutdownAndAwaitTermination(compactionExecutor, 10,
TimeUnit.SECONDS);
rwLock.writeLock().lock();
try {
@@ -867,7 +860,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
+ public void startWatches(long startRevision, WatchEventHandlingCallback
callback) {
assert startRevision > 0 : startRevision;
long currentRevision;
@@ -875,7 +868,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
rwLock.readLock().lock();
try {
- watchProcessor.setRevisionCallback(revisionCallback);
+
watchProcessor.setWatchEventHandlingCallback(createWrapper(callback));
currentRevision = rev;
@@ -1278,27 +1271,19 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
- assert revision >= 0 : revision;
-
- rwLock.writeLock().lock();
-
+ protected void saveCompactionRevision(long revision, KeyValueUpdateContext
context, boolean advanceSafeTime) {
try (WriteBatch batch = new WriteBatch()) {
- assertCompactionRevisionLessThanCurrent(revision, rev);
-
data.put(batch, COMPACTION_REVISION_KEY, longToBytes(revision));
addIndexAndTermToWriteBatch(batch, context);
db.write(defaultWriteOptions, batch);
- if (!isInRecoveryState()) {
+ if (advanceSafeTime && !isInRecoveryState()) {
watchProcessor.advanceSafeTime(context.timestamp);
}
} catch (Throwable t) {
throw new MetaStorageException(COMPACTION_ERR, "Error saving
compaction revision: " + revision, t);
- } finally {
- rwLock.writeLock().unlock();
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index cbc49a8581..a2947cf298 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -68,21 +68,13 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
private final RaftGroupConfigurationConverter configurationConverter = new
RaftGroupConfigurationConverter();
- /**
- * Constructor.
- *
- * @param storage Storage.
- */
+ /** Constructor. */
@TestOnly
public MetaStorageListener(KeyValueStorage storage, ClusterTimeImpl
clusterTime) {
this(storage, clusterTime, newConfig -> {});
}
- /**
- * Constructor.
- *
- * @param storage Storage.
- */
+ /** Constructor. */
public MetaStorageListener(
KeyValueStorage storage,
ClusterTimeImpl clusterTime,
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 5075111d7d..a97a9a60de 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.command.CompactionCommand;
import
org.apache.ignite.internal.metastorage.command.EvictIdempotentCommandsCacheCommand;
import org.apache.ignite.internal.metastorage.command.IdempotentCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
@@ -184,7 +185,7 @@ public class MetaStorageWriteHandler {
private void handleWriteWithTime(CommandClosure<WriteCommand> clo,
MetaStorageWriteCommand command, long index, long term) {
HybridTimestamp opTime = command.safeTime();
- KeyValueUpdateContext context = new KeyValueUpdateContext(index, term,
opTime);
+ var context = new KeyValueUpdateContext(index, term, opTime);
if (command instanceof PutCommand) {
PutCommand putCmd = (PutCommand) command;
@@ -227,6 +228,14 @@ public class MetaStorageWriteHandler {
evictIdempotentCommandsCache(cmd.evictionTimestamp(), context);
clo.result(null);
+ } else if (command instanceof CompactionCommand) {
+ CompactionCommand cmd = (CompactionCommand) command;
+
+ storage.updateCompactionRevision(cmd.compactionRevision(),
context);
+
+ clo.result(null);
+ } else {
+ throw new AssertionError(String.format("Unsupported command:
[context=%s, command=%s]", context, command));
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
index 0739f719f5..e2f7535ac5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetastorageGroupId.java
@@ -27,7 +27,7 @@ public enum MetastorageGroupId implements ReplicationGroupId {
INSTANCE("metastorage_group");
/** Group id string representation. */
- private String name;
+ private final String name;
/**
* The constructor.
@@ -38,7 +38,6 @@ public enum MetastorageGroupId implements ReplicationGroupId {
this.name = name;
}
- /** {@inheritDoc} */
@Override
public String toString() {
return name;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index d298ebef39..bc8416d84d 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -26,7 +26,6 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,6 +51,7 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -85,6 +86,9 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
private final ClusterTimeImpl clusterTime = new ClusterTimeImpl(NODE_NAME,
new IgniteSpinBusyLock(), clock);
+ private final PendingComparableValuesTracker<Long, Void>
updateCompactionRevisionInWatchEvenQueue
+ = new PendingComparableValuesTracker<>(Long.MIN_VALUE);
+
ReadOperationForCompactionTracker readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
@Override
@@ -282,7 +286,7 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
HybridTimestamp now0 = clock.now();
HybridTimestamp now1 = clock.now();
- startListenUpdateSafeTime();
+ startWatches();
storage.saveCompactionRevision(0, new KeyValueUpdateContext(1, 1,
now0));
assertEquals(-1, storage.getCompactionRevision());
@@ -936,12 +940,12 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
@Test
void testReadOperationsFutureWithoutReadOperations() {
- assertTrue(storage.readOperationsFuture(0).isDone());
- assertTrue(storage.readOperationsFuture(1).isDone());
+ assertTrue(readOperationForCompactionTracker.collect(0).isDone());
+ assertTrue(readOperationForCompactionTracker.collect(1).isDone());
}
/**
- * Tests {@link KeyValueStorage#readOperationsFuture} as expected in use.
+ * Tests tracking of read operations from the storage only; reads from the
leader are tracked by {@link MetaStorageManagerImpl} itself.
* <ul>
* <li>Create read operations, we only need cursors since reading one
entry or a batch is synchronized with
* {@link KeyValueStorage#setCompactionRevision}.</li>
@@ -959,7 +963,7 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
try {
storage.setCompactionRevision(3);
- CompletableFuture<Void> readOperationsFuture =
storage.readOperationsFuture(3);
+ CompletableFuture<Void> readOperationsFuture =
readOperationForCompactionTracker.collect(3);
assertFalse(readOperationsFuture.isDone());
range0.stream().forEach(entry -> {});
@@ -980,7 +984,7 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
/**
* Tests that cursors created after {@link
KeyValueStorage#setCompactionRevision} will not affect future from
- * {@link KeyValueStorage#readOperationsFuture} on a new compaction
revision.
+ * {@link ReadOperationForCompactionTracker#collect} on a new compaction
revision.
*/
@Test
void testReadOperationsFutureForReadOperationAfterSetCompactionRevision()
throws Exception {
@@ -994,7 +998,7 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
rangeAfterSetCompactionRevision0 = storage.range(FOO_KEY, FOO_KEY);
rangeAfterSetCompactionRevision1 = storage.range(FOO_KEY, FOO_KEY,
5);
- CompletableFuture<Void> readOperationsFuture =
storage.readOperationsFuture(3);
+ CompletableFuture<Void> readOperationsFuture =
readOperationForCompactionTracker.collect(3);
assertFalse(readOperationsFuture.isDone());
rangeBeforeSetCompactionRevision.close();
@@ -1011,56 +1015,24 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
}
}
- /** Tests {@link KeyValueStorage#startCompaction} case from method
description when storage is in recovery state. */
+ /** Tests {@link KeyValueStorage#updateCompactionRevision} case from
method description when storage is in recovery state. */
@Test
- void testStartCompactionWithoutStartWatches() {
- var completeCompactionFuture = new CompletableFuture<Long>();
- storage.registerCompactionListener(completeCompactionFuture::complete);
-
- storage.startCompaction(3);
- assertEquals(3, storage.getCompactionRevision());
-
- assertThat(completeCompactionFuture, willTimeoutFast());
+ void testUpdateCompactionRevisionWithoutStartWatches() {
+ storage.updateCompactionRevision(1, kvContext(clock.now()));
+ assertEquals(1, storage.getCompactionRevision());
- // Let's make sure that nothing happens after the watch starts.
- startWatches();
- assertEquals(3, storage.getCompactionRevision());
- assertThat(completeCompactionFuture, willTimeoutFast());
+ assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L),
willTimeoutFast());
}
- /** Tests {@link KeyValueStorage#startCompaction} case from method
description when storage is <b>not</b> in recovery state. */
+ /** Tests {@link KeyValueStorage#updateCompactionRevision} case from
method description when storage is <b>not</b> in recovery state. */
@Test
- void testStartCompaction() {
- var completeCompactionFuture = new CompletableFuture<Long>();
- storage.registerCompactionListener(completeCompactionFuture::complete);
-
- var startHandleWatchEventFuture = new CompletableFuture<Void>();
- var finishHandleWatchEventFuture = new CompletableFuture<Void>();
-
- watchExact(FOO_KEY, startHandleWatchEventFuture,
finishHandleWatchEventFuture);
+ void testUpdateCompactionRevision() {
startWatches();
- storage.put(FOO_KEY, SOME_VALUE, kvContext(clock.now()));
- assertThat(startHandleWatchEventFuture, willCompleteSuccessfully());
-
- storage.startCompaction(3);
-
- Cursor<Entry> rangeCursor = storage.range(FOO_KEY, FOO_KEY);
+ storage.updateCompactionRevision(1, kvContext(clock.now()));
- // Since we blocked the WatchEvent queue, we can't complete the
compaction.
- assertThat(completeCompactionFuture, willTimeoutFast());
- assertEquals(-1, storage.getCompactionRevision());
-
- finishHandleWatchEventFuture.complete(null);
-
- // We have unlocked the WatchEvent queue, but the cursor has not yet
been closed.
- assertThat(completeCompactionFuture, willTimeoutFast());
- assertEquals(3, storage.getCompactionRevision());
-
- rangeCursor.close();
-
- assertThat(completeCompactionFuture, willBe(3L));
- assertEquals(3, storage.getCompactionRevision());
+ assertThat(updateCompactionRevisionInWatchEvenQueue.waitFor(1L),
willCompleteSuccessfully());
+ assertEquals(1, storage.getCompactionRevision());
}
private List<Integer> collectRevisions(byte[] key) {
@@ -1146,23 +1118,20 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
);
}
- private void startListenUpdateSafeTime() {
- storage.startWatches(storage.revision() + 1, new
OnRevisionAppliedCallback() {
+ private void startWatches() {
+ storage.startWatches(storage.revision() + 1, new
WatchEventHandlingCallback() {
@Override
public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
clusterTime.updateSafeTime(newSafeTime);
}
@Override
- public void onRevisionApplied(long revision) {
+ public void onCompactionRevisionUpdated(long compactionRevision) {
+
updateCompactionRevisionInWatchEvenQueue.update(compactionRevision, null);
}
});
}
- private void startWatches() {
- startListenUpdateSafeTime();
- }
-
private void watchExact(
byte[] key,
CompletableFuture<Void> startHandleWatchEventFuture,
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 9810f5d3c4..898027b10e 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1611,17 +1611,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
long appliedRevision = storage.revision();
- storage.startWatches(1, new OnRevisionAppliedCallback() {
- @Override
- public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
- // No-op.
- }
-
- @Override
- public void onRevisionApplied(long revision) {
- // No-op.
- }
- });
+ storage.startWatches(1, new WatchEventHandlingCallback() {});
CompletableFuture<byte[]> fut = new CompletableFuture<>();
@@ -1938,7 +1928,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
storage.watchExact(key, 1, mockListener2);
storage.watchExact(key, 1, mockListener3);
- OnRevisionAppliedCallback mockCallback =
mock(OnRevisionAppliedCallback.class);
+ WatchEventHandlingCallback mockCallback =
mock(WatchEventHandlingCallback.class);
storage.startWatches(1, mockCallback);
@@ -2256,7 +2246,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
}
});
- storage.startWatches(1, new OnRevisionAppliedCallback() {
+ storage.startWatches(1, new WatchEventHandlingCallback() {
@Override
public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
// No-op.
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index d7e1f4c90c..8b04f25eed 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -58,11 +58,11 @@ public class WatchProcessorTest extends
BaseIgniteAbstractTest {
WatchProcessorTest::oldEntry,
mock(FailureManager.class));
- private final OnRevisionAppliedCallback revisionCallback =
mock(OnRevisionAppliedCallback.class);
+ private final WatchEventHandlingCallback watchEventHandlingCallback =
mock(WatchEventHandlingCallback.class);
@BeforeEach
void setUp() {
- watchProcessor.setRevisionCallback(revisionCallback);
+
watchProcessor.setWatchEventHandlingCallback(watchEventHandlingCallback);
}
@AfterEach
@@ -94,7 +94,7 @@ public class WatchProcessorTest extends
BaseIgniteAbstractTest {
verify(listener1).onUpdate(new WatchEvent(entryEvent1));
verify(listener2).onUpdate(new WatchEvent(entryEvent2));
- verify(revisionCallback).onRevisionApplied(1L);
+ verify(watchEventHandlingCallback).onRevisionApplied(1L);
}
/**
@@ -121,7 +121,7 @@ public class WatchProcessorTest extends
BaseIgniteAbstractTest {
verify(listener1).onUpdate(event);
- verify(revisionCallback).onRevisionApplied(1L);
+ verify(watchEventHandlingCallback).onRevisionApplied(1L);
ts = new HybridTimestamp(2, 3);
@@ -133,7 +133,7 @@ public class WatchProcessorTest extends
BaseIgniteAbstractTest {
verify(listener2).onUpdate(event);
- verify(revisionCallback).onRevisionApplied(2L);
+ verify(watchEventHandlingCallback).onRevisionApplied(2L);
}
/**
@@ -161,7 +161,7 @@ public class WatchProcessorTest extends
BaseIgniteAbstractTest {
verify(listener2).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
verify(listener2).onError(any(IllegalStateException.class));
- verify(revisionCallback, never()).onRevisionApplied(anyLong());
+ verify(watchEventHandlingCallback,
never()).onRevisionApplied(anyLong());
}
/**
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 749d5f569a..2d2de27071 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -50,12 +50,9 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -74,8 +71,6 @@ import org.jetbrains.annotations.Nullable;
* Simple in-memory key/value storage for tests.
*/
public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
- private static final IgniteLogger LOG =
Loggers.forClass(SimpleInMemoryKeyValueStorage.class);
-
/**
* Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified.
*
@@ -147,7 +142,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
/** Constructor. */
public SimpleInMemoryKeyValueStorage(String nodeName,
ReadOperationForCompactionTracker readOperationForCompactionTracker) {
- super(nodeName, new NoOpFailureManager(),
readOperationForCompactionTracker, ForkJoinPool.commonPool());
+ super(nodeName, new NoOpFailureManager(),
readOperationForCompactionTracker);
}
@Override
@@ -465,7 +460,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
+ public void startWatches(long startRevision, WatchEventHandlingCallback
callback) {
assert startRevision > 0 : startRevision;
rwLock.readLock().lock();
@@ -473,7 +468,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
try {
areWatchesEnabled = true;
- watchProcessor.setRevisionCallback(revisionCallback);
+
watchProcessor.setWatchEventHandlingCallback(createWrapper(callback));
replayUpdates(startRevision);
} finally {
@@ -770,23 +765,13 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
@Override
- public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
- assert revision >= 0 : revision;
-
- rwLock.writeLock().lock();
-
- try {
- assertCompactionRevisionLessThanCurrent(revision, rev);
-
- savedCompactionRevision = revision;
+ public void saveCompactionRevision(long revision, KeyValueUpdateContext
context, boolean advanceSafeTime) {
+ savedCompactionRevision = revision;
- setIndexAndTerm(context.index, context.term);
+ setIndexAndTerm(context.index, context.term);
- if (!isInRecoveryState()) {
- watchProcessor.advanceSafeTime(context.timestamp);
- }
- } finally {
- rwLock.writeLock().unlock();
+ if (advanceSafeTime && !isInRecoveryState()) {
+ watchProcessor.advanceSafeTime(context.timestamp);
}
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index db1d2e1ccc..c5e20183e6 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -701,7 +701,7 @@ public class IgniteImpl implements Ignite {
readOperationForCompactionTracker
);
- this.cfgStorage = new DistributedConfigurationStorage(name,
metaStorageMgr);
+ cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr);
clusterCfgMgr = new ConfigurationManager(
modules.distributed().rootKeys(),
@@ -1715,6 +1715,7 @@ public class IgniteImpl implements Ignite {
return CompletableFuture.allOf(startupConfigurationUpdate,
startupRevisionUpdate, startFuture)
.thenComposeAsync(t -> {
// Deploy all registered watches because all components
are ready and have registered their listeners.
+ // TODO: IGNITE-23292 Run local metastore compaction after
starting watches for the latest compacted revision
return metaStorageMgr.deployWatches();
}, startupExecutor);
}