This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9852c3b4b9 IGNITE-23356 Adapt components that use get metastorage
revision by timestamp to compaction (#4506)
9852c3b4b9 is described below
commit 9852c3b4b98cd8f15e7e4f3f1ccbfb93d6aefd35
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 7 06:40:38 2024 +0300
IGNITE-23356 Adapt components that use get metastorage revision by
timestamp to compaction (#4506)
---
.../internal/catalog/storage/UpdateLogImpl.java | 4 +-
.../impl/ItIdempotentCommandCacheTest.java | 60 +++++++++++++++++++-
.../server/persistence/RocksDbKeyValueStorage.java | 9 +--
.../server/persistence/RocksStorageUtils.java | 10 +++-
.../server/raft/MetaStorageWriteHandler.java | 66 ++++++++++++++--------
.../disaster/DisasterRecoveryManager.java | 4 +-
.../disaster/DisasterRecoveryRequest.java | 4 +-
.../disaster/ManualGroupRestartRequest.java | 3 +-
.../disaster/ManualGroupUpdateRequest.java | 6 +-
9 files changed, 124 insertions(+), 42 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 81760cba24..3e083a5a64 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -262,9 +262,7 @@ public class UpdateLogImpl implements UpdateLog {
UpdateLogEvent update =
marshaller.unmarshall(Objects.requireNonNull(entry.value()));
- long revision = entry.revision();
-
- handler.handle(update,
metastore.timestampByRevisionLocally(revision), revision);
+ handler.handle(update, entry.timestamp(), entry.revision());
}
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index a82bee09ee..984e3efdeb 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,23 +17,31 @@
package org.apache.ignite.internal.metastorage.impl;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
@@ -48,6 +56,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
+import org.apache.ignite.internal.causality.CompletableVersionedValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.ComponentWorkingDir;
@@ -203,10 +212,11 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
RaftGroupOptionsConfigurer msRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(msLogStorageFactory,
metastorageWorkDir.metaPath());
- storage = new RocksDbKeyValueStorage(
+ storage = spy(new RocksDbKeyValueStorage(
clusterService.nodeName(),
metastorageWorkDir.dbPath(),
- new NoOpFailureManager());
+ new NoOpFailureManager()
+ ));
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
@@ -455,6 +465,46 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
}
+ @Test
+ void testEvictIdempotentCommandsCacheAfterCompaction() {
+ RaftGroupService raftClient = raftClient();
+ Node leader = leader(raftClient);
+
+ // Used to wait for the metastore revision to increase.
+ var versionedValue = new CompletableVersionedValue<Void>();
+ leader.metaStorageManager.registerRevisionUpdateListener(revision -> {
+ versionedValue.complete(revision);
+
+ return nullCompletedFuture();
+ });
+
+ long revisionBeforeRunCommands = leader.storage.revision();
+
+ assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY,
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
+ assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY,
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
+
+ assertThat(versionedValue.get(revisionBeforeRunCommands + 2),
willCompleteSuccessfully());
+
+ HybridTimestamp compactionTimestamp =
leader.storage.timestampByRevision(revisionBeforeRunCommands + 1);
+ leader.storage.compact(revisionBeforeRunCommands + 1);
+
+ assertThat(collectIdempotentCommands(leader.storage), hasSize(2));
+ reset(leader.storage);
+
+ long revisionBeforeEvict = leader.storage.revision();
+
+
assertThat(leader.metaStorageManager.evictIdempotentCommandsCache(compactionTimestamp),
willCompleteSuccessfully());
+ assertThat(versionedValue.get(revisionBeforeEvict + 1),
willCompleteSuccessfully());
+
+ byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+ byte[] keyTo = leader.storage.nextKey(keyFrom);
+
+ verify(leader.storage).revisionByTimestamp(eq(compactionTimestamp));
+ verify(leader.storage).range(eq(keyFrom), eq(keyTo));
+
+ assertThat(collectIdempotentCommands(leader.storage), hasSize(1));
+ }
+
private Node leader(RaftGroupService raftClient) {
CompletableFuture<Void> refreshLeaderFut = raftClient.refreshLeader();
@@ -548,4 +598,10 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
nodes.forEach(Node::deployWatches);
}
+
+ private List<Entry> collectIdempotentCommands(KeyValueStorage storage) {
+ return storage.range(IDEMPOTENT_COMMAND_PREFIX_BYTES,
storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES)).stream()
+ .filter(entry -> !entry.empty() && !entry.tombstone())
+ .collect(toList());
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 4888ec09bd..8419e5d13f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -34,6 +34,7 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.RocksSto
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longsToBytes;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.revisionFromRocksKey;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.timestampFromRocksValue;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
@@ -1381,15 +1382,15 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
if (revision != lastSeenRevision) {
if (!updatedEntries.isEmpty()) {
- var updatedEntriesCopy = List.copyOf(updatedEntries);
+ List<Entry> updatedEntriesCopy =
List.copyOf(updatedEntries);
- assert ts != null;
+ assert ts != null : revision;
watchProcessor.notifyWatches(updatedEntriesCopy, ts);
updatedEntries.clear();
- ts = timestampByRevision(revision);
+ ts =
hybridTimestamp(timestampFromRocksValue(rocksValue));
}
lastSeenRevision = revision;
@@ -1397,7 +1398,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
if (ts == null) {
// This will only execute on first iteration.
- ts = timestampByRevision(revision);
+ ts = hybridTimestamp(timestampFromRocksValue(rocksValue));
}
updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision,
bytesToValue(rocksValue)));
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
index d4e19e1333..8fd4e7844f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -125,12 +125,20 @@ class RocksStorageUtils {
* Gets a revision from a key with a revision.
*
* @param rocksKey Key with a revision.
- * @return Revision.
*/
static long revisionFromRocksKey(byte[] rocksKey) {
return (long) LONG_ARRAY_HANDLE.get(rocksKey, 0);
}
+ /**
+ * Gets an operation timestamp from value bytes.
+ *
+ * @param rocksValue Value bytes with an operation timestamp.
+ */
+ static long timestampFromRocksValue(byte[] rocksValue) {
+ return (long) LONG_ARRAY_HANDLE.get(rocksValue, 0);
+ }
+
/** Converts from a byte array to a {@link Value}. */
static Value bytesToValue(byte[] valueBytes) {
// At least an 8-bytes operation timestamp and a 1-byte boolean.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 93e1c7f27e..bba9fafb5e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -52,6 +53,7 @@ import
org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.server.AndCondition;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
@@ -380,35 +382,28 @@ public class MetaStorageWriteHandler {
void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp,
HybridTimestamp operationTimestamp) {
LOG.info("Idempotent command cache cleanup started
[evictionTimestamp={}].", evictionTimestamp);
- long obsoleteRevision = storage.revisionByTimestamp(evictionTimestamp);
+ List<byte[]> evictionCandidateKeys =
collectEvictionCandidateKeys(evictionTimestamp);
- if (obsoleteRevision != -1) {
- byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
- byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
-
- List<byte[]> evictionCandidateKeys = storage.range(keyFrom, keyTo,
obsoleteRevision).stream()
- // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
- .filter(entry -> !entry.tombstone() && !entry.empty())
- .map(Entry::key)
- .collect(toList());
+ if (evictionCandidateKeys.isEmpty()) {
+ return;
+ }
- evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
- CommandId commandId = CommandId.fromString(
-
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
+ evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
+ CommandId commandId = CommandId.fromString(
+
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
- idempotentCommandCache.remove(commandId);
- });
+ idempotentCommandCache.remove(commandId);
+ });
- storage.removeAll(evictionCandidateKeys, operationTimestamp);
+ storage.removeAll(evictionCandidateKeys, operationTimestamp);
- LOG.info("Idempotent command cache cleanup finished
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
- + " removedEntriesCount={}, cacheSize={}].",
- evictionTimestamp,
- clusterTime.now(),
- evictionCandidateKeys.size(),
- idempotentCommandCache.size()
- );
- }
+ LOG.info("Idempotent command cache cleanup finished
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
+ + " removedEntriesCount={}, cacheSize={}].",
+ evictionTimestamp,
+ clusterTime.now(),
+ evictionCandidateKeys.size(),
+ idempotentCommandCache.size()
+ );
}
private class ResultCachingClosure implements CommandClosure<WriteCommand>
{
@@ -447,4 +442,27 @@ public class MetaStorageWriteHandler {
closure.result(res);
}
}
+
+ private List<byte[]> collectEvictionCandidateKeys(HybridTimestamp
evictionTimestamp) {
+ byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
+ byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+
+ Stream<Entry> entryStream;
+
+ try {
+ long obsoleteRevision =
storage.revisionByTimestamp(evictionTimestamp);
+
+ entryStream = storage.range(keyFrom, keyTo,
obsoleteRevision).stream()
+ // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
+ .filter(entry -> !entry.tombstone() && !entry.empty());
+ } catch (CompactedException e) {
+ // Fall back to scanning with timestamp filtering.
+ entryStream = storage.range(keyFrom, keyTo).stream()
+ // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
+ .filter(entry -> !entry.tombstone() && !entry.empty())
+ .filter(entry ->
entry.timestamp().compareTo(evictionTimestamp) <= 0);
+ }
+
+ return entryStream.map(Entry::key).collect(toList());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 83899f4e40..a637a89c14 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -522,11 +522,11 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return;
}
- request.handle(this,
watchEvent.revision()).whenComplete(copyStateTo(operationFuture));
+ request.handle(this, watchEvent.revision(),
watchEvent.timestamp()).whenComplete(copyStateTo(operationFuture));
break;
case MULTI_NODE:
- CompletableFuture<Void> handleFuture = request.handle(this,
watchEvent.revision());
+ CompletableFuture<Void> handleFuture = request.handle(this,
watchEvent.revision(), watchEvent.timestamp());
if (operationFuture == null) {
// We're not the initiator, or timeout has passed.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
index d2bffe0e2a..592a7f39f7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.disaster;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
* General interface for disaster recovery requests.
@@ -43,7 +44,8 @@ interface DisasterRecoveryRequest extends Serializable {
*
* @param disasterRecoveryManager Disaster recovery manager.
* @param revision Revision of the {@link
DisasterRecoveryManager#RECOVERY_TRIGGER_KEY} update.
+ * @param timestamp Timestamp of {@link
DisasterRecoveryManager#RECOVERY_TRIGGER_KEY} update operation.
* @return New operation future, that completes when operation is
completed.
*/
- CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision);
+ CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision, HybridTimestamp timestamp);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 235ea2e017..eba6226b98 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tostring.S;
@@ -87,7 +88,7 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
}
@Override
- public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision) {
+ public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
if (!nodeNames.isEmpty() &&
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
return nullCompletedFuture();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
index 9f4058ac5b..7d0aeaf81b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
@@ -126,10 +126,8 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
}
@Override
- public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long msRevision) {
- HybridTimestamp msSafeTime =
disasterRecoveryManager.metaStorageManager.timestampByRevisionLocally(msRevision);
-
- int catalogVersion =
disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());
+ public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
+ int catalogVersion =
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
if (this.catalogVersion != catalogVersion) {
return failedFuture(