This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-19532 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 0afca2f44ffce8fa3b1392974c1d7fde6feca51f Author: Semyon Danilov <[email protected]> AuthorDate: Tue May 23 15:34:18 2023 +0400 IGNITE-19532 Happens-before for safe time propagation --- .../ignite/internal/hlc/HybridTimestamp.java | 7 +- .../impl/ItMetaStorageMultipleNodesTest.java | 34 ++++--- .../metastorage/impl/MetaStorageManagerImpl.java | 11 ++- .../server/OnRevisionAppliedCallback.java | 3 +- .../metastorage/server/WatchProcessor.java | 16 +++- .../server/persistence/RocksDbKeyValueStorage.java | 103 ++++++++++++++++++--- .../persistence/StorageColumnFamilyType.java | 4 +- .../server/raft/MetaStorageWriteHandler.java | 8 +- .../server/BasicOperationsKeyValueStorageTest.java | 8 +- .../server/RocksDbKeyValueStorageTest.java | 2 +- .../metastorage/server/WatchProcessorTest.java | 23 ++--- .../server/SimpleInMemoryKeyValueStorage.java | 17 ++-- 12 files changed, 171 insertions(+), 65 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java index 5cdcebdcd5..cfa133ed50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java @@ -82,7 +82,12 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria } } - private HybridTimestamp(long time) { + /** + * The constructor. + * + * @param time Long time value. + */ + public HybridTimestamp(long time) { this.time = time; // Negative time breaks comparison, we don't allow overflow of the physical time. diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java index a817026ba3..68043ee720 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; @@ -72,9 +73,7 @@ import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; -import org.apache.ignite.network.DefaultMessagingService; import org.apache.ignite.network.NetworkAddress; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.StaticNodeFinder; import org.apache.ignite.raft.jraft.RaftGroupService; import org.apache.ignite.utils.ClusterServiceTestUtils; @@ -193,16 +192,6 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest { .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners())) .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet())); } - - void startDroppingMessagesTo(Node recipient, Class<? extends NetworkMessage> msgType) { - ((DefaultMessagingService) clusterService.messagingService()) - .dropMessages((recipientConsistentId, message) -> - recipient.name().equals(recipientConsistentId) && msgType.isInstance(message)); - } - - void stopDroppingMessages() { - ((DefaultMessagingService) clusterService.messagingService()).stopDroppingMessages(); - } } private final List<Node> nodes = new ArrayList<>(); @@ -364,18 +353,37 @@ public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest { assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully()); + CompletableFuture<Void> f = new CompletableFuture<>(); + CountDownLatch l = new CountDownLatch(1); + + secondNode.metaStorageManager.registerExactWatch(ByteArray.fromString("test-key"), new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + l.countDown(); + return f; + } + + @Override + public void onError(Throwable e) { + // No-op. + } + }); + // Try putting data from both nodes, because any of them can be a leader. assertThat( firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new byte[]{0, 1, 2, 3}), willCompleteSuccessfully() ); + assertTrue(l.await(1, TimeUnit.SECONDS)); + f.complete(null); + assertTrue(waitForCondition(() -> { HybridTimestamp sf1 = firstNodeTime.currentSafeTime(); HybridTimestamp sf2 = secondNodeTime.currentSafeTime(); return sf1.equals(sf2); - }, TimeUnit.SECONDS.toMillis(1))); + }, 100)); assertThat( secondNode.metaStorageManager.put(ByteArray.fromString("test-key-2"), new byte[]{0, 1, 2, 3}), diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 7bc432a27c..5bf665416d 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -150,7 +151,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { CompletableFuture<RaftGroupService> raftServiceFuture; try { - RaftNodeDisruptorConfiguration ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1); + var ownFsmCallerExecutorDisruptorConfig = new RaftNodeDisruptorConfiguration("metastorage", 1); // We need to configure the replication protocol differently whether this node is a synchronous or asynchronous replica. if (metaStorageNodes.contains(thisNodeName)) { @@ -283,7 +284,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { try { // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. - storage.startWatches(this::saveUpdatedEntriesToVault); + storage.startWatches(this::onRevisionApplied); } finally { busyLock.leaveBusy(); } @@ -599,13 +600,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager { /** * Saves processed Meta Storage revision and corresponding entries to the Vault. */ - private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent watchEvent) { + private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time) { + assert time != null; + if (!busyLock.enterBusy()) { LOG.info("Skipping applying MetaStorage revision because the node is stopping"); return completedFuture(null); } + clusterTime.updateSafeTime(time); + try { CompletableFuture<Void> saveToVaultFuture; diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java index 0ce5b39072..d3e20e328b 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.metastorage.server; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.WatchEvent; /** @@ -31,5 +32,5 @@ public interface OnRevisionAppliedCallback { * @param watchEvent Event with modified Meta Storage entries processed at least one Watch. * @return Future that represents the state of the execution of the callback. */ - CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent); + CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java index aac4330e29..234a14dd08 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -118,7 +119,10 @@ public class WatchProcessor implements ManuallyCloseable { /** * Notifies registered watch about an update event. */ - public void notifyWatches(List<Entry> updatedEntries) { + @SuppressWarnings("unchecked") + public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) { + assert time != null; + notificationFuture = notificationFuture .thenComposeAsync(v -> { // Revision must be the same for all entries. @@ -130,7 +134,7 @@ public class WatchProcessor implements ManuallyCloseable { .toArray(CompletableFuture[]::new); return allOf(notificationFutures) - .thenComposeAsync(ignored -> invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor); + .thenComposeAsync(ignored -> invokeOnRevisionCallback(notificationFutures, newRevision, time), watchExecutor); }, watchExecutor); } @@ -179,7 +183,11 @@ public class WatchProcessor implements ManuallyCloseable { }); } - private CompletableFuture<Void> invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[] notificationFutures, long revision) { + private CompletableFuture<Void> invokeOnRevisionCallback( + CompletableFuture<List<EntryEvent>>[] notificationFutures, + long revision, + HybridTimestamp time + ) { try { // Only notify about entries that have been accepted by at least one Watch. var acceptedEntries = new HashSet<EntryEvent>(); @@ -193,7 +201,7 @@ public class WatchProcessor implements ManuallyCloseable { var event = new WatchEvent(acceptedEntries, revision); - return revisionCallback.onRevisionApplied(event) + return revisionCallback.onRevisionApplied(event, time) .whenComplete((ignored, e) -> { if (e != null) { LOG.error("Error occurred when notifying watches", e); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index 0d439a0b24..cc667908e3 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -30,6 +30,7 @@ import static org.apache.ignite.internal.metastorage.server.persistence.RocksSto import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes; import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA; import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX; +import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS; import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION; import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix; import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange; @@ -47,6 +48,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -156,6 +158,9 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { /** Timestamp to revision mapping column family. */ private volatile ColumnFamily tsToRevision; + /** Revision to timestamp mapping column family. */ + private volatile ColumnFamily revisionToTs; + /** Snapshot manager. */ private volatile RocksSnapshotManager snapshotManager; @@ -197,14 +202,14 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { * <p>Multi-threaded access is guarded by {@link #rwLock}. */ @Nullable - private List<List<Entry>> eventCache; + private List<UpdatedEntries> eventCache; /** * Current list of updated entries. * * <p>Since this list gets read and updated only on writes (under a write lock), no extra synchronisation is needed. */ - private final List<Entry> updatedEntries = new ArrayList<>(); + private final UpdatedEntries updatedEntries = new UpdatedEntries(); /** * Constructor. @@ -242,10 +247,14 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { Options tsToRevOptions = new Options().setCreateIfMissing(true); ColumnFamilyOptions tsToRevFamilyOptions = new ColumnFamilyOptions(tsToRevOptions); + Options revToTsOptions = new Options().setCreateIfMissing(true); + ColumnFamilyOptions revToTsFamilyOptions = new ColumnFamilyOptions(revToTsOptions); + return List.of( new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions), new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions), - new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions) + new ColumnFamilyDescriptor(TS_TO_REVISION.nameAsBytes(), tsToRevFamilyOptions), + new ColumnFamilyDescriptor(REVISION_TO_TS.nameAsBytes(), revToTsFamilyOptions) ); } @@ -254,7 +263,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { List<ColumnFamilyDescriptor> descriptors = cfDescriptors(); - assert descriptors.size() == 3; + assert descriptors.size() == 4; var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size()); @@ -270,8 +279,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { tsToRevision = ColumnFamily.wrap(db, handles.get(2)); + revisionToTs = ColumnFamily.wrap(db, handles.get(3)); + snapshotManager = new RocksSnapshotManager(db, - List.of(fullRange(data), fullRange(index), fullRange(tsToRevision)), + List.of(fullRange(data), fullRange(index), fullRange(tsToRevision), fullRange(revisionToTs)), snapshotExecutor ); } @@ -413,7 +424,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { data.put(batch, REVISION_KEY, revisionBytes); if (ts != null) { - tsToRevision.put(batch, hybridTsToArray(ts), revisionBytes); + byte[] tsBytes = hybridTsToArray(ts); + + tsToRevision.put(batch, tsBytes, revisionBytes); + revisionToTs.put(batch, revisionBytes, tsBytes); } db.write(opts, batch); @@ -422,6 +436,8 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { updCntr = newCntr; } + updatedEntries.ts = ts; + queueWatchEvent(); } @@ -1329,7 +1345,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { eventCache = new ArrayList<>(); } - eventCache.add(List.copyOf(updatedEntries)); + eventCache.add(updatedEntries.copy()); updatedEntries.clear(); @@ -1343,7 +1359,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } private void notifyWatches() { - watchProcessor.notifyWatches(List.copyOf(updatedEntries)); + UpdatedEntries copy = updatedEntries.copy(); + + assert copy.ts != null; + + watchProcessor.notifyWatches(copy.updatedEntries, copy.ts); updatedEntries.clear(); } @@ -1359,6 +1379,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } var updatedEntries = new ArrayList<Entry>(); + HybridTimestamp ts = null; try ( var upperBound = new Slice(longToBytes(upperRevision + 1)); @@ -1379,7 +1400,9 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { if (!updatedEntries.isEmpty()) { var updatedEntriesCopy = List.copyOf(updatedEntries); - watchProcessor.notifyWatches(updatedEntriesCopy); + assert ts != null; + + watchProcessor.notifyWatches(updatedEntriesCopy, ts); updatedEntries.clear(); } @@ -1387,6 +1410,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { lastSeenRevision = revision; } + if (ts == null) { + ts = timestampByRevision(revision); + } + updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, bytesToValue(rocksValue))); } @@ -1394,13 +1421,27 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { // Notify about the events left after finishing the cycle above. if (!updatedEntries.isEmpty()) { - watchProcessor.notifyWatches(updatedEntries); + assert ts != null; + + watchProcessor.notifyWatches(updatedEntries, ts); } } finishReplay(); } + private HybridTimestamp timestampByRevision(long revision) { + try { + byte[] tsBytes = revisionToTs.get(longToBytes(revision)); + + assert tsBytes != null; + + return new HybridTimestamp(bytesToLong(tsBytes)); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + private void finishReplay() { // Take the lock to drain the event cache and prevent new events from being cached. Since event notification is asynchronous, // this lock shouldn't be held for long. @@ -1408,7 +1449,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { try { if (eventCache != null) { - eventCache.forEach(watchProcessor::notifyWatches); + eventCache.forEach(entries -> { + assert entries.ts != null; + + watchProcessor.notifyWatches(entries.updatedEntries, entries.ts); + }); eventCache = null; } @@ -1423,4 +1468,40 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { public Path getDbPath() { return dbPath; } + + private static class UpdatedEntries { + private final List<Entry> updatedEntries; + + @Nullable + private HybridTimestamp ts; + + public UpdatedEntries() { + this.updatedEntries = new ArrayList<>(); + } + + private UpdatedEntries(List<Entry> updatedEntries, HybridTimestamp ts) { + this.updatedEntries = updatedEntries; + this.ts = Objects.requireNonNull(ts); + } + + public boolean isEmpty() { + return updatedEntries.isEmpty(); + } + + public boolean add(Entry entry) { + return updatedEntries.add(entry); + } + + public void clear() { + updatedEntries.clear(); + + ts = null; + } + + public UpdatedEntries copy() { + assert ts != null; + + return new UpdatedEntries(new ArrayList<>(updatedEntries), ts); + } + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java index 958b73fb88..d1d92b99c9 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java @@ -31,7 +31,9 @@ enum StorageColumnFamilyType { INDEX("INDEX".getBytes(StandardCharsets.UTF_8)), /** Column family for the timestamp to revision mapping. */ - TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)); + TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)), + + REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8)); /** Byte representation of the column family's name. */ private final byte[] nameAsBytes; diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index ca80ffc46a..6354ece930 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -88,13 +88,9 @@ class MetaStorageWriteHandler { safeTime = cmdWithTime.safeTime(); handleWriteWithTime(clo, cmdWithTime, safeTime); - - // Every MetaStorageWriteCommand holds safe time that we should set as the cluster time. - clusterTime.updateSafeTime(safeTime); } else if (command instanceof SyncTimeCommand) { - clusterTime.updateSafeTime(((SyncTimeCommand) command).safeTime()); - - clo.result(null); + // TODO: IGNITE-19199 WatchProcessor must be notified of the new safe time. + throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199"); } else { assert false : "Command was not found [cmd=" + command + ']'; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index 27033f9685..fd2318d8ff 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu long appliedRevision = storage.revision(); - storage.startWatches(event -> completedFuture(null)); + storage.startWatches((event, ts) -> completedFuture(null)); CompletableFuture<byte[]> fut = new CompletableFuture<>(); @@ -2226,7 +2226,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu OnRevisionAppliedCallback mockCallback = mock(OnRevisionAppliedCallback.class); - when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null)); + when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null)); storage.startWatches(mockCallback); @@ -2238,7 +2238,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu verify(mockListener3, timeout(10_000)).onUpdate(any()); - verify(mockCallback, never()).onRevisionApplied(any()); + verify(mockCallback, never()).onRevisionApplied(any(), any()); } @Test @@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu } }); - storage.startWatches(event -> completedFuture(null)); + storage.startWatches((event, ts) -> completedFuture(null)); return resultFuture; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java index 31baeb0f31..ff02241d07 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java @@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends BasicOperationsKeyValueStorageTe } }); - storage.startWatches(event -> CompletableFuture.completedFuture(null)); + storage.startWatches((event, ts) -> CompletableFuture.completedFuture(null)); storage.restoreSnapshot(snapshotPath); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java index 1e64ecd803..346da4712a 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.EntryEvent; import org.apache.ignite.internal.metastorage.WatchEvent; @@ -55,7 +56,7 @@ public class WatchProcessorTest { @BeforeEach void setUp() { - when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null)); + when(revisionCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null)); watchProcessor.setRevisionCallback(revisionCallback); } @@ -79,7 +80,7 @@ public class WatchProcessorTest { var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0); var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0); - watchProcessor.notifyWatches(List.of(entry1, entry2)); + watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE); var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1); var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2); @@ -92,7 +93,7 @@ public class WatchProcessorTest { var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture()); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any()); WatchEvent event = watchEventCaptor.getValue(); @@ -114,23 +115,23 @@ public class WatchProcessorTest { var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0); var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0); - watchProcessor.notifyWatches(List.of(entry1)); + watchProcessor.notifyWatches(List.of(entry1), HybridTimestamp.MAX_VALUE); var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1)); verify(listener1, timeout(1_000)).onUpdate(event); verify(listener2, timeout(1_000)).onRevisionUpdated(1); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(event); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, any()); - watchProcessor.notifyWatches(List.of(entry2)); + watchProcessor.notifyWatches(List.of(entry2), HybridTimestamp.MAX_VALUE); event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)); verify(listener1, timeout(1_000)).onRevisionUpdated(2); verify(listener2, timeout(1_000)).onUpdate(event); - verify(revisionCallback, timeout(1_000)).onRevisionApplied(event); + verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, any()); } /** @@ -150,13 +151,13 @@ public class WatchProcessorTest { var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0); var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0); - watchProcessor.notifyWatches(List.of(entry1, entry2)); + watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE); verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry1), entry1))); verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2))); verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class)); - verify(revisionCallback, never()).onRevisionApplied(any()); + verify(revisionCallback, never()).onRevisionApplied(any(), any()); } /** @@ -182,7 +183,7 @@ public class WatchProcessorTest { var entry1 = new EntryImpl("foo".getBytes(UTF_8), null, 1, 0); var entry2 = new EntryImpl("bar".getBytes(UTF_8), null, 1, 0); - watchProcessor.notifyWatches(List.of(entry1, entry2)); + watchProcessor.notifyWatches(List.of(entry1, entry2), HybridTimestamp.MAX_VALUE); verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry1), entry1))); verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2))); @@ -190,7 +191,7 @@ public class WatchProcessorTest { var entry3 = new EntryImpl("foo".getBytes(UTF_8), null, 2, 0); var entry4 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0); - watchProcessor.notifyWatches(List.of(entry3, entry4)); + watchProcessor.notifyWatches(List.of(entry3, entry4), HybridTimestamp.MAX_VALUE); verify(listener1, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry3), entry3))); verify(listener2, never()).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry4), entry4))); diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 2096e9ab4e..6f2c404191 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -64,6 +64,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { /** Timestamp to revision mapping. */ private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>(); + /** Revision to timestamp mapping. */ + private final NavigableMap<Long, Long> revToTsMap = new TreeMap<>(); + /** Revisions index. Value contains all entries which were modified under particular revision. */ private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>(); @@ -120,6 +123,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { rev = newRevision; tsToRevMap.put(ts.longValue(), rev); + revToTsMap.put(rev, ts.longValue()); notifyWatches(); } @@ -468,7 +472,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return; } - watchProcessor.notifyWatches(List.copyOf(updatedEntries)); + Long tsLong = revToTsMap.get(updatedEntries.get(0).revision()); + assert tsLong != null; + + watchProcessor.notifyWatches(List.copyOf(updatedEntries), new HybridTimestamp(tsLong)); updatedEntries.clear(); } @@ -748,12 +755,4 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { private static long lastRevision(List<Long> revs) { return revs.get(revs.size() - 1); } - - private static List<Long> listOf(long val) { - List<Long> res = new ArrayList<>(); - - res.add(val); - - return res; - } }
