This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7523612b38 IGNITE-19532 Happens-before for safe time propagation
(#2098)
7523612b38 is described below
commit 7523612b3883e4cad4fe8fb38c602edeb4dbc05c
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu May 25 12:47:05 2023 +0400
IGNITE-19532 Happens-before for safe time propagation (#2098)
---
.../ignite/internal/hlc/HybridTimestamp.java | 5 +
...=> ItMetaStorageMultipleNodesAbstractTest.java} | 64 ++++++++----
.../ItMetaStorageMultipleNodesRocksDbTest.java} | 18 ++--
...MetaStorageMultipleNodesSimpleStorageTest.java} | 14 ++-
...MetaStorageSafeTimePropagationAbstractTest.java | 100 +++++++++++++++++++
...MetaStorageSafeTimePropagationRocksDbTest.java} | 11 ++-
...orageSafeTimePropagationSimpleStorageTest.java} | 11 ++-
.../metastorage/impl/MetaStorageManagerImpl.java | 11 ++-
.../server/OnRevisionAppliedCallback.java | 4 +-
.../metastorage/server/WatchProcessor.java | 16 ++-
.../server/persistence/RocksDbKeyValueStorage.java | 109 ++++++++++++++++++---
.../persistence/StorageColumnFamilyType.java | 5 +-
.../server/raft/MetaStorageWriteHandler.java | 8 +-
.../server/BasicOperationsKeyValueStorageTest.java | 8 +-
.../RocksDbCompactionKeyValueStorageTest.java | 2 +-
.../server/RocksDbKeyValueStorageTest.java | 4 +-
...impleInMemoryCompactionKeyValueStorageTest.java | 2 +-
.../server/SimpleInMemoryKeyValueStorageTest.java | 2 +-
.../metastorage/server/WatchProcessorTest.java | 27 ++---
.../server/AbstractKeyValueStorageTest.java | 2 +-
.../server/SimpleInMemoryKeyValueStorage.java | 18 ++--
21 files changed, 339 insertions(+), 102 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 5cdcebdcd5..047c536cc9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -82,6 +82,11 @@ public final class HybridTimestamp implements
Comparable<HybridTimestamp>, Seria
}
}
+ /**
+ * The constructor.
+ *
+ * @param time Long time value.
+ */
private HybridTimestamp(long time) {
this.time = time;
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
similarity index 89%
rename from
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
rename to
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 2fe20b45ee..53ef8f6d2a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -28,6 +28,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -57,7 +59,7 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
-import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.raft.Loza;
@@ -73,9 +75,7 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.DefaultMessagingService;
import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -90,7 +90,7 @@ import org.junit.jupiter.params.provider.ValueSource;
* Tests for scenarios when Meta Storage nodes join and leave a cluster.
*/
@ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
+public abstract class ItMetaStorageMultipleNodesAbstractTest extends
IgniteAbstractTest {
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
@@ -100,7 +100,9 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributes;
- private static class Node {
+ public abstract KeyValueStorage createStorage(String nodeName, Path path);
+
+ private class Node {
private final VaultManager vaultManager;
private final ClusterService clusterService;
@@ -146,7 +148,7 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
cmgManager,
new LogicalTopologyServiceImpl(logicalTopology,
cmgManager),
raftManager,
- new SimpleInMemoryKeyValueStorage(name()),
+ createStorage(name(), basePath),
clock
);
}
@@ -194,16 +196,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
.thenCompose(service ->
service.refreshMembers(false).thenApply(v -> service.learners()))
.thenApply(learners ->
learners.stream().map(Peer::consistentId).collect(toSet()));
}
-
- void startDroppingMessagesTo(Node recipient, Class<? extends
NetworkMessage> msgType) {
- ((DefaultMessagingService) clusterService.messagingService())
- .dropMessages((recipientConsistentId, message) ->
- recipient.name().equals(recipientConsistentId) &&
msgType.isInstance(message));
- }
-
- void stopDroppingMessages() {
- ((DefaultMessagingService)
clusterService.messagingService()).stopDroppingMessages();
- }
}
private final List<Node> nodes = new ArrayList<>();
@@ -365,12 +357,50 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
assertThat(allOf(firstNode.cmgManager.onJoinReady(),
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+ CompletableFuture<Void> watchCompletedFuture = new
CompletableFuture<>();
+ CountDownLatch watchCalledLatch = new CountDownLatch(1);
+
+ ByteArray testKey = ByteArray.fromString("test-key");
+
+ // Register watch listener, so that we can control safe time
propagation.
+ // Safe time can only be propagated when all of the listeners
have completed their futures successfully.
+ secondNode.metaStorageManager.registerExactWatch(testKey, new
WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ watchCalledLatch.countDown();
+
+ return watchCompletedFuture;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ // No-op.
+ }
+ });
+
+ HybridTimestamp timeBeforeOp = firstNodeTime.currentSafeTime();
+
// Try putting data from both nodes, because any of them can be a
leader.
assertThat(
-
firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new
byte[]{0, 1, 2, 3}),
+ firstNode.metaStorageManager.put(testKey, new byte[]{0, 1, 2,
3}),
willCompleteSuccessfully()
);
+ // Ensure watch listener is called.
+ assertTrue(watchCalledLatch.await(1, TimeUnit.SECONDS));
+
+ // Wait until leader's safe time is propagated.
+ assertTrue(waitForCondition(() -> {
+ return firstNodeTime.currentSafeTime().compareTo(timeBeforeOp) > 0;
+ }, TimeUnit.SECONDS.toMillis(1)));
+
+ // Safe time must not be propagated to the second node at this moment.
+ assertThat(firstNodeTime.currentSafeTime(),
greaterThan(secondNodeTime.currentSafeTime()));
+
+ // Finish watch listener notification process.
+ watchCompletedFuture.complete(null);
+
+ // After that, safe time must be propagated in the near future.
assertTrue(waitForCondition(() -> {
HybridTimestamp sf1 = firstNodeTime.currentSafeTime();
HybridTimestamp sf2 = secondNodeTime.currentSafeTime();
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
similarity index 60%
copy from
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
copy to
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
index 6f52b4130c..b90a88c9ac 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
@@ -15,22 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}.
*/
-@ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
- @WorkDirectory
- private Path workDir;
+/** {@link ItMetaStorageMultipleNodesAbstractTest} with {@link
RocksDbKeyValueStorage} implementation. */
+public class ItMetaStorageMultipleNodesRocksDbTest extends
ItMetaStorageMultipleNodesAbstractTest {
@Override
- KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
+ public KeyValueStorage createStorage(String nodeName, Path path) {
+ return new RocksDbKeyValueStorage(nodeName, path.resolve("ms"));
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
similarity index 58%
copy from
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
copy to
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
index 54ba8cfc14..b1eefc14d9 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesSimpleStorageTest.java
@@ -15,12 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
-/** Compaction test for the simple in-memory implementation of {@link
KeyValueStorage}. */
-public class SimpleInMemoryCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
+import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+
+/** {@link ItMetaStorageMultipleNodesAbstractTest} with {@link
SimpleInMemoryKeyValueStorage} implementation. */
+public class ItMetaStorageMultipleNodesSimpleStorageTest extends
ItMetaStorageMultipleNodesAbstractTest {
@Override
- KeyValueStorage createStorage() {
- return new SimpleInMemoryKeyValueStorage("test");
+ public KeyValueStorage createStorage(String nodeName, Path path) {
+ return new SimpleInMemoryKeyValueStorage(nodeName);
}
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
new file mode 100644
index 0000000000..7192723bb6
--- /dev/null
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import
org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Safe time propagation tests. */
+public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends
AbstractKeyValueStorageTest {
+ private final HybridClock clock = new HybridClockImpl();
+
+ private final ClusterTimeImpl time = new ClusterTimeImpl(new
IgniteSpinBusyLock(), clock);
+
+ @BeforeEach
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ storage.startWatches((e, t) -> {
+ time.updateSafeTime(t);
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ storage.close();
+ }
+
+ @Test
+ public void testTimePropagated() throws Exception {
+ CompletableFuture<Void> watchCompletedFuture = new
CompletableFuture<>();
+
+ CountDownLatch watchCalledLatch = new CountDownLatch(1);
+
+ // Register watch listener, so that we can control safe time
propagation.
+ // Safe time can only be propagated when all of the listeners
have completed their futures successfully.
+ storage.watchExact(key(0), 1, new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ watchCalledLatch.countDown();
+ return watchCompletedFuture;
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ // No-op.
+ }
+ });
+
+ HybridTimestamp opTs = clock.now();
+
+ storage.put(key(0), keyValue(0, 1), opTs);
+
+ // Ensure watch listener is called.
+ assertTrue(watchCalledLatch.await(1, TimeUnit.SECONDS));
+
+ // Safe time must not be propagated before watch notifies all
listeners.
+ assertThat(time.currentSafeTime(), lessThan(opTs));
+
+ // Finish listener notification.
+ watchCompletedFuture.complete(null);
+
+ // Safe time must be propagated.
+ assertThat(time.waitFor(opTs), willCompleteSuccessfully());
+ }
+}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
similarity index 72%
copy from
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
copy to
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index 6f52b4130c..22cd42442c 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.extension.ExtendWith;
-/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}.
*/
+/** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link
RocksDbKeyValueStorage} implementation. */
@ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
+public class ItMetaStorageSafeTimePropagationRocksDbTest extends
ItMetaStorageSafeTimePropagationAbstractTest {
@WorkDirectory
private Path workDir;
@Override
- KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
+ public KeyValueStorage createStorage() {
+ return new RocksDbKeyValueStorage("test", workDir);
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
similarity index 64%
copy from
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
copy to
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
index 54ba8cfc14..d412c6565e 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationSimpleStorageTest.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server;
+package org.apache.ignite.internal.metastorage.impl;
-/** Compaction test for the simple in-memory implementation of {@link
KeyValueStorage}. */
-public class SimpleInMemoryCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+
+/** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link
SimpleInMemoryKeyValueStorage} implementation. */
+public class ItMetaStorageSafeTimePropagationSimpleStorageTest extends
ItMetaStorageSafeTimePropagationAbstractTest {
@Override
- KeyValueStorage createStorage() {
+ public KeyValueStorage createStorage() {
return new SimpleInMemoryKeyValueStorage("test");
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7bc432a27c..5bf665416d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -150,7 +151,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
CompletableFuture<RaftGroupService> raftServiceFuture;
try {
- RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig
= new RaftNodeDisruptorConfiguration("metastorage", 1);
+ var ownFsmCallerExecutorDisruptorConfig = new
RaftNodeDisruptorConfiguration("metastorage", 1);
// We need to configure the replication protocol differently
whether this node is a synchronous or asynchronous replica.
if (metaStorageNodes.contains(thisNodeName)) {
@@ -283,7 +284,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
try {
// Meta Storage contract states that all updated entries under a
particular revision must be stored in the Vault.
- storage.startWatches(this::saveUpdatedEntriesToVault);
+ storage.startWatches(this::onRevisionApplied);
} finally {
busyLock.leaveBusy();
}
@@ -599,13 +600,17 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
/**
* Saves processed Meta Storage revision and corresponding entries to the
Vault.
*/
- private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent
watchEvent) {
+ private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent,
HybridTimestamp time) {
+ assert time != null;
+
if (!busyLock.enterBusy()) {
LOG.info("Skipping applying MetaStorage revision because the node
is stopping");
return completedFuture(null);
}
+ clusterTime.updateSafeTime(time);
+
try {
CompletableFuture<Void> saveToVaultFuture;
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 0ce5b39072..e02534db44 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.server;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.WatchEvent;
/**
@@ -29,7 +30,8 @@ public interface OnRevisionAppliedCallback {
* Notifies of completion of processing of Meta Storage watches for a
particular revision.
*
* @param watchEvent Event with modified Meta Storage entries processed at
least one Watch.
+ * @param newSafeTime Safe time of the applied revision.
* @return Future that represents the state of the execution of the
callback.
*/
- CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
+ CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent,
HybridTimestamp newSafeTime);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index aac4330e29..234a14dd08 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -118,7 +119,10 @@ public class WatchProcessor implements ManuallyCloseable {
/**
* Notifies registered watch about an update event.
*/
- public void notifyWatches(List<Entry> updatedEntries) {
+ @SuppressWarnings("unchecked")
+ public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp
time) {
+ assert time != null;
+
notificationFuture = notificationFuture
.thenComposeAsync(v -> {
// Revision must be the same for all entries.
@@ -130,7 +134,7 @@ public class WatchProcessor implements ManuallyCloseable {
.toArray(CompletableFuture[]::new);
return allOf(notificationFutures)
- .thenComposeAsync(ignored ->
invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+ .thenComposeAsync(ignored ->
invokeOnRevisionCallback(notificationFutures, newRevision, time),
watchExecutor);
}, watchExecutor);
}
@@ -179,7 +183,11 @@ public class WatchProcessor implements ManuallyCloseable {
});
}
- private CompletableFuture<Void>
invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[]
notificationFutures, long revision) {
+ private CompletableFuture<Void> invokeOnRevisionCallback(
+ CompletableFuture<List<EntryEvent>>[] notificationFutures,
+ long revision,
+ HybridTimestamp time
+ ) {
try {
// Only notify about entries that have been accepted by at least
one Watch.
var acceptedEntries = new HashSet<EntryEvent>();
@@ -193,7 +201,7 @@ public class WatchProcessor implements ManuallyCloseable {
var event = new WatchEvent(acceptedEntries, revision);
- return revisionCallback.onRevisionApplied(event)
+ return revisionCallback.onRevisionApplied(event, time)
.whenComplete((ignored, e) -> {
if (e != null) {
LOG.error("Error occurred when notifying watches",
e);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 0d439a0b24..78ff64ef7a 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
@@ -47,6 +48,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -156,6 +158,9 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/** Timestamp to revision mapping column family. */
private volatile ColumnFamily tsToRevision;
+ /** Revision to timestamp mapping column family. */
+ private volatile ColumnFamily revisionToTs;
+
/** Snapshot manager. */
private volatile RocksSnapshotManager snapshotManager;
@@ -197,14 +202,14 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
* <p>Multi-threaded access is guarded by {@link #rwLock}.
*/
@Nullable
- private List<List<Entry>> eventCache;
+ private List<UpdatedEntries> eventCache;
/**
* Current list of updated entries.
*
* <p>Since this list gets read and updated only on writes (under a write
lock), no extra synchronisation is needed.
*/
- private final List<Entry> updatedEntries = new ArrayList<>();
+ private final UpdatedEntries updatedEntries = new UpdatedEntries();
/**
* Constructor.
@@ -242,10 +247,14 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
Options tsToRevOptions = new Options().setCreateIfMissing(true);
ColumnFamilyOptions tsToRevFamilyOptions = new
ColumnFamilyOptions(tsToRevOptions);
+ Options revToTsOptions = new Options().setCreateIfMissing(true);
+ ColumnFamilyOptions revToTsFamilyOptions = new
ColumnFamilyOptions(revToTsOptions);
+
return List.of(
new ColumnFamilyDescriptor(DATA.nameAsBytes(),
dataFamilyOptions),
new ColumnFamilyDescriptor(INDEX.nameAsBytes(),
indexFamilyOptions),
- new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(),
tsToRevFamilyOptions)
+ new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(),
tsToRevFamilyOptions),
+ new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(),
revToTsFamilyOptions)
);
}
@@ -254,7 +263,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
- assert descriptors.size() == 3;
+ assert descriptors.size() == 4;
var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
@@ -270,8 +279,10 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
tsToRevision = ColumnFamily.wrap(db, handles.get(2));
+ revisionToTs = ColumnFamily.wrap(db, handles.get(3));
+
snapshotManager = new RocksSnapshotManager(db,
- List.of(fullRange(data), fullRange(index),
fullRange(tsToRevision)),
+ List.of(fullRange(data), fullRange(index),
fullRange(tsToRevision), fullRange(revisionToTs)),
snapshotExecutor
);
}
@@ -413,7 +424,10 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
data.put(batch, REVISION_KEY, revisionBytes);
if (ts != null) {
- tsToRevision.put(batch, hybridTsToArray(ts), revisionBytes);
+ byte[] tsBytes = hybridTsToArray(ts);
+
+ tsToRevision.put(batch, tsBytes, revisionBytes);
+ revisionToTs.put(batch, revisionBytes, tsBytes);
}
db.write(opts, batch);
@@ -422,6 +436,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
updCntr = newCntr;
}
+ updatedEntries.ts = ts;
+
queueWatchEvent();
}
@@ -1329,9 +1345,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
eventCache = new ArrayList<>();
}
- eventCache.add(List.copyOf(updatedEntries));
-
- updatedEntries.clear();
+ eventCache.add(updatedEntries.transfer());
break;
@@ -1343,9 +1357,11 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
private void notifyWatches() {
- watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+ UpdatedEntries copy = updatedEntries.transfer();
+
+ assert copy.ts != null;
- updatedEntries.clear();
+ watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
}
private void replayUpdates(long upperRevision) {
@@ -1359,6 +1375,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
var updatedEntries = new ArrayList<Entry>();
+ HybridTimestamp ts = null;
try (
var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1379,7 +1396,9 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
if (!updatedEntries.isEmpty()) {
var updatedEntriesCopy = List.copyOf(updatedEntries);
- watchProcessor.notifyWatches(updatedEntriesCopy);
+ assert ts != null;
+
+ watchProcessor.notifyWatches(updatedEntriesCopy, ts);
updatedEntries.clear();
}
@@ -1387,6 +1406,10 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
lastSeenRevision = revision;
}
+ if (ts == null) {
+ ts = timestampByRevision(revision);
+ }
+
updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision,
bytesToValue(rocksValue)));
}
@@ -1394,13 +1417,27 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
// Notify about the events left after finishing the cycle above.
if (!updatedEntries.isEmpty()) {
- watchProcessor.notifyWatches(updatedEntries);
+ assert ts != null;
+
+ watchProcessor.notifyWatches(updatedEntries, ts);
}
}
finishReplay();
}
+ private HybridTimestamp timestampByRevision(long revision) {
+ try {
+ byte[] tsBytes = revisionToTs.get(longToBytes(revision));
+
+ assert tsBytes != null;
+
+ return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes));
+ } catch (RocksDBException e) {
+ throw new MetaStorageException(OP_EXECUTION_ERR, e);
+ }
+ }
+
private void finishReplay() {
// Take the lock to drain the event cache and prevent new events from
being cached. Since event notification is asynchronous,
// this lock shouldn't be held for long.
@@ -1408,7 +1445,11 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
try {
if (eventCache != null) {
- eventCache.forEach(watchProcessor::notifyWatches);
+ eventCache.forEach(entries -> {
+ assert entries.ts != null;
+
+ watchProcessor.notifyWatches(entries.updatedEntries,
entries.ts);
+ });
eventCache = null;
}
@@ -1423,4 +1464,44 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
public Path getDbPath() {
return dbPath;
}
+
+ private static class UpdatedEntries {
+ private final List<Entry> updatedEntries;
+
+ @Nullable
+ private HybridTimestamp ts;
+
+ public UpdatedEntries() {
+ this.updatedEntries = new ArrayList<>();
+ }
+
+ private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts)
{
+ this.updatedEntries = updatedEntries;
+ this.ts = Objects.requireNonNull(ts);
+ }
+
+ boolean isEmpty() {
+ return updatedEntries.isEmpty();
+ }
+
+ void add(Entry entry) {
+ updatedEntries.add(entry);
+ }
+
+ void clear() {
+ updatedEntries.clear();
+
+ ts = null;
+ }
+
+ UpdatedEntries transfer() {
+ assert ts != null;
+
+ UpdatedEntries transferredValue = new UpdatedEntries(new
ArrayList<>(updatedEntries), ts);
+
+ clear();
+
+ return transferredValue;
+ }
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
index 958b73fb88..ef2f0aa685 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -31,7 +31,10 @@ enum StorageColumnFamilyType {
INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
/** Column family for the timestamp to revision mapping. */
- TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8));
+ TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+ /** Column family for the revision to timestamp mapping. */
+ REVISION_TO_TS("REVTOTS".getBytes(StandardCharsets.UTF_8));
/** Byte representation of the column family's name. */
private final byte[] nameAsBytes;
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index ca80ffc46a..6354ece930 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -88,13 +88,9 @@ class MetaStorageWriteHandler {
safeTime = cmdWithTime.safeTime();
handleWriteWithTime(clo, cmdWithTime, safeTime);
-
- // Every MetaStorageWriteCommand holds safe time that we
should set as the cluster time.
- clusterTime.updateSafeTime(safeTime);
} else if (command instanceof SyncTimeCommand) {
- clusterTime.updateSafeTime(((SyncTimeCommand)
command).safeTime());
-
- clo.result(null);
+ // TODO: IGNITE-19199 WatchProcessor must be notified of the
new safe time.
+ throw new
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199");
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 27033f9685..fd2318d8ff 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
long appliedRevision = storage.revision();
- storage.startWatches(event -> completedFuture(null));
+ storage.startWatches((event, ts) -> completedFuture(null));
CompletableFuture<byte[]> fut = new CompletableFuture<>();
@@ -2226,7 +2226,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
OnRevisionAppliedCallback mockCallback =
mock(OnRevisionAppliedCallback.class);
-
when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+ when(mockCallback.onRevisionApplied(any(),
any())).thenReturn(completedFuture(null));
storage.startWatches(mockCallback);
@@ -2238,7 +2238,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
verify(mockListener3, timeout(10_000)).onUpdate(any());
- verify(mockCallback, never()).onRevisionApplied(any());
+ verify(mockCallback, never()).onRevisionApplied(any(), any());
}
@Test
@@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
}
});
- storage.startWatches(event -> completedFuture(null));
+ storage.startWatches((event, ts) -> completedFuture(null));
return resultFuture;
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index 6f52b4130c..01198c199d 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -30,7 +30,7 @@ public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyV
private Path workDir;
@Override
- KeyValueStorage createStorage() {
+ public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 31baeb0f31..d60e46416b 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -47,7 +47,7 @@ public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
/** {@inheritDoc} */
@Override
- KeyValueStorage createStorage() {
+ public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage("test", workDir.resolve("storage"));
}
@@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
}
});
- storage.startWatches(event -> CompletableFuture.completedFuture(null));
+ storage.startWatches((event, ts) ->
CompletableFuture.completedFuture(null));
storage.restoreSnapshot(snapshotPath);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
index 54ba8cfc14..d7adf966d6 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryCompactionKeyValueStorageTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.metastorage.server;
/** Compaction test for the simple in-memory implementation of {@link
KeyValueStorage}. */
public class SimpleInMemoryCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
@Override
- KeyValueStorage createStorage() {
+ public KeyValueStorage createStorage() {
return new SimpleInMemoryKeyValueStorage("test");
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 4e4bca8904..d9c5f9256c 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -23,7 +23,7 @@ package org.apache.ignite.internal.metastorage.server;
class SimpleInMemoryKeyValueStorageTest extends
BasicOperationsKeyValueStorageTest {
/** {@inheritDoc} */
@Override
- KeyValueStorage createStorage() {
+ public KeyValueStorage createStorage() {
return new SimpleInMemoryKeyValueStorage("test");
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 1e64ecd803..210a2bcc3d 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -55,7 +56,7 @@ public class WatchProcessorTest {
@BeforeEach
void setUp() {
-
when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
+ when(revisionCallback.onRevisionApplied(any(),
any())).thenReturn(completedFuture(null));
watchProcessor.setRevisionCallback(revisionCallback);
}
@@ -79,7 +80,7 @@ public class WatchProcessorTest {
var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
- watchProcessor.notifyWatches(List.of(entry1, entry2));
+ watchProcessor.notifyWatches(List.of(entry1, entry2),
HybridTimestamp.MAX_VALUE);
var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
@@ -92,7 +93,7 @@ public class WatchProcessorTest {
var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
- verify(revisionCallback,
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+ verify(revisionCallback,
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
WatchEvent event = watchEventCaptor.getValue();
@@ -114,23 +115,27 @@ public class WatchProcessorTest {
var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
- watchProcessor.notifyWatches(List.of(entry1));
+ HybridTimestamp ts = new HybridTimestamp(1, 2);
+
+ watchProcessor.notifyWatches(List.of(entry1), ts);
var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
verify(listener1, timeout(1_000)).onUpdate(event);
verify(listener2, timeout(1_000)).onRevisionUpdated(1);
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+
+ ts = new HybridTimestamp(2, 3);
- watchProcessor.notifyWatches(List.of(entry2));
+ watchProcessor.notifyWatches(List.of(entry2), ts);
event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
verify(listener1, timeout(1_000)).onRevisionUpdated(2);
verify(listener2, timeout(1_000)).onUpdate(event);
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
}
/**
@@ -150,13 +155,13 @@ public class WatchProcessorTest {
var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
- watchProcessor.notifyWatches(List.of(entry1, entry2));
+ watchProcessor.notifyWatches(List.of(entry1, entry2),
HybridTimestamp.MAX_VALUE);
verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry1), entry1)));
verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
verify(listener2,
timeout(1_000)).onError(any(IllegalStateException.class));
- verify(revisionCallback, never()).onRevisionApplied(any());
+ verify(revisionCallback, never()).onRevisionApplied(any(), any());
}
/**
@@ -182,7 +187,7 @@ public class WatchProcessorTest {
var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0);
var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0);
- watchProcessor.notifyWatches(List.of(entry1, entry2));
+ watchProcessor.notifyWatches(List.of(entry1, entry2),
HybridTimestamp.MAX_VALUE);
verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry1), entry1)));
verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
@@ -190,7 +195,7 @@ public class WatchProcessorTest {
var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0);
var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
- watchProcessor.notifyWatches(List.of(entry3, entry4));
+ watchProcessor.notifyWatches(List.of(entry3, entry4),
HybridTimestamp.MAX_VALUE);
verify(listener1, never()).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry3), entry3)));
verify(listener2, never()).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry4), entry4)));
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
similarity index 96%
rename from
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
rename to
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index acc9241f06..ce7a4a5b53 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -43,7 +43,7 @@ public abstract class AbstractKeyValueStorageTest {
/**
* Returns key value storage for this test.
*/
- abstract KeyValueStorage createStorage();
+ protected abstract KeyValueStorage createStorage();
protected static byte[] key(int k) {
return ("key" + k).getBytes(UTF_8);
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 2096e9ab4e..07e9a913d7 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -64,6 +65,9 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
/** Timestamp to revision mapping. */
private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
+ /** Revision to timestamp mapping. */
+ private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
+
/** Revisions index. Value contains all entries which were modified under
particular revision. */
private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new
TreeMap<>();
@@ -120,6 +124,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
rev = newRevision;
tsToRevMap.put(ts.longValue(), rev);
+ revToTsMap.put(rev, ts);
notifyWatches();
}
@@ -468,7 +473,10 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
return;
}
- watchProcessor.notifyWatches(List.copyOf(updatedEntries));
+ HybridTimestamp ts = revToTsMap.get(updatedEntries.get(0).revision());
+ assert ts != null;
+
+ watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
updatedEntries.clear();
}
@@ -748,12 +756,4 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
private static long lastRevision(List<Long> revs) {
return revs.get(revs.size() - 1);
}
-
- private static List<Long> listOf(long val) {
- List<Long> res = new ArrayList<>();
-
- res.add(val);
-
- return res;
- }
}