This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9852c3b4b9 IGNITE-23356 Adapt components that use get metastorage 
revision by timestamp to compaction (#4506)
9852c3b4b9 is described below

commit 9852c3b4b98cd8f15e7e4f3f1ccbfb93d6aefd35
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 7 06:40:38 2024 +0300

    IGNITE-23356 Adapt components that use get metastorage revision by 
timestamp to compaction (#4506)
---
 .../internal/catalog/storage/UpdateLogImpl.java    |  4 +-
 .../impl/ItIdempotentCommandCacheTest.java         | 60 +++++++++++++++++++-
 .../server/persistence/RocksDbKeyValueStorage.java |  9 +--
 .../server/persistence/RocksStorageUtils.java      | 10 +++-
 .../server/raft/MetaStorageWriteHandler.java       | 66 ++++++++++++++--------
 .../disaster/DisasterRecoveryManager.java          |  4 +-
 .../disaster/DisasterRecoveryRequest.java          |  4 +-
 .../disaster/ManualGroupRestartRequest.java        |  3 +-
 .../disaster/ManualGroupUpdateRequest.java         |  6 +-
 9 files changed, 124 insertions(+), 42 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 81760cba24..3e083a5a64 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -262,9 +262,7 @@ public class UpdateLogImpl implements UpdateLog {
 
             UpdateLogEvent update = 
marshaller.unmarshall(Objects.requireNonNull(entry.value()));
 
-            long revision = entry.revision();
-
-            handler.handle(update, 
metastore.timestampByRevisionLocally(revision), revision);
+            handler.handle(update, entry.timestamp(), entry.revision());
         }
     }
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index a82bee09ee..984e3efdeb 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,23 +17,31 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.nio.charset.StandardCharsets;
@@ -48,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.causality.CompletableVersionedValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.configuration.ComponentWorkingDir;
@@ -203,10 +212,11 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
             RaftGroupOptionsConfigurer msRaftConfigurer =
                     
RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory, 
metastorageWorkDir.metaPath());
 
-            storage = new RocksDbKeyValueStorage(
+            storage = spy(new RocksDbKeyValueStorage(
                     clusterService.nodeName(),
                     metastorageWorkDir.dbPath(),
-                    new NoOpFailureManager());
+                    new NoOpFailureManager()
+            ));
 
             metaStorageManager = new MetaStorageManagerImpl(
                     clusterService,
@@ -455,6 +465,46 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
     }
 
+    @Test
+    void testEvictIdempotentCommandsCacheAfterCompaction() {
+        RaftGroupService raftClient = raftClient();
+        Node leader = leader(raftClient);
+
+        // Used to wait for the metastore revision to increase.
+        var versionedValue = new CompletableVersionedValue<Void>();
+        leader.metaStorageManager.registerRevisionUpdateListener(revision -> {
+            versionedValue.complete(revision);
+
+            return nullCompletedFuture();
+        });
+
+        long revisionBeforeRunCommands = leader.storage.revision();
+
+        assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY, 
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
+        assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY, 
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
+
+        assertThat(versionedValue.get(revisionBeforeRunCommands + 2), 
willCompleteSuccessfully());
+
+        HybridTimestamp compactionTimestamp = 
leader.storage.timestampByRevision(revisionBeforeRunCommands + 1);
+        leader.storage.compact(revisionBeforeRunCommands + 1);
+
+        assertThat(collectIdempotentCommands(leader.storage), hasSize(2));
+        reset(leader.storage);
+
+        long revisionBeforeEvict = leader.storage.revision();
+
+        
assertThat(leader.metaStorageManager.evictIdempotentCommandsCache(compactionTimestamp),
 willCompleteSuccessfully());
+        assertThat(versionedValue.get(revisionBeforeEvict + 1), 
willCompleteSuccessfully());
+
+        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+        byte[] keyTo = leader.storage.nextKey(keyFrom);
+
+        verify(leader.storage).revisionByTimestamp(eq(compactionTimestamp));
+        verify(leader.storage).range(eq(keyFrom), eq(keyTo));
+
+        assertThat(collectIdempotentCommands(leader.storage), hasSize(1));
+    }
+
     private Node leader(RaftGroupService raftClient) {
         CompletableFuture<Void> refreshLeaderFut = raftClient.refreshLeader();
 
@@ -548,4 +598,10 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
         nodes.forEach(Node::deployWatches);
     }
