This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 939a1a31e5 IGNITE-23546 Fix long handling of
EvictIdempotentCommandsCacheCommand (#4637)
939a1a31e5 is described below
commit 939a1a31e50d4396c37fb22af741024f6cc21ebd
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 28 10:20:56 2024 +0300
IGNITE-23546 Fix long handling of EvictIdempotentCommandsCacheCommand
(#4637)
---
.../impl/ItIdempotentCommandCacheTest.java | 54 ---------------
.../server/persistence/RocksDbKeyValueStorage.java | 13 +---
.../server/raft/CommandResultAndTimestamp.java | 34 +++++++++
.../server/raft/MetaStorageListener.java | 1 -
.../server/raft/MetaStorageWriteHandler.java | 80 +++++++++++-----------
.../server/SimpleInMemoryKeyValueStorage.java | 10 +--
6 files changed, 80 insertions(+), 112 deletions(-)
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index dc867d2fd7..07de8f9a13 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -25,24 +24,18 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
-import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
@@ -57,7 +50,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
-import org.apache.ignite.internal.causality.CompletableVersionedValue;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -475,46 +467,6 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
}
}
- @Test
- void testEvictIdempotentCommandsCacheAfterCompaction() {
- RaftGroupService raftClient = raftClient();
- Node leader = leader(raftClient);
-
- // To wait, the metastore revision increase.
- var versionedValue = new CompletableVersionedValue<Void>();
- leader.metaStorageManager.registerRevisionUpdateListener(revision -> {
- versionedValue.complete(revision);
-
- return nullCompletedFuture();
- });
-
- long revisionBeforeRunCommands = leader.storage.revision();
-
- assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY,
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
- assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY,
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
-
- assertThat(versionedValue.get(revisionBeforeRunCommands + 2),
willCompleteSuccessfully());
-
- HybridTimestamp compactionTimestamp =
leader.storage.timestampByRevision(revisionBeforeRunCommands + 1);
- leader.storage.compact(revisionBeforeRunCommands + 1);
-
- assertThat(collectIdempotentCommands(leader.storage), hasSize(2));
- reset(leader.storage);
-
- long revisionBeforeEvict = leader.storage.revision();
-
-
assertThat(leader.metaStorageManager.evictIdempotentCommandsCache(compactionTimestamp),
willCompleteSuccessfully());
- assertThat(versionedValue.get(revisionBeforeEvict + 1),
willCompleteSuccessfully());
-
- byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
- byte[] keyTo = leader.storage.nextKey(keyFrom);
-
- verify(leader.storage).revisionByTimestamp(eq(compactionTimestamp));
- verify(leader.storage).range(eq(keyFrom), eq(keyTo));
-
- assertThat(collectIdempotentCommands(leader.storage), hasSize(1));
- }
-
private Node leader(RaftGroupService raftClient) {
CompletableFuture<Void> refreshLeaderFut = raftClient.refreshLeader();
@@ -604,10 +556,4 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
nodes.forEach(Node::deployWatches);
}
-
- private List<Entry> collectIdempotentCommands(KeyValueStorage storage) {
- return storage.range(IDEMPOTENT_COMMAND_PREFIX_BYTES,
storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES)).stream()
- .filter(entry -> !entry.empty() && !entry.tombstone())
- .collect(toList());
- }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 2f0c29b7da..58bff60bb5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -42,7 +42,7 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.StorageC
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_CHECKSUM;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
-import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.toIdempotentCommandKey;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -70,7 +70,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -735,10 +734,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
List<Operation> ops = new ArrayList<>(branch ? success : failure);
- ops.add(Operations.put(
- new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
- updateResult
- ));
+ ops.add(Operations.put(toIdempotentCommandKey(commandId),
updateResult));
applyOperations(ops, context, false, updateResult);
@@ -776,10 +772,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
List<Operation> ops = new ArrayList<>(update.operations());
- ops.add(Operations.put(
- new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
- updateResult
- ));
+ ops.add(Operations.put(toIdempotentCommandKey(commandId),
updateResult));
applyOperations(ops, context, true, updateResult);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
new file mode 100644
index 0000000000..fc5b344f70
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/** Helper class for idempotent command cache. */
+class CommandResultAndTimestamp {
+ final @Nullable Serializable commandResult;
+
+ final HybridTimestamp commandTimestamp;
+
+ CommandResultAndTimestamp(@Nullable Serializable commandResult,
HybridTimestamp commandTimestamp) {
+ this.commandResult = commandResult;
+ this.commandTimestamp = commandTimestamp;
+ }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index f8c61f571f..f299d8ac93 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -64,7 +64,6 @@ public class MetaStorageListener implements
RaftGroupListener, BeforeApplyHandle
private final MetaStorageWriteHandler writeHandler;
- /** Storage. */
private final KeyValueStorage storage;
private final Consumer<CommittedConfiguration> onConfigurationCommitted;
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a97a9a60de..73c09d430e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -25,12 +25,14 @@ import static
org.apache.ignite.internal.util.ByteUtils.toByteArrayList;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -54,7 +56,6 @@ import
org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
-import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.server.AndCondition;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
@@ -78,7 +79,6 @@ import org.jetbrains.annotations.Nullable;
* Class containing some common logic for Meta Storage Raft group listeners.
*/
public class MetaStorageWriteHandler {
- /** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(MetaStorageWriteHandler.class);
public static final String IDEMPOTENT_COMMAND_PREFIX = "icp.";
@@ -90,7 +90,7 @@ public class MetaStorageWriteHandler {
private final KeyValueStorage storage;
private final ClusterTimeImpl clusterTime;
- private final Map<CommandId, @Nullable Serializable>
idempotentCommandCache = new ConcurrentHashMap<>();
+ private final Map<CommandId, CommandResultAndTimestamp>
idempotentCommandCache = new ConcurrentHashMap<>();
MetaStorageWriteHandler(
KeyValueStorage storage,
@@ -112,10 +112,10 @@ public class MetaStorageWriteHandler {
IdempotentCommand idempotentCommand = ((IdempotentCommand)
command);
CommandId commandId = idempotentCommand.id();
- Serializable cachedResult = idempotentCommandCache.get(commandId);
+ CommandResultAndTimestamp cachedResult =
idempotentCommandCache.get(commandId);
if (cachedResult != null) {
- clo.result(cachedResult);
+ clo.result(cachedResult.commandResult);
return;
} else {
@@ -384,7 +384,7 @@ public class MetaStorageWriteHandler {
result =
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
}
- idempotentCommandCache.put(commandId, result);
+ idempotentCommandCache.put(commandId, new
CommandResultAndTimestamp(result, entry.timestamp()));
}
}
}
@@ -396,40 +396,33 @@ public class MetaStorageWriteHandler {
* @param evictionTimestamp Cached entries older than given timestamp will
be evicted.
* @param context Command operation context.
*/
- void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp,
KeyValueUpdateContext context) {
+ private void evictIdempotentCommandsCache(HybridTimestamp
evictionTimestamp, KeyValueUpdateContext context) {
LOG.info("Idempotent command cache cleanup started
[evictionTimestamp={}].", evictionTimestamp);
- List<byte[]> evictionCandidateKeys =
collectEvictionCandidateKeys(evictionTimestamp);
+ List<CommandId> evictedCommandIds =
evictCommandsFromCache(evictionTimestamp);
- if (evictionCandidateKeys.isEmpty()) {
+ if (evictedCommandIds.isEmpty()) {
return;
}
- evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
- CommandId commandId = CommandId.fromString(
-
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
-
- idempotentCommandCache.remove(commandId);
- });
-
- storage.removeAll(evictionCandidateKeys, context);
+ storage.removeAll(toIdempotentCommandKeyBytes(evictedCommandIds),
context);
LOG.info("Idempotent command cache cleanup finished
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
+ " removedEntriesCount={}, cacheSize={}].",
evictionTimestamp,
clusterTime.now(),
- evictionCandidateKeys.size(),
+ evictedCommandIds.size(),
idempotentCommandCache.size()
);
}
private class ResultCachingClosure implements CommandClosure<WriteCommand>
{
- CommandClosure<WriteCommand> closure;
+ final CommandClosure<WriteCommand> closure;
ResultCachingClosure(CommandClosure<WriteCommand> closure) {
- this.closure = closure;
-
assert closure.command() instanceof IdempotentCommand;
+
+ this.closure = closure;
}
@Override
@@ -453,33 +446,40 @@ public class MetaStorageWriteHandler {
// Exceptions are not cached.
if (!(res instanceof Throwable)) {
- idempotentCommandCache.put(command.id(), res);
+ idempotentCommandCache.put(command.id(), new
CommandResultAndTimestamp(res, command.safeTime()));
}
closure.result(res);
}
}
- private List<byte[]> collectEvictionCandidateKeys(HybridTimestamp
evictionTimestamp) {
- byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
- byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+ private List<CommandId> evictCommandsFromCache(HybridTimestamp
evictionTimestamp) {
+ Iterator<Map.Entry<CommandId, CommandResultAndTimestamp>> iterator =
idempotentCommandCache.entrySet().iterator();
- Stream<Entry> entryStream;
+ var result = new ArrayList<CommandId>();
- try {
- long obsoleteRevision =
storage.revisionByTimestamp(evictionTimestamp);
-
- entryStream = storage.range(keyFrom, keyTo,
obsoleteRevision).stream()
- // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
- .filter(entry -> !entry.tombstone() && !entry.empty());
- } catch (CompactedException e) {
- // Fallback to scanning with time timestamp filtering.
- entryStream = storage.range(keyFrom, keyTo).stream()
- // Not sure whether it's possible to retrieve empty entry
here, thus !entry.empty() was added just in case.
- .filter(entry -> !entry.tombstone() && !entry.empty())
- .filter(entry ->
entry.timestamp().compareTo(evictionTimestamp) <= 0);
+ while (iterator.hasNext()) {
+ Map.Entry<CommandId, CommandResultAndTimestamp> entry =
iterator.next();
+
+ if (evictionTimestamp.compareTo(entry.getValue().commandTimestamp)
>= 0) {
+ iterator.remove();
+
+ result.add(entry.getKey());
+ }
}
- return entryStream.map(Entry::key).collect(toList());
+ return result;
+ }
+
+ private static List<byte[]> toIdempotentCommandKeyBytes(List<CommandId>
commandIds) {
+ return commandIds.stream()
+ .map(MetaStorageWriteHandler::toIdempotentCommandKey)
+ .map(ByteArray::bytes)
+ .collect(toList());
+ }
+
+ /** Converts to idempotent command key. */
+ public static ByteArray toIdempotentCommandKey(CommandId commandId) {
+ return new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString());
}
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4078fe6602..dddbf0fc7c 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -28,7 +28,7 @@ import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
-import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.toIdempotentCommandKey;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -52,7 +52,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -308,7 +307,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
// In case of in-memory storage, there's no sense in "persisting"
invoke result, however same persistent source operations
// were added in order to have matching revisions count through
all storages.
ops.add(Operations.put(
- new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
+ toIdempotentCommandKey(commandId),
branch ? INVOKE_RESULT_TRUE_BYTES :
INVOKE_RESULT_FALSE_BYTES)
);
@@ -362,10 +361,7 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
// In case of in-memory storage, there's no sense in
"persisting" invoke result, however same persistent source
// operations were added in order to have matching
revisions count through all storages.
- ops.add(Operations.put(
- new ByteArray(IDEMPOTENT_COMMAND_PREFIX +
commandId.toMgKeyAsString()),
- branch.update().result().result())
- );
+ ops.add(Operations.put(toIdempotentCommandKey(commandId),
branch.update().result().result()));
for (Operation op : ops) {
switch (op.type()) {