This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 939a1a31e5 IGNITE-23546 Fix long handling of 
EvictIdempotentCommandsCacheCommand (#4637)
939a1a31e5 is described below

commit 939a1a31e50d4396c37fb22af741024f6cc21ebd
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 28 10:20:56 2024 +0300

    IGNITE-23546 Fix long handling of EvictIdempotentCommandsCacheCommand 
(#4637)
---
 .../impl/ItIdempotentCommandCacheTest.java         | 54 ---------------
 .../server/persistence/RocksDbKeyValueStorage.java | 13 +---
 .../server/raft/CommandResultAndTimestamp.java     | 34 +++++++++
 .../server/raft/MetaStorageListener.java           |  1 -
 .../server/raft/MetaStorageWriteHandler.java       | 80 +++++++++++-----------
 .../server/SimpleInMemoryKeyValueStorage.java      | 10 +--
 6 files changed, 80 insertions(+), 112 deletions(-)

diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index dc867d2fd7..07de8f9a13 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -25,24 +24,18 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager.configureCmgManagerToStartMetastorage;
-import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.nio.charset.StandardCharsets;
@@ -57,7 +50,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
 import java.util.stream.Stream;
-import org.apache.ignite.internal.causality.CompletableVersionedValue;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -475,46 +467,6 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         }
     }
 
-    @Test
-    void testEvictIdempotentCommandsCacheAfterCompaction() {
-        RaftGroupService raftClient = raftClient();
-        Node leader = leader(raftClient);
-
-        // To wait, the metastore revision increase.
-        var versionedValue = new CompletableVersionedValue<Void>();
-        leader.metaStorageManager.registerRevisionUpdateListener(revision -> {
-            versionedValue.complete(revision);
-
-            return nullCompletedFuture();
-        });
-
-        long revisionBeforeRunCommands = leader.storage.revision();
-
-        assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY, 
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
-        assertThat(raftClient.run(buildKeyNotExistsInvokeCommand(TEST_KEY, 
TEST_VALUE, ANOTHER_VALUE)), willCompleteSuccessfully());
-
-        assertThat(versionedValue.get(revisionBeforeRunCommands + 2), 
willCompleteSuccessfully());
-
-        HybridTimestamp compactionTimestamp = 
leader.storage.timestampByRevision(revisionBeforeRunCommands + 1);
-        leader.storage.compact(revisionBeforeRunCommands + 1);
-
-        assertThat(collectIdempotentCommands(leader.storage), hasSize(2));
-        reset(leader.storage);
-
-        long revisionBeforeEvict = leader.storage.revision();
-
-        
assertThat(leader.metaStorageManager.evictIdempotentCommandsCache(compactionTimestamp),
 willCompleteSuccessfully());
-        assertThat(versionedValue.get(revisionBeforeEvict + 1), 
willCompleteSuccessfully());
-
-        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
-        byte[] keyTo = leader.storage.nextKey(keyFrom);
-
-        verify(leader.storage).revisionByTimestamp(eq(compactionTimestamp));
-        verify(leader.storage).range(eq(keyFrom), eq(keyTo));
-
-        assertThat(collectIdempotentCommands(leader.storage), hasSize(1));
-    }
-
     private Node leader(RaftGroupService raftClient) {
         CompletableFuture<Void> refreshLeaderFut = raftClient.refreshLeader();
 
@@ -604,10 +556,4 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
 
         nodes.forEach(Node::deployWatches);
     }
-
-    private List<Entry> collectIdempotentCommands(KeyValueStorage storage) {
-        return storage.range(IDEMPOTENT_COMMAND_PREFIX_BYTES, 
storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES)).stream()
-                .filter(entry -> !entry.empty() && !entry.tombstone())
-                .collect(toList());
-    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 2f0c29b7da..58bff60bb5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -42,7 +42,7 @@ import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageC
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_CHECKSUM;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
 import static 
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
-import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.toIdempotentCommandKey;
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -70,7 +70,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -735,10 +734,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
             List<Operation> ops = new ArrayList<>(branch ? success : failure);
 
-            ops.add(Operations.put(
-                    new ByteArray(IDEMPOTENT_COMMAND_PREFIX + 
commandId.toMgKeyAsString()),
-                    updateResult
-            ));
+            ops.add(Operations.put(toIdempotentCommandKey(commandId), 
updateResult));
 
             applyOperations(ops, context, false, updateResult);
 
@@ -776,10 +772,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
                     List<Operation> ops = new ArrayList<>(update.operations());
 
