This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 02d17e9bac IGNITE-23847 Add metastorage revision update notification
for operations that do not update metastorage data (#4839)
02d17e9bac is described below
commit 02d17e9bac6f162546182bd222b0f78f599aceed
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Dec 5 18:06:24 2024 +0300
IGNITE-23847 Add metastorage revision update notification for operations
that do not update metastorage data (#4839)
---
.../internal/metastorage/server/Condition.java | 5 +-
.../server/UpdateOnlyRevisionEvent.java | 52 ++++++++
.../metastorage/server/WatchProcessor.java | 20 ++-
.../server/persistence/RocksDbKeyValueStorage.java | 72 +++++++---
.../server/BasicOperationsKeyValueStorageTest.java | 60 +++++++--
.../server/TestRevisionUpdateListener.java} | 39 +++---
.../server/TestWatchEventHandlingCallback.java} | 34 ++---
.../server/UpdateRevisionOperation.java | 147 +++++++++++++++++++++
.../server/SimpleInMemoryKeyValueStorage.java | 25 +++-
9 files changed, 365 insertions(+), 89 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
index 36066d9a37..4d73cac0d9 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.metastorage.server;
-import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
/**
* Defines interface for boolean condition which could be applied to an array
of entries.
*
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
+ * @see KeyValueStorage#invoke(Condition, List, List, KeyValueUpdateContext,
CommandId)
*/
public interface Condition {
/**
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
new file mode 100644
index 0000000000..3d275d2778
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/UpdateOnlyRevisionEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Notifies the {@link WatchProcessor} to update the revision without
updating the {@link Entry entries}. */
+public class UpdateOnlyRevisionEvent implements NotifyWatchProcessorEvent {
+ private final long newRevision;
+
+ @IgniteToStringInclude
+ private final HybridTimestamp timestamp;
+
+ /** Constructor. */
+ public UpdateOnlyRevisionEvent(long newRevision, HybridTimestamp
timestamp) {
+ this.newRevision = newRevision;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public void notify(WatchProcessor watchProcessor) {
+ watchProcessor.updateOnlyRevision(newRevision, timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index b6e0975cfc..03c02e99b0 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -421,10 +421,20 @@ public class WatchProcessor implements ManuallyCloseable {
watchEventHandlingCallback.onSafeTimeAdvanced(time);
}, watchExecutor)
- .whenComplete((ignored, e) -> {
- if (e != null) {
- failureManager.process(new
FailureContext(CRITICAL_ERROR, e));
- }
- });
+ .whenComplete((ignored, e) ->
notifyFailureHandlerOnFirstFailureInNotificationChain(e));
+ }
+
+ /**
+ * Updates the metastorage revision in the WatchEvent queue. It should be
used for those cases when the revision has been updated but
+ * no {@link Entry}s have been updated.
+ *
+ * @param newRevision New metastorage revision.
+ * @param time Metastorage revision update timestamp.
+ */
+ void updateOnlyRevision(long newRevision, HybridTimestamp time) {
+ notificationFuture = notificationFuture
+ .thenComposeAsync(unused ->
notifyUpdateRevisionListeners(newRevision), watchExecutor)
+ .thenRunAsync(() -> invokeOnRevisionCallback(newRevision,
time), watchExecutor)
+ .whenComplete((ignored, e) ->
notifyFailureHandlerOnFirstFailureInNotificationChain(e));
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 4e26de9ae7..bb9dd8dc4b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
+import org.apache.ignite.internal.metastorage.server.UpdateOnlyRevisionEvent;
import org.apache.ignite.internal.metastorage.server.Value;
import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -978,12 +979,15 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
if (currentRevision != 0) {
- Set<NotifyWatchProcessorEvent> fromStorage =
collectNotifyWatchProcessorEventsFromStorage(startRevision, currentRevision);
+ Set<UpdateEntriesEvent> updateEntriesEvents =
collectUpdateEntriesEventsFromStorage(startRevision, currentRevision);
+ Set<UpdateOnlyRevisionEvent> updateOnlyRevisionEvents =
collectUpdateRevisionEventsFromStorage(startRevision, currentRevision);
rwLock.writeLock().lock();
try {
-
notifyWatchProcessorEventsBeforeStartingWatches.addAll(fromStorage);
+
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateEntriesEvents);
+ // Adds events for which there were no entries updates but the
revision was updated.
+
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateOnlyRevisionEvents);
drainNotifyWatchProcessorEventsBeforeStartingWatches();
@@ -1144,10 +1148,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
* Adds modified entries to the watch event queue.
*/
private void queueWatchEvent() {
- if (updatedEntries.isEmpty()) {
- return;
- }
-
switch (recoveryStatus.get()) {
case INITIAL:
// Watches haven't been enabled yet, no need to queue any
events, they will be replayed upon recovery.
@@ -1155,28 +1155,17 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
break;
case IN_PROGRESS:
- UpdatedEntries copy = updatedEntries.transfer();
-
- var event = new UpdateEntriesEvent(copy.updatedEntries,
copy.ts);
-
- addToNotifyWatchProcessorEventsBeforeStartingWatches(event);
+
addToNotifyWatchProcessorEventsBeforeStartingWatches(updatedEntries.toNotifyWatchProcessorEvent(rev));
break;
default:
- notifyWatches();
+
updatedEntries.toNotifyWatchProcessorEvent(rev).notify(watchProcessor);
break;
}
}
- private void notifyWatches() {
- UpdatedEntries copy = updatedEntries.transfer();
-
- assert copy.ts != null;
- watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
- }
-
- private Set<NotifyWatchProcessorEvent>
collectNotifyWatchProcessorEventsFromStorage(long lowerRevision, long
upperRevision) {
+ private Set<UpdateEntriesEvent> collectUpdateEntriesEventsFromStorage(long
lowerRevision, long upperRevision) {
long minWatchRevision = Math.max(lowerRevision,
watchProcessor.minWatchRevision().orElse(-1));
if (minWatchRevision > upperRevision) {
@@ -1186,7 +1175,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
var updatedEntries = new ArrayList<Entry>();
HybridTimestamp ts = null;
- var events = new TreeSet<NotifyWatchProcessorEvent>();
+ var events = new TreeSet<UpdateEntriesEvent>();
try (
var upperBound = new Slice(longToBytes(upperRevision + 1));
@@ -1252,6 +1241,40 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
return events;
}
+ private Set<UpdateOnlyRevisionEvent>
collectUpdateRevisionEventsFromStorage(long lowerRevision, long upperRevision) {
+ var events = new TreeSet<UpdateOnlyRevisionEvent>();
+
+ try (
+ var upperBound = new Slice(longToBytes(upperRevision + 1));
+ var options = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it = revisionToTs.newIterator(options)
+ ) {
+ it.seek(longToBytes(lowerRevision));
+
+ for (; it.isValid(); it.next()) {
+ byte[] rocksKey = it.key();
+ byte[] rocksValue = it.value();
+
+ long revision = bytesToLong(rocksKey);
+ HybridTimestamp time =
hybridTimestamp(bytesToLong(rocksValue));
+
+ UpdateOnlyRevisionEvent event = new
UpdateOnlyRevisionEvent(revision, time);
+
+ boolean added = events.add(event);
+
+ assert added : event;
+
+ try {
+ it.status();
+ } catch (RocksDBException e) {
+ throw new MetaStorageException(OP_EXECUTION_ERR, e);
+ }
+ }
+ }
+
+ return events;
+ }
+
@Override
public HybridTimestamp timestampByRevision(long revision) {
rwLock.readLock().lock();
@@ -1340,6 +1363,13 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
return transferredValue;
}
+
+ NotifyWatchProcessorEvent toNotifyWatchProcessorEvent(long
newRevision) {
+ UpdatedEntries copy = transfer();
+
+ return copy.updatedEntries.isEmpty() ? new
UpdateOnlyRevisionEvent(newRevision, copy.ts)
+ : new UpdateEntriesEvent(copy.updatedEntries, copy.ts);
+ }
}
@Override
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 898027b10e..aaef591009 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -30,8 +30,10 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,6 +81,8 @@ import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Tests for key-value storage implementations.
@@ -2195,6 +2199,48 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertThrows(CompactedException.class, () ->
storage.revisionByTimestamp(hybridTimestamp(2)));
}
+ @ParameterizedTest
+ @EnumSource(UpdateRevisionOperation.class)
+ void
testNotifyUpdateRevisionForOperationAfterStartWatches(UpdateRevisionOperation
updateRevisionOperation) {
+ var revisionUpdateListener = new TestRevisionUpdateListener();
+ storage.registerRevisionUpdateListener(revisionUpdateListener);
+
+ var watchEventHandlingCallback = new TestWatchEventHandlingCallback();
+ storage.startWatches(1, watchEventHandlingCallback);
+
+ storage.put(key(0), keyValue(0, 1), kvContext(hybridTimestamp(10)));
+
+ long revision = storage.revision();
+ long newRevision = revision + 1;
+
+ updateRevisionOperation.execute(storage);
+
+ assertThat(storage.revision(), equalTo(newRevision));
+ assertThat(revisionUpdateListener.get(newRevision), willSucceedFast());
+ assertThat(watchEventHandlingCallback.get(newRevision),
willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @EnumSource(UpdateRevisionOperation.class)
+ void
testNotifyUpdateRevisionForOperationBeforeStartWatches(UpdateRevisionOperation
updateRevisionOperation) {
+ var revisionUpdateListener = new TestRevisionUpdateListener();
+ storage.registerRevisionUpdateListener(revisionUpdateListener);
+
+ storage.put(key(0), keyValue(0, 1), kvContext(hybridTimestamp(10)));
+
+ long revision = storage.revision();
+ long newRevision = revision + 1;
+
+ updateRevisionOperation.execute(storage);
+
+ var watchEventHandlingCallback = new TestWatchEventHandlingCallback();
+ storage.startWatches(1, watchEventHandlingCallback);
+
+ assertThat(storage.revision(), equalTo(newRevision));
+ assertThat(revisionUpdateListener.get(newRevision), willSucceedFast());
+ assertThat(watchEventHandlingCallback.get(newRevision),
willSucceedFast());
+ }
+
private CompletableFuture<Void> watchExact(
byte[] key, long revision, int expectedNumCalls,
BiConsumer<WatchEvent, Integer> testCondition
) {
@@ -2246,17 +2292,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
}
});
- storage.startWatches(1, new WatchEventHandlingCallback() {
- @Override
- public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
- // No-op.
- }
-
- @Override
- public void onRevisionApplied(long revision) {
- // No-op.
- }
- });
+ storage.startWatches(1, new WatchEventHandlingCallback() {});
return resultFuture;
}
@@ -2315,7 +2351,7 @@ public abstract class BasicOperationsKeyValueStorageTest
extends AbstractKeyValu
assertEquals(List.of(timestamps),
collectTimestamps(storage.getAll(keys, revUpperBound)));
}
- private static CommandId createCommandId() {
+ static CommandId createCommandId() {
return new CommandIdGenerator(UUID::randomUUID).newId();
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
similarity index 51%
copy from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
copy to
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
index 36066d9a37..78e7683b68 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestRevisionUpdateListener.java
@@ -17,27 +17,24 @@
package org.apache.ignite.internal.metastorage.server;
-import java.util.Collection;
-import org.apache.ignite.internal.metastorage.Entry;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-/**
- * Defines interface for boolean condition which could be applied to an array
of entries.
- *
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
- */
-public interface Condition {
- /**
- * Returns the keys which identifies an entries which condition will be
applied to.
- *
- * @return The keys which identifies an entries which condition will be
applied to.
- */
- byte[][] keys();
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+
+class TestRevisionUpdateListener implements RevisionUpdateListener {
+ private final Map<Long, CompletableFuture<Void>> futureByRevision = new
ConcurrentHashMap<>();
+
+ @Override
+ public CompletableFuture<?> onUpdated(long revision) {
+ get(revision).complete(null);
+
+ return nullCompletedFuture();
+ }
- /**
- * Tests the given entries on condition.
- *
- * @param entries Array of entries which will be tested on the condition.
Can't be {@code null}.
- * @return {@code True} if the given entries satisfies to the condition,
otherwise - {@code false}.
- */
- boolean test(Entry... entries);
+ CompletableFuture<Void> get(long revision) {
+ return futureByRevision.computeIfAbsent(revision, revision0 -> new
CompletableFuture<>());
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
similarity index 51%
copy from
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
copy to
modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
index 36066d9a37..9ac22073cd 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/TestWatchEventHandlingCallback.java
@@ -17,27 +17,19 @@
package org.apache.ignite.internal.metastorage.server;
-import java.util.Collection;
-import org.apache.ignite.internal.metastorage.Entry;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
-/**
- * Defines interface for boolean condition which could be applied to an array
of entries.
- *
- * @see KeyValueStorage#invoke(Condition, Collection, Collection)
- */
-public interface Condition {
- /**
- * Returns the keys which identifies an entries which condition will be
applied to.
- *
- * @return The keys which identifies an entries which condition will be
applied to.
- */
- byte[][] keys();
+class TestWatchEventHandlingCallback implements WatchEventHandlingCallback {
+ private final Map<Long, CompletableFuture<Void>> futureByRevision = new
ConcurrentHashMap<>();
+
+ @Override
+ public void onRevisionApplied(long revision) {
+ get(revision).complete(null);
+ }
- /**
- * Tests the given entries on condition.
- *
- * @param entries Array of entries which will be tested on the condition.
Can't be {@code null}.
- * @return {@code True} if the given entries satisfies to the condition,
otherwise - {@code false}.
- */
- boolean test(Entry... entries);
+ CompletableFuture<Void> get(long revision) {
+ return futureByRevision.computeIfAbsent(revision, revision0 -> new
CompletableFuture<>());
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
new file mode 100644
index 0000000000..3b3402e29c
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/UpdateRevisionOperation.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static
org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest.key;
+import static
org.apache.ignite.internal.metastorage.server.BasicOperationsKeyValueStorageTest.createCommandId;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext.kvContext;
+import static org.apache.ignite.internal.util.ArrayUtils.INT_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+
+/**
+ * Enumeration for testing storage revision change notification. It is
expected that there is already an {@link Entry} with key {@code 0}
+ * in the storage.
+ */
+enum UpdateRevisionOperation {
+ // Simple operations.
+ PUT_NEW(storage -> put(storage, 1)),
+ PUT_ALL_NEW(storage -> putAll(storage, 1)),
+ PUT_EXISTING(storage -> put(storage, 0)),
+ PUT_ALL_EXISTING(storage -> putAll(storage, 0)),
+ REMOVE_EXISTING(storage -> remove(storage, 0)),
+ REMOVE_ALL_EXISTING(storage -> removeAll(storage, 0)),
+ REMOVE_NOT_EXISTING(storage -> remove(storage, 1)),
+ REMOVE_ALL_NOT_EXISTING(storage -> removeAll(storage, 1)),
+ PUT_ALL_EMPTY(storage -> putAll(storage, INT_EMPTY_ARRAY)),
+ REMOVE_ALL_EMPTY(storage -> removeAll(storage, INT_EMPTY_ARRAY)),
+ // Invoke operations.
+ INVOKE_PUT_NEW(storage -> invokeSuccessCondition(storage,
putOperation(1))),
+ INVOKE_IF_PUT_NEW(storage -> invokeIfSuccessCondition(storage,
putOperation(1))),
+ INVOKE_PUT_EXISTING(storage -> invokeSuccessCondition(storage,
putOperation(0))),
+ INVOKE_IF_PUT_EXISTING(storage -> invokeIfSuccessCondition(storage,
putOperation(0))),
+ INVOKE_REMOVE_EXISTING(storage -> invokeSuccessCondition(storage,
removeOperation(0))),
+ INVOKE_IF_REMOVE_EXISTING(storage -> invokeIfSuccessCondition(storage,
removeOperation(0))),
+ INVOKE_REMOVE_NOT_EXISTING(storage -> invokeSuccessCondition(storage,
removeOperation(1))),
+ INVOKE_IF_REMOVE_NOT_EXISTING(storage -> invokeIfSuccessCondition(storage,
removeOperation(1))),
+ INVOKE_NOOP(storage -> invokeSuccessCondition(storage, noop())),
+ INVOKE_IF_NOOP(storage -> invokeIfSuccessCondition(storage, noop()));
+
+ private final Consumer<KeyValueStorage> function;
+
+ UpdateRevisionOperation(Consumer<KeyValueStorage> function) {
+ this.function = function;
+ }
+
+ void execute(KeyValueStorage storage) {
+ Entry entry = storage.get(key(0));
+
+ assertFalse(entry.empty());
+ assertFalse(entry.tombstone());
+
+ function.accept(storage);
+ }
+
+ private static void put(KeyValueStorage storage, int key) {
+ storage.put(key(key), value(1), randomKvContext());
+ }
+
+ private static void putAll(KeyValueStorage storage, int... keys) {
+ storage.putAll(toListKeyByteArray(keys), toListValueByteArray(keys),
randomKvContext());
+ }
+
+ private static void remove(KeyValueStorage storage, int key) {
+ storage.remove(key(key), randomKvContext());
+ }
+
+ private static void removeAll(KeyValueStorage storage, int... keys) {
+ storage.removeAll(toListKeyByteArray(keys), randomKvContext());
+ }
+
+ private static void invokeSuccessCondition(KeyValueStorage storage,
Operation operation) {
+ storage.invoke(
+ new ExistenceCondition(Type.EXISTS, key(0)),
+ List.of(operation),
+ List.of(noop()),
+ randomKvContext(),
+ createCommandId()
+ );
+ }
+
+ private static void invokeIfSuccessCondition(KeyValueStorage storage,
Operation operation) {
+ storage.invoke(
+ new If(
+ new ExistenceCondition(Type.EXISTS, key(0)),
+ new Statement(ops(operation).yield()),
+ new Statement(ops(noop()).yield())
+ ),
+ randomKvContext(),
+ createCommandId()
+ );
+ }
+
+ private static KeyValueUpdateContext randomKvContext() {
+ return kvContext(hybridTimestamp(100_500));
+ }
+
+ private static byte[] value(int value) {
+ return ByteArray.fromString("value=" + value).bytes();
+ }
+
+ private static Operation putOperation(int key) {
+ return Operations.put(keyByteArray(key), value(1));
+ }
+
+ private static Operation removeOperation(int key) {
+ return Operations.remove(keyByteArray(key));
+ }
+
+ private static ByteArray keyByteArray(int key) {
+ return new ByteArray(key(key));
+ }
+
+ private static List<byte[]> toListKeyByteArray(int... keys) {
+ return
IntStream.of(keys).mapToObj(AbstractKeyValueStorageTest::key).collect(toList());
+ }
+
+ private static List<byte[]> toListValueByteArray(int... keys) {
+ return
IntStream.of(keys).mapToObj(UpdateRevisionOperation::value).collect(toList());
+ }
+}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4d7e306981..b6e7cfa2bb 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -487,6 +487,14 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
fillNotifyWatchProcessorEventsFromUpdatedEntries();
});
+
+ for (long revision = startRevision; revision <= rev; revision++) {
+ HybridTimestamp time = revToTsMap.get(revision);
+
+ assert time != null : revision;
+
+ notifyWatchProcessorEventsBeforeStartingWatches.add(new
UpdateOnlyRevisionEvent(revision, time));
+ }
}
private void fillNotifyWatchProcessorEventsFromUpdatedEntries() {
@@ -507,20 +515,24 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
private void notifyWatches() {
- if (!areWatchesStarted() || updatedEntries.isEmpty()) {
+ if (!areWatchesStarted()) {
updatedEntries.clear();
return;
}
- long revision = updatedEntries.get(0).revision();
+ long newRevision = rev;
- HybridTimestamp ts = revToTsMap.get(revision);
- assert ts != null : revision;
+ HybridTimestamp ts = revToTsMap.get(newRevision);
+ assert ts != null : newRevision;
- watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
+ if (updatedEntries.isEmpty()) {
+ watchProcessor.updateOnlyRevision(newRevision, ts);
+ } else {
+ watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
- updatedEntries.clear();
+ updatedEntries.clear();
+ }
}
@Override
@@ -731,7 +743,6 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
var updatedEntry = new EntryImpl(key, val.tombstone() ? null : bytes,
curRev, val.operationTimestamp());
updatedEntries.add(updatedEntry);
-
}
private void doPutAll(long curRev, List<byte[]> keys, List<byte[]>
bytesList, KeyValueUpdateContext context) {