+
+    private List<Entry> collectIdempotentCommands(KeyValueStorage storage) {
+        return storage.range(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES)).stream()
+                .filter(entry -> !entry.empty() && !entry.tombstone())
+                .collect(toList());
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 4888ec09bd..8419e5d13f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -34,6 +34,7 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longsToBytes;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.revisionFromRocksKey;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.timestampFromRocksValue;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
@@ -1381,15 +1382,15 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
                 if (revision != lastSeenRevision) {
                     if (!updatedEntries.isEmpty()) {
-                        var updatedEntriesCopy = List.copyOf(updatedEntries);
+                        List<Entry> updatedEntriesCopy = 
List.copyOf(updatedEntries);
 
-                        assert ts != null;
+                        assert ts != null : revision;
 
                         watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
 
-                        ts = timestampByRevision(revision);
+                        ts = 
hybridTimestamp(timestampFromRocksValue(rocksValue));
                     }
 
                     lastSeenRevision = revision;
@@ -1397,7 +1398,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
                 if (ts == null) {
                     // This will only execute on first iteration.
-                    ts = timestampByRevision(revision);
+                    ts = hybridTimestamp(timestampFromRocksValue(rocksValue));
                 }
 
                 updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, 
bytesToValue(rocksValue)));
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
index d4e19e1333..8fd4e7844f 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -125,12 +125,20 @@ class RocksStorageUtils {
      * Gets a revision from a key with a revision.
      *
      * @param rocksKey Key with a revision.
-     * @return Revision.
      */
     static long revisionFromRocksKey(byte[] rocksKey) {
         return (long) LONG_ARRAY_HANDLE.get(rocksKey, 0);
     }
 
+    /**
+     * Gets an operation timestamp from value bytes.
+     *
+     * @param rocksValue Value bytes with an operation timestamp.
+     */
+    static long timestampFromRocksValue(byte[] rocksValue) {
+        return (long) LONG_ARRAY_HANDLE.get(rocksValue, 0);
+    }
+
     /** Converts from a byte array to a {@link Value}. */
     static Value bytesToValue(byte[] valueBytes) {
         // At least an 8-bytes operation timestamp and a 1-byte boolean.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 93e1c7f27e..bba9fafb5e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -52,6 +53,7 @@ import 
org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
 import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.server.AndCondition;
 import org.apache.ignite.internal.metastorage.server.Condition;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
@@ -380,35 +382,28 @@ public class MetaStorageWriteHandler {
     void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, 
HybridTimestamp operationTimestamp) {
         LOG.info("Idempotent command cache cleanup started 
[evictionTimestamp={}].", evictionTimestamp);
 
-        long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+        List<byte[]> evictionCandidateKeys = 
collectEvictionCandidateKeys(evictionTimestamp);
 
-        if (obsoleteRevision != -1) {
-            byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
-            byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
-
-            List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo, 
obsoleteRevision).stream()
-                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
-                    .filter(entry -> !entry.tombstone() && !entry.empty())
-                    .map(Entry::key)
-                    .collect(toList());
+        if (evictionCandidateKeys.isEmpty()) {
+            return;
+        }
 
-            evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
-                CommandId commandId = CommandId.fromString(
-                        
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
+        evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+            CommandId commandId = CommandId.fromString(
+                    
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
 
-                idempotentCommandCache.remove(commandId);
-            });
+            idempotentCommandCache.remove(commandId);
+        });
 
-            storage.removeAll(evictionCandidateKeys, operationTimestamp);
+        storage.removeAll(evictionCandidateKeys, operationTimestamp);
 
-            LOG.info("Idempotent command cache cleanup finished 
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
-                            + " removedEntriesCount={}, cacheSize={}].",
-                    evictionTimestamp,
-                    clusterTime.now(),
-                    evictionCandidateKeys.size(),
-                    idempotentCommandCache.size()
-            );
-        }
+        LOG.info("Idempotent command cache cleanup finished 
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
+                        + " removedEntriesCount={}, cacheSize={}].",
+                evictionTimestamp,
+                clusterTime.now(),
+                evictionCandidateKeys.size(),
+                idempotentCommandCache.size()
+        );
     }
 
     private class ResultCachingClosure implements CommandClosure<WriteCommand> 
{
@@ -447,4 +442,27 @@ public class MetaStorageWriteHandler {
             closure.result(res);
         }
     }