-                    ops.add(Operations.put(
-                            new ByteArray(IDEMPOTENT_COMMAND_PREFIX + 
commandId.toMgKeyAsString()),
-                            updateResult
-                    ));
+                    ops.add(Operations.put(toIdempotentCommandKey(commandId), 
updateResult));
 
                     applyOperations(ops, context, true, updateResult);
 
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
new file mode 100644
index 0000000000..fc5b344f70
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/CommandResultAndTimestamp.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.raft;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/** Helper class for idempotent command cache. */
+class CommandResultAndTimestamp {
+    final @Nullable Serializable commandResult;
+
+    final HybridTimestamp commandTimestamp;
+
+    CommandResultAndTimestamp(@Nullable Serializable commandResult, 
HybridTimestamp commandTimestamp) {
+        this.commandResult = commandResult;
+        this.commandTimestamp = commandTimestamp;
+    }
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index f8c61f571f..f299d8ac93 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -64,7 +64,6 @@ public class MetaStorageListener implements 
RaftGroupListener, BeforeApplyHandle
 
     private final MetaStorageWriteHandler writeHandler;
 
-    /** Storage. */
     private final KeyValueStorage storage;
 
     private final Consumer<CommittedConfiguration> onConfigurationCommitted;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index a97a9a60de..73c09d430e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -25,12 +25,14 @@ import static 
org.apache.ignite.internal.util.ByteUtils.toByteArrayList;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -54,7 +56,6 @@ import 
org.apache.ignite.internal.metastorage.dsl.MetaStorageMessagesFactory;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Statement.IfStatement;
 import org.apache.ignite.internal.metastorage.dsl.Statement.UpdateStatement;
-import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import org.apache.ignite.internal.metastorage.server.AndCondition;
 import org.apache.ignite.internal.metastorage.server.Condition;
 import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
@@ -78,7 +79,6 @@ import org.jetbrains.annotations.Nullable;
  * Class containing some common logic for Meta Storage Raft group listeners.
  */
 public class MetaStorageWriteHandler {
-    /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(MetaStorageWriteHandler.class);
 
     public static final String IDEMPOTENT_COMMAND_PREFIX = "icp.";
@@ -90,7 +90,7 @@ public class MetaStorageWriteHandler {
     private final KeyValueStorage storage;
     private final ClusterTimeImpl clusterTime;
 
-    private final Map<CommandId, @Nullable Serializable> 
idempotentCommandCache = new ConcurrentHashMap<>();
+    private final Map<CommandId, CommandResultAndTimestamp> 
idempotentCommandCache = new ConcurrentHashMap<>();
 
     MetaStorageWriteHandler(
             KeyValueStorage storage,
@@ -112,10 +112,10 @@ public class MetaStorageWriteHandler {
             IdempotentCommand idempotentCommand = ((IdempotentCommand) 
command);
             CommandId commandId = idempotentCommand.id();
 
-            Serializable cachedResult = idempotentCommandCache.get(commandId);
+            CommandResultAndTimestamp cachedResult = 
idempotentCommandCache.get(commandId);
 
             if (cachedResult != null) {
-                clo.result(cachedResult);
+                clo.result(cachedResult.commandResult);
 
                 return;
             } else {
@@ -384,7 +384,7 @@ public class MetaStorageWriteHandler {
                         result = 
MSG_FACTORY.statementResult().result(ByteBuffer.wrap(entry.value())).build();
                     }
 
-                    idempotentCommandCache.put(commandId, result);
+                    idempotentCommandCache.put(commandId, new 
CommandResultAndTimestamp(result, entry.timestamp()));
                 }
             }
         }
@@ -396,40 +396,33 @@ public class MetaStorageWriteHandler {
      * @param evictionTimestamp Cached entries older than given timestamp will 
be evicted.
      * @param context Command operation context.
      */
-    void evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, 
KeyValueUpdateContext context) {
+    private void evictIdempotentCommandsCache(HybridTimestamp 
evictionTimestamp, KeyValueUpdateContext context) {
         LOG.info("Idempotent command cache cleanup started 
[evictionTimestamp={}].", evictionTimestamp);
 
-        List<byte[]> evictionCandidateKeys = 
collectEvictionCandidateKeys(evictionTimestamp);
+        List<CommandId> evictedCommandIds = 
evictCommandsFromCache(evictionTimestamp);
 
-        if (evictionCandidateKeys.isEmpty()) {
+        if (evictedCommandIds.isEmpty()) {
             return;
         }
 
-        evictionCandidateKeys.forEach(evictionCandidateKeyBytes -> {
-            CommandId commandId = CommandId.fromString(
-                    
ByteUtils.stringFromBytes(evictionCandidateKeyBytes).substring(IDEMPOTENT_COMMAND_PREFIX.length()));
-
-            idempotentCommandCache.remove(commandId);
-        });
-
-        storage.removeAll(evictionCandidateKeys, context);
+        storage.removeAll(toIdempotentCommandKeyBytes(evictedCommandIds), 
context);
 
         LOG.info("Idempotent command cache cleanup finished 
[evictionTimestamp={}, cleanupCompletionTimestamp={},"
                         + " removedEntriesCount={}, cacheSize={}].",
                 evictionTimestamp,
                 clusterTime.now(),
-                evictionCandidateKeys.size(),
+                evictedCommandIds.size(),
                 idempotentCommandCache.size()
         );
     }
 
     private class ResultCachingClosure implements CommandClosure<WriteCommand> 
{
-        CommandClosure<WriteCommand> closure;
+        final CommandClosure<WriteCommand> closure;
 
         ResultCachingClosure(CommandClosure<WriteCommand> closure) {
-            this.closure = closure;
-
             assert closure.command() instanceof IdempotentCommand;
+
+            this.closure = closure;
         }
 
         @Override
@@ -453,33 +446,40 @@ public class MetaStorageWriteHandler {
 
             // Exceptions are not cached.
             if (!(res instanceof Throwable)) {
-                idempotentCommandCache.put(command.id(), res);
+                idempotentCommandCache.put(command.id(), new 
CommandResultAndTimestamp(res, command.safeTime()));
             }
 
             closure.result(res);
         }
     }
 
-    private List<byte[]> collectEvictionCandidateKeys(HybridTimestamp 
evictionTimestamp) {
-        byte[] keyFrom = IDEMPOTENT_COMMAND_PREFIX_BYTES;
-        byte[] keyTo = storage.nextKey(IDEMPOTENT_COMMAND_PREFIX_BYTES);
+    private List<CommandId> evictCommandsFromCache(HybridTimestamp 
evictionTimestamp) {
+        Iterator<Map.Entry<CommandId, CommandResultAndTimestamp>> iterator = 
idempotentCommandCache.entrySet().iterator();
 
-        Stream<Entry> entryStream;
+        var result = new ArrayList<CommandId>();
 
-        try {
-            long obsoleteRevision = 
storage.revisionByTimestamp(evictionTimestamp);
-
-            entryStream = storage.range(keyFrom, keyTo, 
obsoleteRevision).stream()
-                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
-                    .filter(entry -> !entry.tombstone() && !entry.empty());
-        } catch (CompactedException e) {
-            // Fallback to scanning with time timestamp filtering.
-            entryStream = storage.range(keyFrom, keyTo).stream()
-                    // Not sure whether it's possible to retrieve empty entry 
here, thus !entry.empty() was added just in case.
-                    .filter(entry -> !entry.tombstone() && !entry.empty())
-                    .filter(entry -> 
entry.timestamp().compareTo(evictionTimestamp) <= 0);
+        while (iterator.hasNext()) {
+            Map.Entry<CommandId, CommandResultAndTimestamp> entry = 
iterator.next();
+
+            if (evictionTimestamp.compareTo(entry.getValue().commandTimestamp) 
>= 0) {
+                iterator.remove();
+
+                result.add(entry.getKey());
+            }
         }
 
-        return entryStream.map(Entry::key).collect(toList());
+        return result;
+    }
+
+    private static List<byte[]> toIdempotentCommandKeyBytes(List<CommandId> 
commandIds) {
+        return commandIds.stream()
+                .map(MetaStorageWriteHandler::toIdempotentCommandKey)
+                .map(ByteArray::bytes)
+                .collect(toList());
+    }
+
+    /** Converts to idempotent command key. */
+    public static ByteArray toIdempotentCommandKey(CommandId commandId) {
+        return new ByteArray(IDEMPOTENT_COMMAND_PREFIX + 
commandId.toMgKeyAsString());
     }
 }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 4078fe6602..dddbf0fc7c 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -28,7 +28,7 @@ import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
 import static 
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
-import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.toIdempotentCommandKey;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -52,7 +52,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.CommandId;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -308,7 +307,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
             // In case of in-memory storage, there's no sense in "persisting" 
invoke result, however same persistent source operations
             // were added in order to have matching revisions count through 
all storages.
             ops.add(Operations.put(
-                    new ByteArray(IDEMPOTENT_COMMAND_PREFIX + 
commandId.toMgKeyAsString()),
+                    toIdempotentCommandKey(commandId),
                     branch ? INVOKE_RESULT_TRUE_BYTES : 
INVOKE_RESULT_FALSE_BYTES)
             );
 
@@ -362,10 +361,7 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
 
                     // In case of in-memory storage, there's no sense in 
"persisting" invoke result, however same persistent source
                     // operations were added in order to have matching 
revisions count through all storages.
-                    ops.add(Operations.put(
-                            new ByteArray(IDEMPOTENT_COMMAND_PREFIX + 
commandId.toMgKeyAsString()),
-                            branch.update().result().result())
-                    );
+                    ops.add(Operations.put(toIdempotentCommandKey(commandId), 
branch.update().result().result()));
 
                     for (Operation op : ops) {
                         switch (op.type()) {

Reply via email to