This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 79564ca7e4 IGNITE-23291 Improve the local compaction mechanism of
Metastorage (#4465)
79564ca7e4 is described below
commit 79564ca7e4cbc40fc35f9004e1595efaa425c781
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Sat Sep 28 12:54:23 2024 +0300
IGNITE-23291 Improve the local compaction mechanism of Metastorage (#4465)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../internal/metastorage/MetaStorageManager.java | 29 +++
.../metastorage/exceptions/CompactedException.java | 25 ++-
.../metastorage/impl/MetaStorageManagerImpl.java | 22 +-
.../metastorage/impl/MetaStorageService.java | 8 -
.../metastorage/impl/MetaStorageServiceImpl.java | 6 -
.../metastorage/server/CompactedException.java | 59 ------
.../metastorage/server/KeyValueStorage.java | 33 ++-
.../metastorage/server/KeyValueStorageUtils.java | 67 ++++++
.../server/persistence/RocksDbKeyValueStorage.java | 170 ++++++---------
.../AbstractCompactionKeyValueStorageTest.java | 227 +++++++++------------
.../server/KeyValueStorageUtilsTest.java | 78 +++++++
.../server/AbstractKeyValueStorageTest.java | 2 +-
.../server/SimpleInMemoryKeyValueStorage.java | 157 +++++++-------
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 2 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
17 files changed, 472 insertions(+), 420 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 4a173d618b..2f15dd0383 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -302,6 +302,9 @@ public class ErrorGroups {
/** Failed to perform an operation within a specified time period.
Usually in such cases the operation should be retried. */
public static final int OP_EXECUTION_TIMEOUT_ERR =
META_STORAGE_ERR_GROUP.registerErrorCode((short) 5);
+
+ /** Failed to perform a read operation on the underlying key value
storage because the revision has already been compacted. */
+ public static final int COMPACTED_ERR =
META_STORAGE_ERR_GROUP.registerErrorCode((short) 6);
}
/** Index error group. */
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 09970056ca..65e0eb9586 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.dsl.Condition;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import
org.apache.ignite.internal.metastorage.exceptions.OperationTimeoutException;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.util.Cursor;
@@ -272,4 +274,31 @@ public interface MetaStorageManager extends
IgniteComponent {
/** Unregisters a Meta Storage revision update listener. */
void unregisterRevisionUpdateListener(RevisionUpdateListener listener);
+
+ /**
+ * Compacts outdated key versions and removes tombstones of metastorage
locally.
+ *
+ * <p>We do not compact the only and last version of the key unless it is
a tombstone.</p>
+ *
+ * <p>Let's look at some examples, let's say we have the following keys
with their versions:</p>
+ * <ul>
+ * <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo"
[1, 3, 5].</li>
+ * <li>Key "bar" with versions that have revisions (1, 2, 5) the last
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+ * </ul>
+ *
+ * <p>Let's look at examples of invoking the current method and what will
be in the storage after:</p>
+ * <ul>
+ * <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5
tomb].</li>
+ * <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 5}: "foo" [5].</li>
+ * <li>Compaction revision is {@code 6}: "foo" [5].</li>
+ * </ul>
+ *
+ * @param revision Revision up to which (including) the metastorage keys
will be compacted.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
+ * @throws MetaStorageException If there is an error during the
metastorage compaction process.
+ */
+ void compactLocally(long revision);
}
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
index 1b0d01e9c3..3f11f48f65 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/exceptions/CompactedException.java
@@ -17,18 +17,29 @@
package org.apache.ignite.internal.metastorage.exceptions;
-import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
+import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTED_ERR;
/**
- * Thrown when a requested operation on meta storage could not be performed
because target revisions were removed from storage due to a
- * compaction procedure. In such case the operation should be retried with
actual revision.
+ * Thrown when a requested read operation on meta storage could not be
performed because target revisions were removed from storage due to
+ * a compaction procedure. In such case the operation should be retried with
actual revision.
*/
public class CompactedException extends MetaStorageException {
+ private static final long serialVersionUID = -6849399873850280288L;
+
/**
* Constructs an exception.
*/
public CompactedException() {
- super(COMPACTION_ERR);
+ super(COMPACTED_ERR);
+ }
+
+ /**
+ * Constructs an exception for a requested revision that has already been compacted.
+ *
+ * @param revision Requested revision.
+ */
+ public CompactedException(long revision) {
+ super(COMPACTED_ERR, "Requested revision has already been compacted: "
+ revision);
}
/**
@@ -37,7 +48,7 @@ public class CompactedException extends MetaStorageException {
* @param message Detail message.
*/
public CompactedException(String message) {
- super(COMPACTION_ERR, message);
+ super(COMPACTED_ERR, message);
}
/**
@@ -47,7 +58,7 @@ public class CompactedException extends MetaStorageException {
* @param cause Cause.
*/
public CompactedException(String message, Throwable cause) {
- super(COMPACTION_ERR, message, cause);
+ super(COMPACTED_ERR, message, cause);
}
/**
@@ -56,6 +67,6 @@ public class CompactedException extends MetaStorageException {
* @param cause Cause.
*/
public CompactedException(Throwable cause) {
- super(COMPACTION_ERR, cause);
+ super(COMPACTED_ERR, cause);
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 6ddca448ab..fa7f9f6b0a 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -958,23 +958,6 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
}
- /**
- * Compacts Meta storage (removes all tombstone entries and old entries
except of entries with latest revision).
- *
- * @see MetaStorageService#compact()
- */
- public CompletableFuture<Void> compact() {
- if (!busyLock.enterBusy()) {
- return failedFuture(new NodeStoppingException());
- }
-
- try {
- return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
private void onSafeTimeAdvanced(HybridTimestamp time) {
assert time != null;
@@ -1147,4 +1130,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
return S.toString(this);
}
}
+
+ @Override
+ public void compactLocally(long revision) {
+ inBusyLock(busyLock, () -> storage.compact(revision));
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 66bb7c57ab..563a025b7e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -248,14 +248,6 @@ public interface MetaStorageService extends
ManuallyCloseable {
*/
Publisher<Entry> prefix(ByteArray prefix, long revUpperBound);
- /**
- * Compacts meta storage (removes all tombstone entries and old entries
except of entries with latest revision).
- *
- * @return Completed future. Couldn't be {@code null}.
- * @throws OperationTimeoutException If the operation is timed out. Will
be thrown on getting future result.
- */
- CompletableFuture<Void> compact();
-
/**
* Returns a future which will hold current revision of the metastorage
leader.
*/
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 3dc67a4012..1ec59b36cf 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -257,12 +257,6 @@ public class MetaStorageServiceImpl implements
MetaStorageService {
return context.raftService().run(syncTimeCommand);
}
- // TODO: IGNITE-19417 Implement.
- @Override
- public CompletableFuture<Void> compact() {
- throw new UnsupportedOperationException();
- }
-
@Override
public CompletableFuture<Long> currentRevision() {
GetCurrentRevisionCommand cmd =
context.commandsFactory().getCurrentRevisionCommand().build();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
deleted file mode 100644
index 07f18a18f0..0000000000
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.server;
-
-/**
- * Thrown when a requested operation on meta storage could not be performed
because target revisions were removed from storage due to a
- * compaction procedure. In such case the operation should be retried with
actual revision.
- */
-public class CompactedException extends RuntimeException {
- /**
- * Constructs an exception.
- */
- public CompactedException() {
- super();
- }
-
- /**
- * Constructs an exception with a given message.
- *
- * @param message Detail message.
- */
- public CompactedException(String message) {
- super(message);
- }
-
- /**
- * Constructs an exception with a given message and a cause.
- *
- * @param message Detail message.
- * @param cause Cause.
- */
- public CompactedException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * Constructs an exception with a given cause.
- *
- * @param cause Cause.
- */
- public CompactedException(Throwable cause) {
- super(cause);
- }
-}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3159069f15..454b455a1c 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.metastorage.RevisionUpdateListener;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -233,13 +234,33 @@ public interface KeyValueStorage extends
ManuallyCloseable {
void removeWatch(WatchListener listener);
/**
- * Compacts storage (removes tombstones).
+ * Compacts outdated key versions and removes tombstones of metastorage
locally.
*
- * @param lowWatermark A time threshold for the entry. Only entries that
have revisions with timestamp higher or equal to the
- * watermark can be removed.
+ * <p>We do not compact the only and last version of the key unless it is
a tombstone.</p>
+ *
+ * <p>Let's look at some examples, let's say we have the following keys
with their versions:</p>
+ * <ul>
+ * <li>Key "foo" with versions that have revisions (1, 3, 5) - "foo"
[1, 3, 5].</li>
+ * <li>Key "bar" with versions that have revisions (1, 2, 5) the last
revision is a tombstone - "bar" [1, 2, 5 tomb].</li>
+ * </ul>
+ *
+ * <p>Let's look at examples of invoking the current method and what will
be in the storage after:</p>
+ * <ul>
+ * <li>Compaction revision is {@code 1}: "foo" [3, 5], "bar" [2, 5
tomb].</li>
+ * <li>Compaction revision is {@code 2}: "foo" [3, 5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 3}: "foo" [5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 4}: "foo" [5], "bar" [5
tomb].</li>
+ * <li>Compaction revision is {@code 5}: "foo" [5].</li>
+ * <li>Compaction revision is {@code 6}: "foo" [5].</li>
+ * </ul>
+ *
+ * <p>Compaction revision is expected to be less than the {@link #revision
current storage revision}.</p>
+ *
+ * @param revision Revision up to which (including) the metastorage keys
will be compacted.
+ * @throws MetaStorageException If there is an error during the
metastorage compaction process.
*/
- // TODO: IGNITE-19417 Provide low-watermark for compaction.
- void compact(HybridTimestamp lowWatermark);
+ // TODO: IGNITE-23281 Do not hold write lock for the entire operation
+ void compact(long revision);
/**
* Creates a snapshot of the storage's current state in the specified
directory.
@@ -268,6 +289,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* @param revision Revision by which to do a lookup.
* @return Timestamp corresponding to the revision.
*/
+ // TODO: IGNITE-23307 Figure out what to do after compaction
HybridTimestamp timestampByRevision(long revision);
/**
@@ -276,6 +298,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* @param timestamp Timestamp by which to do a lookup.
* @return Revision lesser or equal to the timestamp or -1 if there is no
such revision.
*/
+ // TODO: IGNITE-23307 Figure out what to do after compaction
long revisionByTimestamp(HybridTimestamp timestamp);
/**
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
new file mode 100644
index 0000000000..41c9a55cdd
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.binarySearch;
+
+import java.util.function.LongPredicate;
+
+/** Helper class with useful methods and constants for {@link KeyValueStorage} implementations. */
+public final class KeyValueStorageUtils {
+    /** Special value indicating that there are no key revisions that need to be compacted. */
+    public static final int NOTHING_TO_COMPACT_INDEX = -1;
+
+    private KeyValueStorageUtils() {
+        // Utility class, not meant to be instantiated.
+    }
+
+    /**
+     * Calculates the revision index in key revisions up to which compaction is needed or {@link #NOTHING_TO_COMPACT_INDEX} if nothing
+     * needs to be compacted.
+     *
+     * <p>If the returned index points to the last revision and if the last revision is <b>not</b> a tombstone, then the returned index is
+     * decremented by 1.</p>
+     *
+     * @param keyRevisions Metastorage key revisions in ascending order.
+     * @param compactionRevisionInclusive Revision up to which you need to compact (inclusive).
+     * @param isTombstone Predicate to test whether a key revision is a tombstone.
+     * @return Index (inclusive) of the last revision to compact, or {@link #NOTHING_TO_COMPACT_INDEX} if nothing needs to be compacted.
+     */
+    public static int indexToCompact(long[] keyRevisions, long compactionRevisionInclusive, LongPredicate isTombstone) {
+        int i = binarySearch(keyRevisions, compactionRevisionInclusive);
+
+        if (i < 0) {
+            // Exact revision not found: binarySearch returned -(insertionPoint) - 1; insertion point 0 means all revisions are newer.
+            if (i == -1) {
+                return NOTHING_TO_COMPACT_INDEX;
+            }
+
+            // Index of the greatest revision that is less than the compaction revision (insertionPoint - 1).
+            i = -(i + 2);
+        }
+
+        // The latest version of a key is never compacted unless it is a tombstone.
+        if (i == keyRevisions.length - 1 && !isTombstone.test(keyRevisions[i])) {
+            i = i == 0 ? NOTHING_TO_COMPACT_INDEX : i - 1;
+        }
+
+        return i;
+    }
+
+    /**
+     * Converts bytes to UTF-8 string.
+     *
+     * @param bytes Bytes.
+     * @return String decoded from {@code bytes} using UTF-8.
+     */
+    public static String toUtf8String(byte[] bytes) {
+        return new String(bytes, UTF_8);
+    }
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d7a7346d95..ee1fb66078 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.metastorage.server.persistence;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
@@ -44,7 +48,6 @@ import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.STARTING_STORAGE_ER
import static org.rocksdb.util.SizeUnit.MB;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -136,13 +139,13 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/** Revision key. */
private static final byte[] REVISION_KEY = keyToRocksKey(
SYSTEM_REVISION_MARKER_VALUE,
- "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8)
+ "SYSTEM_REVISION_KEY".getBytes(UTF_8)
);
/** Update counter key. */
private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
SYSTEM_REVISION_MARKER_VALUE,
- "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8)
+ "SYSTEM_UPDATE_COUNTER_KEY".getBytes(UTF_8)
);
/** Lexicographic order comparator. */
@@ -971,23 +974,21 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
@Override
- public void compact(HybridTimestamp lowWatermark) {
- rwLock.writeLock().lock();
+ public void compact(long revision) {
+ assert revision >= 0;
+ rwLock.writeLock().lock();
try (WriteBatch batch = new WriteBatch()) {
- // Find a revision with timestamp lesser or equal to the watermark.
- long maxRevision = revisionByTimestamp(lowWatermark);
-
- if (maxRevision == -1) {
- // Nothing to compact yet.
- return;
- }
+ assert revision < rev : String.format(
+ "Compaction revision should be less than the current:
[compaction=%s, current=%s]",
+ revision, rev
+ );
try (RocksIterator iterator = index.newIterator()) {
iterator.seekToFirst();
- RocksUtils.forEach(iterator, (key, value) ->
compactForKey(batch, key, getAsLongs(value), maxRevision));
+ RocksUtils.forEach(iterator, (key, value) ->
compactForKey(batch, key, getAsLongs(value), revision));
}
fillAndWriteBatch(batch, rev, updCntr, null);
@@ -1026,104 +1027,41 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
/**
- * Compacts all entries by the given key, removing revision that are no
longer needed.
- * Last entry with a revision lesser or equal to the {@code
minRevisionToKeep} and all consecutive entries will be preserved.
- * If the first entry to keep is a tombstone, it will be removed.
- * Example:
- * <pre>
- * Example 1:
- * put entry1: revision 5
- * put entry2: revision 7
- *
- * do compaction: revision 6
- *
- * entry1: exists
- * entry2: exists
- *
- * Example 2:
- * put entry1: revision 5
- * put entry2: revision 7
- *
- * do compaction: revision 7
- *
- * entry1: doesn't exist
- * entry2: exists
- * </pre>
+ * Compacts the key, see the documentation of {@link
KeyValueStorage#compact} for examples.
*
* @param batch Write batch.
- * @param key Target key.
- * @param revs Revisions.
- * @param minRevisionToKeep Minimum revision that should be kept.
- * @throws RocksDBException If failed.
+ * @param key Target key.
+ * @param revs Key revisions.
+ * @param compactionRevision Revision up to which (inclusively) the key
will be compacted.
+ * @throws MetaStorageException If failed.
*/
- private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long
minRevisionToKeep) throws RocksDBException {
- if (revs.length < 2) {
- // If we have less than two revisions, there is no point in
compaction.
- return;
- }
-
- // Index of the first revision we will be keeping in the array of
revisions.
- int idxToKeepFrom = 0;
-
- // Whether there is an entry with the minRevisionToKeep.
- boolean hasMinRevision = false;
+ private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long
compactionRevision) {
+ try {
+ int indexToCompact = indexToCompact(revs, compactionRevision,
revision -> isTombstoneForCompaction(key, revision));
- // Traverse revisions, looking for the first revision that needs to be
kept.
- for (long rev : revs) {
- if (rev >= minRevisionToKeep) {
- if (rev == minRevisionToKeep) {
- hasMinRevision = true;
- }
- break;
+ if (NOTHING_TO_COMPACT_INDEX == indexToCompact) {
+ return;
}
- idxToKeepFrom++;
- }
-
- if (!hasMinRevision) {
- // Minimal revision was not encountered, that mean that we are
between revisions of a key, so previous revision
- // must be preserved.
- idxToKeepFrom--;
- }
-
- if (idxToKeepFrom <= 0) {
- // All revisions are still in use.
- return;
- }
-
- for (int i = 0; i < idxToKeepFrom; i++) {
- // This revision is not needed anymore, remove data.
- data.delete(batch, keyToRocksKey(revs[i], key));
- }
-
- // Whether we only have last revision (even if it's lesser or equal to
watermark).
- boolean onlyLastRevisionLeft = idxToKeepFrom == (revs.length - 1);
-
- // Get the number of the first revision that will be kept.
- long rev = onlyLastRevisionLeft ? lastRevision(revs) :
revs[idxToKeepFrom];
-
- byte[] rocksKey = keyToRocksKey(rev, key);
-
- Value value = bytesToValue(data.get(rocksKey));
-
- if (value.tombstone()) {
- // The first revision we are going to keep is a tombstone, we may
delete it.
- data.delete(batch, rocksKey);
-
- if (!onlyLastRevisionLeft) {
- // First revision was a tombstone, but there are other
revisions, that need to be kept,
- // so advance index of the first revision we need to keep.
- idxToKeepFrom++;
+ for (int revisionIndex = 0; revisionIndex <= indexToCompact;
revisionIndex++) {
+ // This revision is not needed anymore, remove data.
+ data.delete(batch, keyToRocksKey(revs[revisionIndex], key));
}
- }
- if (onlyLastRevisionLeft && value.tombstone()) {
- // We don't have any previous revisions for this entry and the
single existing is a tombstone,
- // so we can remove it from index.
- index.delete(batch, key);
- } else {
- // Keeps revisions starting with idxToKeepFrom.
- index.put(batch, key, longsToBytes(revs, idxToKeepFrom));
+ if (indexToCompact == revs.length - 1) {
+ index.delete(batch, key);
+ } else {
+ index.put(batch, key, longsToBytes(revs, indexToCompact + 1));
+ }
+ } catch (Throwable t) {
+ throw new MetaStorageException(
+ COMPACTION_ERR,
+ String.format(
+ "Error during compaction of key: [KeyBytes=%s,
keyBytesToUtf8String=%s]",
+ Arrays.toString(key), toUtf8String(key)
+ ),
+ t
+ );
}
}
@@ -1194,7 +1132,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" +
revUpperBound + ']';
assert revUpperBound >= revLowerBound
: "Invalid arguments: [revLowerBound=" + revLowerBound + ",
revUpperBound=" + revUpperBound + ']';
- // TODO: IGNITE-19782 throw CompactedException if revLowerBound is
compacted.
long[] revs;
@@ -1633,4 +1570,41 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
rwLock.writeLock().unlock();
}
}
+
+    /**
+     * Checks whether the value stored for the key at the given revision is a tombstone.
+     *
+     * @param key Key bytes.
+     * @param revision Key revision; the value for it is expected to be present (checked only by an assertion).
+     * @return {@code true} if the value stored at the given revision is a tombstone.
+     * @throws RocksDBException If the underlying RocksDB read fails.
+     */
+    private boolean isTombstone(byte[] key, long revision) throws RocksDBException {
+        byte[] rocksKey = keyToRocksKey(revision, key);
+
+        byte[] valueBytes = data.get(rocksKey);
+
+        assert valueBytes != null : "key=" + toUtf8String(key) + ", revision=" + revision;
+
+        return bytesToValue(valueBytes).tombstone();
+    }
+
+    /**
+     * Same as {@link #isTombstone(byte[], long)}, but wraps a {@link RocksDBException} into a {@link MetaStorageException} with the
+     * {@code COMPACTION_ERR} error code, so that it can be used as the tombstone predicate during compaction.
+     */
+    private boolean isTombstoneForCompaction(byte[] key, long revision) {
+        try {
+            return isTombstone(key, revision);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(
+                    COMPACTION_ERR,
+                    String.format(
+                            "Error getting key value by revision: [KeyBytes=%s, keyBytesToUtf8String=%s, revision=%s]",
+                            Arrays.toString(key), toUtf8String(key), revision
+                    ),
+                    e
+            );
+        }
+    }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 9ccc44caa8..4eef4a8aed 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -17,186 +17,147 @@
package org.apache.ignite.internal.metastorage.server;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** Compaction tests. */
public abstract class AbstractCompactionKeyValueStorageTest extends
AbstractKeyValueStorageTest {
- private final HybridClock clock = new HybridClockImpl();
+ private static final byte[] FOO_KEY = fromString("foo");
- @Test
- public void testCompactionAfterLastRevision() {
- byte[] key = key(0);
- byte[] value1 = keyValue(0, 0);
- byte[] value2 = keyValue(0, 1);
+ private static final byte[] BAR_KEY = fromString("bar");
- storage.put(key, value1, clock.now());
- storage.put(key, value2, clock.now());
+ private static final byte[] SOME_KEY = fromString("someKey");
- long lastRevision = storage.revision();
+ private static final byte[] SOME_VALUE = fromString("someValue");
- storage.compact(clock.now());
+ private final HybridClock clock = new HybridClockImpl();
- // Latest value, must exist.
- Entry entry2 = storage.get(key, lastRevision);
- assertEquals(lastRevision, entry2.revision());
- assertArrayEquals(value2, entry2.value());
+ @Override
+ @BeforeEach
+ void setUp() {
+ super.setUp();
- // Previous value, must be removed due to compaction.
- Entry entry1 = storage.get(key, lastRevision - 1);
- assertTrue(entry1.empty());
- }
+ storage.putAll(List.of(FOO_KEY, BAR_KEY), List.of(SOME_VALUE,
SOME_VALUE), clock.now());
+ storage.put(BAR_KEY, SOME_VALUE, clock.now());
+ storage.put(FOO_KEY, SOME_VALUE, clock.now());
+ storage.put(SOME_KEY, SOME_VALUE, clock.now());
- @Test
- public void testCompactionAfterTombstone() {
- byte[] key = key(0);
- byte[] value = keyValue(0, 0);
+ var fooKey = new ByteArray(FOO_KEY);
+ var barKey = new ByteArray(BAR_KEY);
- storage.put(key, value, clock.now());
- storage.remove(key, clock.now());
+ var iif = new If(
+ new AndCondition(new ExistenceCondition(Type.EXISTS, FOO_KEY),
new ExistenceCondition(Type.EXISTS, BAR_KEY)),
+ new Statement(ops(put(fooKey, SOME_VALUE),
remove(barKey)).yield()),
+ new Statement(ops(noop()).yield())
+ );
- long lastRevision = storage.revision();
+ storage.invoke(iif, clock.now(), new
CommandIdGenerator(UUID::randomUUID).newId());
- storage.compact(clock.now());
+ storage.remove(SOME_KEY, clock.now());
- // Current value, must be removed due to being a tombstone.
- Entry entry2 = storage.get(key, lastRevision);
- assertTrue(entry2.empty());
+ // Special revision update to prevent tests from failing.
+ storage.put(fromString("fake"), SOME_VALUE, clock.now());
- // Previous value, must be removed due to compaction.
- Entry entry1 = storage.get(key, lastRevision - 1);
- assertTrue(entry1.empty());
+ assertEquals(List.of(1, 3, 5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(1, 2, 5), collectRevisions(BAR_KEY));
+ assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
@Test
- public void testCompactionBetweenMultipleWrites() {
- byte[] key = key(0);
- byte[] value1 = keyValue(0, 0);
- byte[] value2 = keyValue(0, 1);
- byte[] value3 = keyValue(0, 2);
- byte[] value4 = keyValue(0, 3);
-
- storage.put(key, value1, clock.now());
- storage.put(key, value2, clock.now());
-
- HybridTimestamp compactTs = clock.now();
-
- storage.put(key, value3, clock.now());
- storage.put(key, value4, clock.now());
-
- long lastRevision = storage.revision();
+ void testCompactRevision1() {
+ storage.compact(1);
- storage.compact(compactTs);
-
- Entry entry4 = storage.get(key, lastRevision);
- assertArrayEquals(value4, entry4.value());
-
- Entry entry3 = storage.get(key, lastRevision - 1);
- assertArrayEquals(value3, entry3.value());
-
- Entry entry2 = storage.get(key, lastRevision - 2);
- assertArrayEquals(value2, entry2.value());
-
- // Previous value, must be removed due to compaction.
- Entry entry1 = storage.get(key, lastRevision - 3);
- assertTrue(entry1.empty());
+ assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(2, 5), collectRevisions(BAR_KEY));
+ assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
@Test
- public void testCompactionAfterTombstoneRemovesTombstone() {
- byte[] key = key(0);
- byte[] value1 = keyValue(0, 0);
- byte[] value2 = keyValue(0, 1);
-
- storage.put(key, value1, clock.now());
-
- storage.remove(key, clock.now());
-
- HybridTimestamp compactTs = clock.now();
-
- storage.put(key, value2, clock.now());
+ void testCompactRevision2() {
+ storage.compact(2);
- storage.remove(key, clock.now());
-
- long lastRevision = storage.revision();
-
- storage.compact(compactTs);
-
- // Last operation was remove, so this is a tombstone.
- Entry entry4 = storage.get(key, lastRevision);
- assertTrue(entry4.tombstone());
-
- Entry entry3 = storage.get(key, lastRevision - 1);
- assertArrayEquals(value2, entry3.value());
-
- // Previous value, must be removed due to compaction.
- Entry entry2 = storage.get(key, lastRevision - 2);
- assertTrue(entry2.empty());
-
- Entry entry1 = storage.get(key, lastRevision - 3);
- assertTrue(entry1.empty());
+ assertEquals(List.of(3, 5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(5), collectRevisions(BAR_KEY));
+ assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
@Test
- public void testCompactEmptyStorage() {
- storage.compact(clock.now());
+ void testCompactRevision3() {
+ storage.compact(3);
+
+ assertEquals(List.of(5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(5), collectRevisions(BAR_KEY));
+ assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
@Test
- public void testCompactionBetweenRevisionsOfOneKey() {
- byte[] key = key(0);
- byte[] value11 = keyValue(0, 0);
- byte[] value12 = keyValue(0, 1);
-
- storage.put(key, value11, clock.now());
+ void testCompactRevision4() {
+ storage.compact(4);
- byte[] key2 = key(1);
- byte[] value2 = keyValue(1, 0);
- storage.put(key2, value2, clock.now());
-
- HybridTimestamp compactTs = clock.now();
-
- storage.put(key, value12, clock.now());
+ assertEquals(List.of(5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(5), collectRevisions(BAR_KEY));
+ assertEquals(List.of(6), collectRevisions(SOME_KEY));
+ }
- storage.compact(compactTs);
+ @Test
+ void testCompactRevision5() {
+ storage.compact(5);
- // Both keys should exist, as low watermark's revision is higher than
entry11's, but lesser than entry12's,
- // this means that entry1 is still needed.
- Entry entry12 = storage.get(key, storage.revision());
- assertArrayEquals(value12, entry12.value());
+ assertEquals(List.of(5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(), collectRevisions(BAR_KEY));
+ assertEquals(List.of(6), collectRevisions(SOME_KEY));
+ }
- Entry entry11 = storage.get(key, storage.revision() - 1);
- assertArrayEquals(value11, entry11.value());
+ @Test
+ void testCompactRevision6() {
+ storage.compact(6);
- Entry entry2 = storage.get(key2, storage.revision());
- assertArrayEquals(value2, entry2.value());
+ assertEquals(List.of(5), collectRevisions(FOO_KEY));
+ assertEquals(List.of(), collectRevisions(BAR_KEY));
+ assertEquals(List.of(), collectRevisions(SOME_KEY));
}
@Test
- public void testInvokeCompactionBeforeAnyEntry() {
- byte[] key = key(0);
- byte[] value1 = keyValue(0, 0);
- byte[] value2 = keyValue(0, 1);
+ void testCompactRevisionSequentially() {
+ testCompactRevision1();
+ testCompactRevision2();
+ testCompactRevision3();
+ testCompactRevision4();
+ testCompactRevision5();
+ testCompactRevision6();
+ }
- HybridTimestamp compactTs = clock.now();
+ private List<Integer> collectRevisions(byte[] key) {
+ var revisions = new ArrayList<Integer>();
- storage.put(key, value1, clock.now());
- storage.put(key, value2, clock.now());
+ for (int revision = 0; revision <= storage.revision(); revision++) {
+ Entry entry = storage.get(key, revision);
- storage.compact(compactTs);
+ if (!entry.empty() && entry.revision() == revision) {
+ revisions.add(revision);
+ }
+ }
- // No entry should be compacted.
- Entry entry2 = storage.get(key, storage.revision());
- assertArrayEquals(value2, entry2.value());
+ return revisions;
+ }
- Entry entry1 = storage.get(key, storage.revision() - 1);
- assertArrayEquals(value1, entry1.value());
+ private static byte[] fromString(String s) {
+ return s.getBytes(UTF_8);
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
new file mode 100644
index 0000000000..241aeb9fbd
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtilsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link KeyValueStorageUtils#indexToCompact} and {@link KeyValueStorageUtils#toUtf8String}. */
+public class KeyValueStorageUtilsTest {
+ @Test
+ void testIndexToCompactNoRevisions() {
+ assertEquals(NOTHING_TO_COMPACT_INDEX,
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> false));
+ assertEquals(NOTHING_TO_COMPACT_INDEX,
indexToCompact(LONG_EMPTY_ARRAY, 0, revision -> true));
+ }
+
+ @Test
+ void testIndexToCompactSingleRevision() {
+ long[] keyRevisions = {2};
+
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1,
revision -> false)); // No key revision is <= 1.
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1,
revision -> true));
+
+ assertEquals(0, indexToCompact(keyRevisions, 2, revision -> true)); // Only revision is a tombstone: it is compacted.
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 2,
revision -> false)); // Only revision is live: it is kept.
+
+ assertEquals(0, indexToCompact(keyRevisions, 3, revision -> true));
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 3,
revision -> false));
+ }
+
+ @Test
+ void testIndexToCompactMultipleRevisions() {
+ long[] keyRevisions = {2, 4, 5};
+
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1,
revision -> true));
+ assertEquals(NOTHING_TO_COMPACT_INDEX, indexToCompact(keyRevisions, 1,
revision -> false));
+
+ assertEquals(0, indexToCompact(keyRevisions, 2, revision -> true));
+ assertEquals(0, indexToCompact(keyRevisions, 2, revision -> false)); // Not the key's last revision: compacted even when live.
+
+ assertEquals(0, indexToCompact(keyRevisions, 3, revision -> true));
+ assertEquals(0, indexToCompact(keyRevisions, 3, revision -> false));
+
+ assertEquals(1, indexToCompact(keyRevisions, 4, revision -> true));
+ assertEquals(1, indexToCompact(keyRevisions, 4, revision -> false));
+
+ assertEquals(2, indexToCompact(keyRevisions, 5, revision -> true)); // Last revision is a tombstone: every revision is compacted.
+ assertEquals(1, indexToCompact(keyRevisions, 5, revision -> false)); // Last revision is live: it is kept, the rest are compacted.
+
+ assertEquals(2, indexToCompact(keyRevisions, 6, revision -> true));
+ assertEquals(1, indexToCompact(keyRevisions, 6, revision -> false));
+ }
+
+ @Test
+ void testToUtf8String() {
+ assertEquals("foo", toUtf8String("foo".getBytes(UTF_8)));
+ }
+}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 73160d3721..5abbcd0104 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -30,7 +30,7 @@ public abstract class AbstractKeyValueStorageTest extends
BaseIgniteAbstractTest
protected KeyValueStorage storage;
@BeforeEach
- public void setUp() {
+ void setUp() {
storage = createStorage();
storage.start();
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index e1fe414002..0cace48eaa 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.metastorage.server;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
@@ -40,6 +44,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -65,8 +70,14 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
/** Lexicographical comparator. */
private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
- /** Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified. */
- private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
+ /**
+ * Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified.
+ *
+ * <p>Concurrent map to avoid {@link
java.util.ConcurrentModificationException} on compaction.</p>
+ *
+ * <p>Guarded by {@link #mux}.</p>
+ */
+ private final NavigableMap<byte[], List<Long>> keysIdx = new
ConcurrentSkipListMap<>(CMP);
/** Timestamp to revision mapping. */
private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
@@ -74,8 +85,14 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
/** Revision to timestamp mapping. */
private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
- /** Revisions index. Value contains all entries which were modified under
particular revision. */
- private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new
TreeMap<>();
+ /**
+ * Revisions index. Value contains all entries which were modified under
particular revision.
+ *
+ * <p>Concurrent map to avoid {@link
java.util.ConcurrentModificationException} on compaction.</p>
+ *
+ * <p>Guarded by {@link #mux}.</p>
+ */
+ private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx =
new ConcurrentSkipListMap<>();
/** Revision. Will be incremented for each single-entry or multi-entry
update operation. */
private long rev;
@@ -499,23 +516,16 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
@Override
- public void compact(HybridTimestamp lowWatermark) {
- synchronized (mux) {
- NavigableMap<byte[], List<Long>> compactedKeysIdx = new
TreeMap<>(CMP);
+ public void compact(long revision) {
+ assert revision >= 0;
- NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx =
new TreeMap<>();
-
- long maxRevision = revisionByTimestamp(lowWatermark);
-
- if (maxRevision == -1) {
- return;
- }
-
- keysIdx.forEach((key, revs) -> compactForKey(key, revs,
compactedKeysIdx, compactedRevsIdx, maxRevision));
-
- keysIdx = compactedKeysIdx;
+ synchronized (mux) {
+ assert revision < rev : String.format(
+ "Compaction revision should be less than the current:
[compaction=%s, current=%s]",
+ revision, rev
+ );
- revsIdx = compactedRevsIdx;
+ keysIdx.forEach((key, revs) -> compactForKey(key,
toLongArray(revs), revision));
}
}
@@ -547,78 +557,42 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
/**
- * Compacts all entries by the given key, removing revision that are no
longer needed.
- * Last entry with a revision lesser or equal to the {@code
minRevisionToKeep} and all consecutive entries will be preserved.
- * If the first entry to keep is a tombstone, it will be removed.
+ * Compacts the key, see the documentation of {@link
KeyValueStorage#compact} for examples.
*
- * @param key A key.
- * @param revs All revisions of a key.
- * @param compactedKeysIdx Out parameter, revisions that need to be kept
must be put here.
- * @param compactedRevsIdx Out parameter, values that need to be kept must
be put here.
- * @param minRevisionToKeep Minimum revision that should be kept.
+ * @param key Target key.
+ * @param revs Key revisions.
+ * @param compactionRevision Revision up to which (inclusively) the key
will be compacted.
+ * @throws MetaStorageException If failed.
*/
- private void compactForKey(
- byte[] key,
- List<Long> revs,
- Map<byte[], List<Long>> compactedKeysIdx,
- Map<Long, NavigableMap<byte[], Value>> compactedRevsIdx,
- long minRevisionToKeep
- ) {
- List<Long> revsToKeep = new ArrayList<>();
+ private void compactForKey(byte[] key, long[] revs, long
compactionRevision) {
+ int indexToCompact = indexToCompact(revs, compactionRevision, revision
-> isTombstoneForCompaction(key, revision));
- // Index of the first revision we will be keeping in the array of
revisions.
- int idxToKeepFrom = 0;
+ if (indexToCompact == NOTHING_TO_COMPACT_INDEX) {
+ return;
+ }
- // Whether there is an entry with the minRevisionToKeep.
- boolean hasMinRevision = false;
+ // Let's deal with the key revisions.
+ if (indexToCompact == revs.length - 1) {
+ keysIdx.remove(key);
+ } else {
+ List<Long> keyRevisions = keysIdx.get(key);
- // Traverse revisions, looking for the first revision that needs to be
kept.
- for (long rev : revs) {
- if (rev >= minRevisionToKeep) {
- if (rev == minRevisionToKeep) {
- hasMinRevision = true;
- }
- break;
- }
+ assert keyRevisions != null : toUtf8String(key);
- idxToKeepFrom++;
+ keyRevisions.subList(0, indexToCompact + 1).clear();
}
- if (!hasMinRevision) {
- // Minimal revision was not encountered, that mean that we are
between revisions of a key, so previous revision
- // must be preserved.
- idxToKeepFrom--;
- }
+ // Let's deal with the key values: indices 0..indexToCompact inclusive, matching the keysIdx cleanup above, otherwise the last compacted value leaks in revsIdx.
+ for (int revisionIndex = 0; revisionIndex <= indexToCompact;
revisionIndex++) {
+ long revision = revs[revisionIndex];
- for (int i = idxToKeepFrom; i < revs.size(); i++) {
- long rev = revs.get(i);
-
- // If this revision is higher than max revision or is the last
revision, we may need to keep it.
- NavigableMap<byte[], Value> kv = revsIdx.get(rev);
+ NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
- Value value = kv.get(key);
+ valueByKey.remove(key);
- if (i == idxToKeepFrom) {
- // Check if a first entry to keep is a tombstone.
- if (value.tombstone()) {
- // If this is a first revision we are keeping and it is a
tombstone, then don't keep it.
- continue;
- }
+ if (valueByKey.isEmpty()) {
+ revsIdx.remove(revision);
}
-
- NavigableMap<byte[], Value> compactedKv =
compactedRevsIdx.computeIfAbsent(
- rev,
- k -> new TreeMap<>(CMP)
- );
-
- // Keep the entry and the revision.
- compactedKv.put(key, value);
-
- revsToKeep.add(rev);
- }
-
- if (!revsToKeep.isEmpty()) {
- compactedKeysIdx.put(key, revsToKeep);
- }
}
@@ -660,7 +634,6 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
assert revUpperBound >= 0 : "Invalid arguments: [revUpperBound=" +
revUpperBound + ']';
assert revUpperBound >= revLowerBound
: "Invalid arguments: [revLowerBound=" + revLowerBound + ",
revUpperBound=" + revUpperBound + ']';
- // TODO: IGNITE-19782 throw CompactedException if revLowerBound is
compacted.
List<Long> revs = keysIdx.get(key);
@@ -861,4 +834,28 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
watchProcessor.advanceSafeTime(newSafeTime);
}
}
+
+ private static long[] toLongArray(List<Long> list) {
+ if (list.isEmpty()) {
+ return LONG_EMPTY_ARRAY;
+ }
+
+ var array = new long[list.size()];
+
+ for (int i = 0; i < array.length; i++) {
+ array[i] = list.get(i);
+ }
+
+ return array;
+ }
+
+ private boolean isTombstoneForCompaction(byte[] key, long revision) {
+ NavigableMap<byte[], Value> kv = revsIdx.get(revision);
+
+ Value value = kv.get(key);
+
+ assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
+
+ return value.tombstone();
+ }
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 0f1706c64a..479297a33b 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -112,6 +112,7 @@ enum class code : underlying_t {
COMPACTION = 0x50003,
OP_EXECUTION = 0x50004,
OP_EXECUTION_TIMEOUT = 0x50005,
+ COMPACTED = 0x50006,
// Index group. Group code: 6
INVALID_INDEX_DEFINITION = 0x60001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 4d767591d4..e4feab83ec 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -181,6 +181,8 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::RESTORING_STORAGE:
case error::code::COMPACTION:
return sql_state::SHY000_GENERAL_ERROR;
+ case error::code::COMPACTED:
+ return sql_state::SHY000_GENERAL_ERROR;
// Index group. Group code: 6
case error::code::INDEX_NOT_FOUND:
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index f5e10723d9..c3f7c7fb09 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -229,6 +229,9 @@ namespace Apache.Ignite
/// <summary> OpExecutionTimeout error. </summary>
public const int OpExecutionTimeout = (GroupCode << 16) | (5 &
0xFFFF);
+
+ /// <summary> Compacted error. </summary>
+ public const int Compacted = (GroupCode << 16) | (6 & 0xFFFF);
}
/// <summary> Index errors. </summary>