+
+    private List<byte[]> collectEvictionCandidateKeys(HybridTimestamp 
evictionTimestamp) {
+        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+        byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+        Stream<Entry> entryStream;
+
+        try {
+            long obsoleteRevision = 
storage.revisionByTimestamp(evictionTimestamp);
+
+            entryStream = storage.range(keyFrom, keyTo, 
obsoleteRevision).stream()
+                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
+                    .filter(entry -> !entry.tombstone() && !entry.empty());
+        } catch (CompactedException e) {
+            // Fall back to scanning with timestamp filtering.
+            entryStream = storage.range(keyFrom, keyTo).stream()
+                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
+                    .filter(entry -> !entry.tombstone() && !entry.empty())
+                    .filter(entry -> 
entry.timestamp().compareTo(evictionTimestamp) <= 0);
+        }
+
+        return entryStream.map(Entry::key).collect(toList());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 83899f4e40..a637a89c14 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -522,11 +522,11 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                     return;
                 }
 
-                request.handle(this, 
watchEvent.revision()).whenComplete(copyStateTo(operationFuture));
+                request.handle(this, watchEvent.revision(), 
watchEvent.timestamp()).whenComplete(copyStateTo(operationFuture));
 
                 break;
             case MULTI_NODE:
-                CompletableFuture<Void> handleFuture = request.handle(this, 
watchEvent.revision());
+                CompletableFuture<Void> handleFuture = request.handle(this, 
watchEvent.revision(), watchEvent.timestamp());
 
                 if (operationFuture == null) {
                     // We're not the initiator, or timeout has passed.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
index d2bffe0e2a..592a7f39f7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.disaster;
 import java.io.Serializable;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
  * General interface for disaster recovery requests.
@@ -43,7 +44,8 @@ interface DisasterRecoveryRequest extends Serializable {
      *
      * @param disasterRecoveryManager Disaster recovery manager.
      * @param revision Revision of the {@link 
DisasterRecoveryManager#RECOVERY_TRIGGER_KEY} update.
+     * @param timestamp Timestamp of {@link 
DisasterRecoveryManager#RECOVERY_TRIGGER_KEY} update operation.
      * @return New operation future, that completes when operation is 
completed.
      */
-    CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision);
+    CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision, HybridTimestamp timestamp);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 235ea2e017..eba6226b98 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tostring.S;
 
@@ -87,7 +88,7 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
     }
 
     @Override
-    public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision) {
+    public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
         if (!nodeNames.isEmpty() && 
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
             return nullCompletedFuture();
         }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
index 9f4058ac5b..7d0aeaf81b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
@@ -126,10 +126,8 @@ class ManualGroupUpdateRequest implements 
DisasterRecoveryRequest {
     }
 
     @Override
-    public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long msRevision) {
-        HybridTimestamp msSafeTime = 
disasterRecoveryManager.metaStorageManager.timestampByRevisionLocally(msRevision);
-
-        int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());
+    public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
+        int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
 
         if (this.catalogVersion != catalogVersion) {
             return failedFuture(

Reply via email to