This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 79f9e3f1214 IGNITE-25030 Remove rwLock from rocksdb-based meta-storage
(#5670)
79f9e3f1214 is described below
commit 79f9e3f1214416ba8d0d66f6f22375d1626fa045
Author: Ivan Bessonov <[email protected]>
AuthorDate: Wed Apr 23 12:12:31 2025 +0300
IGNITE-25030 Remove rwLock from rocksdb-based meta-storage (#5670)
---
.../impl/ItMetaStorageServicePersistenceTest.java | 8 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 35 +-
.../server/AbstractKeyValueStorage.java | 177 +++------
.../server/ReadOperationForCompactionTracker.java | 106 ++++--
.../server/persistence/RocksDbKeyValueStorage.java | 400 +++++++++++----------
.../server/persistence/WriteBatchProtector.java | 87 +++++
.../AbstractCompactionKeyValueStorageTest.java | 41 ++-
.../ReadOperationForCompactionTrackerTest.java | 143 +++++---
.../persistence/WriteBatchProtectorTest.java | 82 +++++
.../server/SimpleInMemoryKeyValueStorage.java | 99 ++++-
10 files changed, 765 insertions(+), 413 deletions(-)
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 82201e1b2ee..5c33f2c55ad 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.metastorage.impl;
-import static
org.apache.ignite.internal.metastorage.TestMetasStorageUtils.ANY_TIMESTAMP;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -138,14 +137,9 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
KeyValueStorage storage = storageByName.get(node.name());
- byte[] lastKey = interactedAfterSnapshot ? SECOND_KEY.bytes() :
FIRST_KEY.bytes();
- byte[] lastValue = interactedAfterSnapshot ? SECOND_VALUE :
FIRST_VALUE;
-
int expectedRevision = interactedAfterSnapshot ? 4 : 3;
- Entry expectedLastEntry = new EntryImpl(lastKey, lastValue,
expectedRevision, ANY_TIMESTAMP);
-
- return () -> TestMetasStorageUtils.equals(storage.get(lastKey),
expectedLastEntry);
+ return () -> storage.revision() == expectedRevision;
}
@Override
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index e5930391d20..434bdaaf7d5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -84,6 +84,7 @@ import
org.apache.ignite.internal.metastorage.impl.raft.MetaStorageSnapshotStora
import org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
import
org.apache.ignite.internal.metastorage.server.WatchEventHandlingCallback;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
@@ -1303,29 +1304,27 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
long operationRevision,
Supplier<CompletableFuture<T>> readFromLeader
) {
- long trackingRevision = operationRevision == LATEST_REVISION ?
storage.revision() : operationRevision;
-
- long readOperationId =
readOperationFromLeaderForCompactionTracker.generateReadOperationId();
-
- readOperationFromLeaderForCompactionTracker.track(readOperationId,
trackingRevision);
+ TrackingToken token =
readOperationFromLeaderForCompactionTracker.track(
+ operationRevision,
+ storage::revision,
+ storage::getCompactionRevision
+ );
try {
- return readFromLeader.get().whenComplete(
- (t, throwable) ->
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision)
- );
+ return readFromLeader.get().whenComplete((t, throwable) ->
token.close());
} catch (Throwable t) {
-
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision);
+ token.close();
throw t;
}
}
private Publisher<Entry> withTrackReadOperationFromLeaderPublisher(long
operationRevision, Supplier<Publisher<Entry>> readFromLeader) {
- long trackingRevision = operationRevision == LATEST_REVISION ?
storage.revision() : operationRevision;
-
- long readOperationId =
readOperationFromLeaderForCompactionTracker.generateReadOperationId();
-
- readOperationFromLeaderForCompactionTracker.track(readOperationId,
trackingRevision);
+ TrackingToken token =
readOperationFromLeaderForCompactionTracker.track(
+ operationRevision,
+ storage::revision,
+ storage::getCompactionRevision
+ );
try {
Publisher<Entry> publisherFromLeader = readFromLeader.get();
@@ -1341,7 +1340,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public void cancel() {
-
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision);
+ token.close();
subscription.cancel();
}
@@ -1355,20 +1354,20 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public void onError(Throwable throwable) {
-
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision);
+ token.close();
subscriber.onError(throwable);
}
@Override
public void onComplete() {
-
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision);
+ token.close();
subscriber.onComplete();
}
});
} catch (Throwable t) {
-
readOperationFromLeaderForCompactionTracker.untrack(readOperationId,
trackingRevision);
+ token.close();
throw t;
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 0a3e9805098..980e72d41b6 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -32,8 +32,6 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -54,19 +52,17 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
protected final IgniteLogger log = Loggers.forClass(getClass());
- protected final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
protected final FailureProcessor failureProcessor;
protected final WatchProcessor watchProcessor;
+ protected final Object watchProcessorMutex = new Object();
+
/**
* Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of current revisions update, {@code null} if recovery
* is complete.
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
- private @Nullable RecoveryRevisionsListener recoveryRevisionListener;
+ private volatile @Nullable RecoveryRevisionsListener
recoveryRevisionListener;
/**
* Revision. Will be incremented for each single-entry or multi-entry
update operation.
@@ -78,8 +74,6 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
*
* <p>This field is used by metastorage read methods to determine whether
{@link CompactedException} should be thrown.</p>
*
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
- *
* <p>Field with a volatile so as not to be blocked when reading the
compaction revision by other components, for example due to long
* write operations, including the compaction itself.</p>
*/
@@ -93,22 +87,18 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
* we can get into a gap when commands came from different leaders to the
same compaction revision, but we simply did not have time to
* process the update of the compaction revision from the previous leader.
This is necessary to cover corner cases with a sufficiently
* small compaction revision update interval.</p>
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
- private volatile long planedUpdateCompactionRevision = -1;
+ private volatile long plannedUpdateCompactionRevision = -1;
protected final AtomicBoolean stopCompaction = new AtomicBoolean();
- /** Tracks only cursors, since reading a single entry or a batch is done
entirely under {@link #rwLock}. */
+ /** Tracks only cursors. */
protected final ReadOperationForCompactionTracker
readOperationForCompactionTracker;
/**
* Events for notification of the {@link WatchProcessor} that were created
before the {@link #startWatches start of watches}, after the
* start of watches there will be {@code null}. Events are sorted by
{@link NotifyWatchProcessorEvent#timestamp} and are expected to
* have no duplicates.
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
protected @Nullable TreeSet<NotifyWatchProcessorEvent>
notifyWatchProcessorEventsBeforeStartingWatches = new TreeSet<>();
@@ -136,13 +126,15 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
/** Returns the key revisions for operation, an empty array if not found.
*/
protected abstract long[] keyRevisionsForOperation(byte[] key);
- /** Returns key values by revision for operation. */
- protected abstract Value valueForOperation(byte[] key, long revision);
+ /**
+ * Returns key values by revision for operation.
+ *
+ * @throws CompactedException If value is compacted for this particular
revision.
+ */
+ protected abstract Value valueForOperation(byte[] key, long revision)
throws CompactedException;
/**
* Returns {@code true} if the metastorage is in the recovery state.
- *
- * <p>Method is expected to be invoked under {@link #rwLock}.</p>
*/
private boolean isInRecoveryState() {
return recoveryRevisionListener != null;
@@ -150,79 +142,40 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
/**
* Returns {@code true} if the watches have {@link #startWatches started}.
- *
- * <p>Method is expected to be invoked under {@link #rwLock}.</p>
*/
protected abstract boolean areWatchesStarted();
@Override
public Entry get(byte[] key) {
- rwLock.readLock().lock();
-
- try {
- return doGet(key, rev);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doGet(key, rev);
}
@Override
public Entry get(byte[] key, long revUpperBound) {
- rwLock.readLock().lock();
-
- try {
- return doGet(key, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doGet(key, revUpperBound);
}
@Override
public List<Entry> getAll(List<byte[]> keys) {
- rwLock.readLock().lock();
-
- try {
- return doGetAll(keys, rev);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doGetAll(keys, rev);
}
@Override
public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- rwLock.readLock().lock();
-
- try {
- return doGetAll(keys, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doGetAll(keys, revUpperBound);
}
@Override
public long revision() {
- rwLock.readLock().lock();
-
- try {
- return rev;
- } finally {
- rwLock.readLock().unlock();
- }
+ return rev;
}
@Override
public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
assert revision >= 0 : revision;
+ assertCompactionRevisionLessThanCurrent(revision, rev);
- rwLock.writeLock().lock();
-
- try {
- assertCompactionRevisionLessThanCurrent(revision, rev);
-
- saveCompactionRevision(revision, context, true);
- } finally {
- rwLock.writeLock().unlock();
- }
+ saveCompactionRevision(revision, context, true);
}
protected abstract void saveCompactionRevision(long compactionRevision,
KeyValueUpdateContext context, boolean advanceSafeTime);
@@ -231,17 +184,11 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
public void setCompactionRevision(long revision) {
assert revision >= 0 : revision;
- rwLock.writeLock().lock();
+ assertCompactionRevisionLessThanCurrent(revision, rev);
- try {
- assertCompactionRevisionLessThanCurrent(revision, rev);
+ compactionRevision = revision;
- compactionRevision = revision;
-
- notifyRevisionsUpdate();
- } finally {
- rwLock.writeLock().unlock();
- }
+ notifyRevisionsUpdate();
}
@Override
@@ -253,28 +200,22 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
public void updateCompactionRevision(long compactionRevision,
KeyValueUpdateContext context) {
assert compactionRevision >= 0 : compactionRevision;
- rwLock.writeLock().lock();
+ assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
- try {
- assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+ saveCompactionRevision(compactionRevision, context, false);
- saveCompactionRevision(compactionRevision, context, false);
+ if (isInRecoveryState()) {
+ setCompactionRevision(compactionRevision);
+ } else if (compactionRevision > plannedUpdateCompactionRevision) {
+ plannedUpdateCompactionRevision = compactionRevision;
- if (isInRecoveryState()) {
+ notifyWatchProcessor(new AdvanceSafeTimeEvent(() -> {
setCompactionRevision(compactionRevision);
- } else if (compactionRevision > planedUpdateCompactionRevision) {
- planedUpdateCompactionRevision = compactionRevision;
- notifyWatchProcessor(new AdvanceSafeTimeEvent(() -> {
- setCompactionRevision(compactionRevision);
-
- compactionRevisionUpdateListeners.forEach(listener ->
listener.onUpdate(compactionRevision));
- }, context.timestamp));
- } else if (areWatchesStarted()) {
- watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
- }
- } finally {
- rwLock.writeLock().unlock();
+ compactionRevisionUpdateListeners.forEach(listener ->
listener.onUpdate(compactionRevision));
+ }, context.timestamp));
+ } else if (areWatchesStarted()) {
+ watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
}
}
@@ -317,13 +258,7 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
@Override
public void setRecoveryRevisionsListener(@Nullable
RecoveryRevisionsListener listener) {
- rwLock.writeLock().lock();
-
- try {
- this.recoveryRevisionListener = listener;
- } finally {
- rwLock.writeLock().unlock();
- }
+ this.recoveryRevisionListener = listener;
}
@Override
@@ -365,7 +300,7 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
}
- /** Notifies of revision update. Must be called under the {@link #rwLock}.
*/
+ /** Notifies of revision update. */
protected void notifyRevisionsUpdate() {
if (recoveryRevisionListener != null) {
// Listener must be invoked only on recovery, after recovery
listener must be null.
@@ -389,8 +324,8 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
Value value = valueForOperation(key, revision);
- if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions,
maxRevisionIndex) || value.tombstone())) {
- throw new CompactedException(revUpperBound, compactionRevision);
+ if (!isLastIndex(keyRevisions, maxRevisionIndex) || value.tombstone())
{
+
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
compactionRevision);
}
return EntryImpl.toEntry(key, revision, value);
@@ -411,28 +346,16 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
@Override
public void advanceSafeTime(KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
+ setIndexAndTerm(context.index, context.term);
- try {
- setIndexAndTerm(context.index, context.term);
-
- if (areWatchesStarted()) {
- watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
- }
- } finally {
- rwLock.writeLock().unlock();
+ if (areWatchesStarted()) {
+ watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
}
}
@Override
public Revisions revisions() {
- rwLock.readLock().lock();
-
- try {
- return createCurrentRevisions();
- } finally {
- rwLock.readLock().unlock();
- }
+ return createCurrentRevisions();
}
private Revisions createCurrentRevisions() {
@@ -440,20 +363,24 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
}
protected void notifyWatchProcessor(NotifyWatchProcessorEvent event) {
- if (areWatchesStarted()) {
- event.notify(watchProcessor);
- } else {
- boolean added =
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
+ synchronized (watchProcessorMutex) {
+ if (areWatchesStarted()) {
+ event.notify(watchProcessor);
+ } else {
+ boolean added =
notifyWatchProcessorEventsBeforeStartingWatches.add(event);
- assert added : event;
+ assert added : event;
+ }
}
}
protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
- assert !areWatchesStarted();
+ synchronized (watchProcessorMutex) {
+ assert !areWatchesStarted();
- notifyWatchProcessorEventsBeforeStartingWatches.forEach(event ->
event.notify(watchProcessor));
+ notifyWatchProcessorEventsBeforeStartingWatches.forEach(event ->
event.notify(watchProcessor));
- notifyWatchProcessorEventsBeforeStartingWatches = null;
+ notifyWatchProcessorEventsBeforeStartingWatches = null;
+ }
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
index 2760099e359..2baae964773 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTracker.java
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -35,9 +38,8 @@ import org.apache.ignite.internal.util.CompletableFutures;
*
* <p>Expected usage:</p>
* <ul>
- * <li>Before starting execution, the reading command invoke {@link
#track} with its ID and the lowest estimation for revision upper
- * bound.
- * <li>After completion, the reading command will invoke {@link #untrack}
with the same arguments as when calling {@link #track},
+ * <li>Before starting execution, the reading command invokes {@link
#track} with the lowest estimation for revision upper bound.
+ * <li>After completion, the reading command will invoke {@link
TrackingToken#close()} on an instance returned by {@link #track},
* regardless of whether the operation was successful or not.</li>
* <li>{@link #collect} will be invoked only after a new compaction
revision has been set
* ({@link KeyValueStorage#setCompactionRevision}) for a new compaction
revision.</li>
@@ -48,56 +50,86 @@ public class ReadOperationForCompactionTracker {
private final AtomicLong longOperationIdGenerator = new AtomicLong();
- /** Generates the next read operation ID. Thread-safe. */
- public long generateReadOperationId() {
- return longOperationIdGenerator.getAndIncrement();
+ /**
+ * Token to stop tracking the read operation.
+ *
+ * @see #track(long, LongSupplier, LongSupplier)
+ */
+ @FunctionalInterface
+ public interface TrackingToken extends AutoCloseable {
+ @Override
+ void close();
}
/**
* Starts tracking the completion of a read operation on its lowest
estimation for revision upper bound.
*
- * <p>Method is expected not to be called more than once for the same
arguments.</p>
- *
* <p>Expected usage pattern:</p>
* <pre><code>
- * Object readOperationId = ...;
* int operationRevision = ...;
*
- * tracker.track(readOperationId, operationRevision);
- *
- * try {
+ * try (var token = tracker.track(operationRevision,
storage::revision, storage::getCompactionRevision)) {
* doReadOperation(...);
- * } finally {
- * tracker.untrack(readOperationId, operationRevision);
* }
* </code></pre>
*
- * @see #untrack(Object, long)
+ * <p>Or you can use an explicit {@link TrackingToken#close()} call if
execution context requires that, for example if your operation is
+ * asynchronous.
+ *
+ * @see TrackingToken
*/
- public void track(Object readOperationId, long operationRevision) {
- var key = new ReadOperationKey(readOperationId, operationRevision);
+ public TrackingToken track(
+ long operationRevision, LongSupplier latestRevision, LongSupplier
compactedRevision
+ ) throws CompactedException {
+ long operationId = longOperationIdGenerator.getAndIncrement();
+
+ while (true) {
+ // "operationRevision" parameter is immutable, because we might
need it on a next iteration.
+ // If it is asked to track the latest revision, we receive it
right here from the corresponding supplier.
+ long currentOperationRevision = operationRevision ==
MetaStorageManager.LATEST_REVISION
+ ? latestRevision.getAsLong()
+ : operationRevision;
+
+ // Value from compacted revision supplier can only grow. We only
use it for upper bound checks, so it's safe to read it every
+ // time instead of caching it. It applies to all usages of the
supplier.
+ if (currentOperationRevision <= compactedRevision.getAsLong()) {
+ // Latest revision can never be compacted. If for some reason
latest revision is concurrently updated and this revision is
+ // already compacted, we should retry until we succeed. That's
quite a lag, but it is possible in theory and in tests.
+ if (operationRevision == MetaStorageManager.LATEST_REVISION) {
+ continue;
+ }
+
+ throw new CompactedException(currentOperationRevision,
compactedRevision.getAsLong());
+ }
- CompletableFuture<Void> previous =
readOperationFutureByKey.putIfAbsent(key, new CompletableFuture<>());
+ var key = new ReadOperationKey(operationId,
currentOperationRevision);
- assert previous == null : key;
- }
+ TrackingToken trackingToken = () -> {
+ CompletableFuture<Void> removed =
readOperationFutureByKey.remove(key);
- /**
- * Stops tracking the read operation. {@code operationRevision} must match
the corresponding value from {@link #track}.
- *
- * <p>Method is expected not to be called more than once for the same
arguments, and {@link #track} was previously called for same
- * arguments.</p>
- *
- * @see #track(Object, long)
- */
- public void untrack(Object readOperationId, long operationRevision) {
- var key = new ReadOperationKey(readOperationId, operationRevision);
+ // Might be null, that's fine, "close" can be called multiple
times.
+ if (removed != null) {
+ removed.complete(null);
+ }
+ };
- CompletableFuture<Void> removed = readOperationFutureByKey.remove(key);
+ CompletableFuture<Void> operationFuture = new
CompletableFuture<>();
+ readOperationFutureByKey.put(key, operationFuture);
- assert removed != null : key;
+ // If this condition passes, it is possible that a future returned
by "collect" in a compaction thread is already completed.
+ // It is not safe to proceed with tracking, we have to invalidate
current token and either retry, or throw an exception.
+ if (currentOperationRevision <= compactedRevision.getAsLong()) {
+ trackingToken.close();
- removed.complete(null);
+ if (operationRevision == MetaStorageManager.LATEST_REVISION) {
+ continue;
+ }
+
+ throw new CompactedException(currentOperationRevision,
compactedRevision.getAsLong());
+ }
+
+ return trackingToken;
+ }
}
/**
@@ -115,11 +147,11 @@ public class ReadOperationForCompactionTracker {
private static class ReadOperationKey {
@IgniteToStringInclude
- private final Object readOperationId;
+ private final long readOperationId;
private final long operationRevision;
- private ReadOperationKey(Object readOperationId, long
operationRevision) {
+ private ReadOperationKey(long readOperationId, long operationRevision)
{
assert operationRevision >= 0 : operationRevision;
this.readOperationId = readOperationId;
@@ -137,12 +169,12 @@ public class ReadOperationForCompactionTracker {
ReadOperationKey that = (ReadOperationKey) o;
- return operationRevision == that.operationRevision &&
readOperationId.equals(that.readOperationId);
+ return operationRevision == that.operationRevision &&
readOperationId == that.readOperationId;
}
@Override
public int hashCode() {
- int result = readOperationId.hashCode();
+ int result = Long.hashCode(readOperationId);
result = 31 * result + Long.hashCode(operationRevision);
return result;
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 7bd4add855a..566f028fb3b 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.metastorage.server.persistence;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.metastorage.MetaStorageManager.LATEST_REVISION;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
@@ -74,7 +75,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
+import java.util.function.LongSupplier;
import org.apache.ignite.internal.components.NoOpLogSyncer;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -97,6 +98,7 @@ import
org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
import org.apache.ignite.internal.metastorage.server.NotifyWatchProcessorEvent;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.UpdateEntriesEvent;
import org.apache.ignite.internal.metastorage.server.UpdateOnlyRevisionEvent;
@@ -109,7 +111,6 @@ import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -190,6 +191,18 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
RocksDB.loadLibrary();
}
+ private final LongSupplier revSupplier = () -> rev;
+
+ private final LongSupplier compactedRevSupplier = () -> compactionRevision;
+
+ /**
+ * Only read or modifying while being synchronized on itself.
+ *
+ * @see #writeBatch(WriteBatch)
+ * @see #writeCompactedBatch(List, WriteBatch)
+ */
+ private final WriteBatchProtector writeBatchProtector = new
WriteBatchProtector();
+
/** Executor for storage operations. */
private final ExecutorService executor;
@@ -229,10 +242,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/**
* Facility to work with checksums.
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
- private MetastorageChecksum checksum;
+ private volatile MetastorageChecksum checksum;
/** Status of the watch recovery process. */
private enum RecoveryStatus {
@@ -251,8 +262,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
* Current list of updated entries.
*
* <p>Since this list gets read and updated only on writes (under a write
lock), no extra synchronisation is needed.</p>
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
private final UpdatedEntries updatedEntries = new UpdatedEntries();
@@ -261,13 +270,10 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/**
* Write options used to write to RocksDB.
- *
- * <p>Access is guarded by {@link #rwLock}.
*/
- private WriteOptions writeOptions;
+ private volatile WriteOptions writeOptions;
- /** Multi-threaded access is guarded by {@link #rwLock}. */
- private RocksDbFlusher flusher;
+ private volatile RocksDbFlusher flusher;
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -311,8 +317,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
private void startBusy() {
- rwLock.writeLock().lock();
-
try {
Files.createDirectories(dbPath);
@@ -321,8 +325,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
closeRocksResources();
throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to
start the storage", e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@@ -489,12 +491,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
- rwLock.writeLock().lock();
- try {
- closeRocksResources();
- } finally {
- rwLock.writeLock().unlock();
- }
+ closeRocksResources();
}
private void closeRocksResources() {
@@ -505,22 +502,15 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- rwLock.writeLock().lock();
-
- try {
- return snapshotManager
- .createSnapshot(snapshotPath)
- .thenCompose(unused -> flush());
- } finally {
- rwLock.writeLock().unlock();
- }
+ return snapshotManager
+ .createSnapshot(snapshotPath)
+ .thenCompose(unused -> flush());
}
@Override
public void restoreSnapshot(Path path) {
- rwLock.writeLock().lock();
-
try {
+ // TODO IGNITE-25213 Ensure that this doesn't happen on a running
node.
clear();
snapshotManager.restoreSnapshot(path);
@@ -538,15 +528,25 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
throw e;
} catch (Exception e) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to
restore snapshot", e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
- public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
+ public Entry get(byte[] key) {
+ try (TrackingToken token =
readOperationForCompactionTracker.track(LATEST_REVISION, revSupplier,
compactedRevSupplier)) {
+ return super.get(key);
+ }
+ }
+
+ @Override
+ public List<Entry> getAll(List<byte[]> keys) {
+ try (TrackingToken token =
readOperationForCompactionTracker.track(LATEST_REVISION, revSupplier,
compactedRevSupplier)) {
+ return super.getAll(keys);
+ }
+ }
+ @Override
+ public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
try (WriteBatch batch = new WriteBatch()) {
long newChecksum = checksum.wholePut(key, value);
@@ -559,8 +559,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
completeAndWriteBatch(batch, curRev, context, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@@ -590,30 +588,22 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void saveConfiguration(byte[] configuration, long lastAppliedIndex,
long lastAppliedTerm) {
- rwLock.writeLock().lock();
-
try (WriteBatch batch = new WriteBatch()) {
data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0,
lastAppliedIndex, lastAppliedTerm));
data.put(batch, CONFIGURATION_KEY, configuration);
- db.write(writeOptions, batch);
+ writeBatch(batch);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
public byte @Nullable [] getConfiguration() {
- rwLock.readLock().lock();
-
try {
return data.get(CONFIGURATION_KEY);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.readLock().unlock();
}
}
@@ -666,9 +656,9 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
addIndexAndTermToWriteBatch(batch, context);
- db.write(writeOptions, batch);
+ writeBatch(batch);
+ rev = newRev; // Done.
- rev = newRev;
checksum.commitRound(newChecksum);
updatedEntries.ts = ts;
@@ -704,8 +694,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void putAll(List<byte[]> keys, List<byte[]> values,
KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
try (WriteBatch batch = new WriteBatch()) {
long newChecksum = checksum.wholePutAll(keys, values);
@@ -720,15 +708,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
completeAndWriteBatch(batch, curRev, context, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
public void remove(byte[] key, KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
try (WriteBatch batch = new WriteBatch()) {
long newChecksum = checksum.wholeRemove(key);
@@ -741,15 +725,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
completeAndWriteBatch(batch, curRev, context, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
public void removeAll(List<byte[]> keys, KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
try (WriteBatch batch = new WriteBatch()) {
long newChecksum = checksum.wholeRemoveAll(keys);
@@ -770,15 +750,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
completeAndWriteBatch(batch, curRev, context, newChecksum);
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
public void removeByPrefix(byte[] prefix, KeyValueUpdateContext context) {
- rwLock.writeLock().lock();
-
try (
WriteBatch batch = new WriteBatch();
Cursor<Entry> entryCursor = range(prefix, nextKey(prefix))
@@ -796,8 +772,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
completeAndWriteBatch(batch, curRev, context,
checksum.wholeRemoveByPrefix(prefix));
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@@ -809,8 +783,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
KeyValueUpdateContext context,
CommandId commandId
) {
- rwLock.writeLock().lock();
-
try {
Entry[] entries =
getAll(Arrays.asList(condition.keys())).toArray(new Entry[]{});
@@ -826,15 +798,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
return branch;
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@Override
public StatementResult invoke(If iif, KeyValueUpdateContext context,
CommandId commandId) {
- rwLock.writeLock().lock();
-
try {
If currIf = iif;
@@ -868,8 +836,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
- } finally {
- rwLock.writeLock().unlock();
}
}
@@ -928,24 +894,12 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
- rwLock.readLock().lock();
-
- try {
- return doRange(keyFrom, keyTo, rev);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doRange(keyFrom, keyTo, rev);
}
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long
revUpperBound) {
- rwLock.readLock().lock();
-
- try {
- return doRange(keyFrom, keyTo, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
+ return doRange(keyFrom, keyTo, revUpperBound);
}
@Override
@@ -954,9 +908,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
long currentRevision;
- rwLock.readLock().lock();
-
- try {
+ synchronized (watchProcessorMutex) {
watchProcessor.setWatchEventHandlingCallback(callback);
currentRevision = rev;
@@ -969,17 +921,13 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
// If revision is not 0, we need to replay updates that match
the existing data.
recoveryStatus.set(RecoveryStatus.IN_PROGRESS);
}
- } finally {
- rwLock.readLock().unlock();
}
if (currentRevision != 0) {
Set<UpdateEntriesEvent> updateEntriesEvents =
collectUpdateEntriesEventsFromStorage(startRevision, currentRevision);
Set<UpdateOnlyRevisionEvent> updateOnlyRevisionEvents =
collectUpdateRevisionEventsFromStorage(startRevision, currentRevision);
- rwLock.writeLock().lock();
-
- try {
+ synchronized (watchProcessorMutex) {
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateEntriesEvents);
// Adds events for which there were no entries updates but the
revision was updated.
notifyWatchProcessorEventsBeforeStartingWatches.addAll(updateOnlyRevisionEvents);
@@ -987,8 +935,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
drainNotifyWatchProcessorEventsBeforeStartingWatches();
recoveryStatus.set(RecoveryStatus.DONE);
- } finally {
- rwLock.writeLock().unlock();
}
}
}
@@ -1001,10 +947,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
compactKeys(revision);
compactAuxiliaryMappings(revision);
-
- // Compaction might have created a lot of tombstones in column
families, which affect scan speed. Removing them makes next
- // compaction faster, as well as other scans in general.
- db.compactRange();
} catch (Throwable t) {
throw new MetaStorageException(COMPACTION_ERR, "Error during
compaction: " + revision, t);
}
@@ -1041,12 +983,13 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
* Compacts the key, see the documentation of {@link
KeyValueStorage#compact} for examples.
*
* @param batch Write batch.
+ * @param batchKeys A list of keys, in which this method will add keys
that it compacted.
* @param key Target key.
* @param revs Key revisions.
* @param compactionRevision Revision up to which (inclusively) the key
will be compacted.
* @throws MetaStorageException If failed.
*/
- private void compactForKey(WriteBatch batch, byte[] key, long[] revs, long
compactionRevision) {
+ private void compactForKey(WriteBatch batch, List<byte[]> batchKeys,
byte[] key, long[] revs, long compactionRevision) {
try {
int indexToCompact = indexToCompact(revs, compactionRevision,
revision -> isTombstoneForCompaction(key, revision));
@@ -1054,6 +997,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
return;
}
+ batchKeys.add(key);
+
for (int revisionIndex = 0; revisionIndex <= indexToCompact;
revisionIndex++) {
// This revision is not needed anymore, remove data.
data.delete(batch, keyToRocksKey(revs[revisionIndex], key));
@@ -1290,13 +1235,12 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
rocksIterator.status();
- byte[] tsValue = rocksIterator.value();
-
- if (tsValue.length == 0) {
+ if (!rocksIterator.isValid()) {
+ // No previous value means that it got deleted, or never
existed in the first place.
throw new CompactedException("Revisions less than or equal to
the requested one are already compacted: " + timestamp);
}
- return bytesToLong(tsValue);
+ return bytesToLong(rocksIterator.value());
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
}
@@ -1361,7 +1305,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
addIndexAndTermToWriteBatch(batch, context);
- db.write(writeOptions, batch);
+ writeBatch(batch);
if (advanceSafeTime && areWatchesStarted()) {
watchProcessor.advanceSafeTime(() -> {}, context.timestamp);
@@ -1384,18 +1328,17 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public ChecksumAndRevisions checksumAndRevisions(long revision) {
- rwLock.readLock().lock();
-
try {
+ // "rev" is the last thing that gets updated, so it has to be read
first.
+ long currentRevision = rev;
+
return new ChecksumAndRevisions(
checksumByRevisionOrZero(revision),
minChecksummedRevisionOrZero(),
- rev
+ currentRevision
);
} catch (RocksDBException e) {
throw new MetaStorageException(INTERNAL_ERR, "Cannot get checksum
by revision: " + revision, e);
- } finally {
- rwLock.readLock().unlock();
}
}
@@ -1418,8 +1361,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void clear() {
- rwLock.readLock().lock();
-
try {
// There's no way to easily remove all data from RocksDB, so we
need to re-create it from scratch.
closeRocksResources();
@@ -1434,71 +1375,80 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
createDb();
} catch (Exception e) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to
restore snapshot", e);
- } finally {
- rwLock.readLock().unlock();
}
}
private void compactKeys(long compactionRevision) throws RocksDBException {
- compactInBatches(index, (key, value, batch) -> {
- compactForKey(batch, key, getAsLongs(value.get()),
compactionRevision);
+ assertCompactionRevisionLessThanCurrent(this.compactionRevision, rev);
- return true;
- });
- }
-
- private void compactAuxiliaryMappings(long compactionRevision) throws
RocksDBException {
- compactInBatches(revisionToTs, (key, value, batch) -> {
- long revision = bytesToLong(key);
+ // Clear bloom filter before opening iterator, so that we don't have
collisions right from the start.
+ synchronized (writeBatchProtector) {
+ writeBatchProtector.clear();
+ }
- if (revision > compactionRevision) {
- return false;
- }
+ if (!busyLock.enterBusy()) {
+ return;
+ }
- revisionToTs.delete(batch, key);
- tsToRevision.delete(batch, value.get());
+ try (RocksIterator iterator = index.newIterator()) {
+ byte[] key = null;
+ List<byte[]> batchKeys = new ArrayList<>(COMPACT_BATCH_SIZE);
- revisionToChecksum.delete(batch, key);
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ try (WriteBatch batch = new WriteBatch()) {
+ byte[] retryPositionKey = key;
- return true;
- });
- }
+ batchKeys.clear();
+ for (int i = 0; i < COMPACT_BATCH_SIZE &&
iterator.isValid(); i++, iterator.next()) {
+ if (stopCompaction.get()) {
+ return;
+ }
- @FunctionalInterface
- private interface CompactionAction {
- /**
- * Performs compaction on the storage at the current iterator pointer.
Returns {@code true} if it is necessary to continue
- * iterating, {@link false} if it is necessary to finish with writing
the last batch.
- */
- boolean compact(byte[] key, Supplier<byte[]> value, WriteBatch batch)
throws RocksDBException;
- }
+ // Throw an exception if something went wrong.
+ iterator.status();
- private void compactInBatches(ColumnFamily columnFamily, CompactionAction
compactionAction) throws RocksDBException {
- try (RocksIterator iterator = columnFamily.newIterator()) {
- boolean continueIterating = true;
+ key = iterator.key();
- byte[] key = null;
+ compactForKey(batch, batchKeys, key,
getAsLongs(iterator.value()), compactionRevision);
+ }
- while (continueIterating) {
- rwLock.writeLock().lock();
+ if (!writeCompactedBatch(batchKeys, batch)) {
+ key = retryPositionKey;
- try (WriteBatch batch = new WriteBatch()) {
- // We must refresh the iterator while holding write lock,
because iterator state might be outdated due to its snapshot
- // isolation.
- if (!refreshIterator(iterator, key)) {
- break;
+ // Refreshing the iterator is absolutely crucial. We
have determined that data has been modified externally,
+ // current snapshot in the iterator is invalid.
+ refreshIterator(iterator, key);
}
+ }
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private void compactAuxiliaryMappings(long compactionRevision) throws
RocksDBException {
+ assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
-
assertCompactionRevisionLessThanCurrent(compactionRevision, rev);
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try (RocksIterator iterator = revisionToTs.newIterator()) {
+ boolean continueIterating = true;
+ iterator.seekToFirst();
+ while (continueIterating && iterator.isValid()) {
+ try (WriteBatch batch = new WriteBatch()) {
for (int i = 0; i < COMPACT_BATCH_SIZE &&
iterator.isValid(); i++, iterator.next()) {
if (stopCompaction.get()) {
return;
}
- key = iterator.key();
+ // Throw an exception if something went wrong.
+ iterator.status();
- if (!compactionAction.compact(key, iterator::value,
batch)) {
+ if (!deleteAuxiliaryMapping(compactionRevision,
iterator, batch)) {
continueIterating = false;
break;
@@ -1506,16 +1456,103 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
db.write(writeOptions, batch);
- } finally {
- rwLock.writeLock().unlock();
}
}
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private boolean deleteAuxiliaryMapping(long compactionRevision,
RocksIterator iterator, WriteBatch batch) throws RocksDBException {
+ byte[] key = iterator.key();
+ long revision = bytesToLong(key);
- iterator.status();
+ if (revision > compactionRevision) {
+ return false;
}
+
+ revisionToTs.delete(batch, key);
+ tsToRevision.delete(batch, iterator.value());
+
+ revisionToChecksum.delete(batch, key);
+
+ return true;
}
- private static boolean refreshIterator(RocksIterator iterator, byte
@Nullable [] key) throws RocksDBException {
+ /**
+ * Writes the batch to the database in the {@code FSM} thread. By the time
this method is called, {@link #updatedEntries} must be in a
+ * state that corresponds to the current batch. This collection will be
used to mark the entries as updated in
+ * {@link #writeBatchProtector}.
+ *
+ * <p>No other actions besides calling {@link
WriteBatchProtector#onUpdate(byte[])} and {@link RocksDB#write(WriteOptions,
WriteBatch)}
+ * are performed here. They're both executed while holding {@link
#writeBatchProtector}'s monitor in order to synchronise
+ * {@link #writeBatchProtector} modifications.
+ *
+ * <p>Since there's a gap between reading the data and writing batch into
the storage, it is possible that compaction thread had
+ * concurrently updated the data that we're modifying. The only real side
effect of such a race will be a presence of already deleted
+ * revisions in revisions list, associated with any particular key (see
{@link RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}.
+ *
+ * <p>We ignore this side effect, because retrying the operation in {@code
FSM} thread is too expensive, and because there's really no
+ * harm in having some obsolete revisions there. We always keep in mind
that a particular revision might already be compacted when we
+ * read data.
+ *
+ * @param batch RocksDB's {@link WriteBatch}.
+ * @throws RocksDBException If {@link RocksDB#write(WriteOptions,
WriteBatch)} threw an exception.
+ * @see #writeCompactedBatch(List, WriteBatch)
+ */
+ private void writeBatch(WriteBatch batch) throws RocksDBException {
+ synchronized (writeBatchProtector) {
+ for (Entry updatedEntry : updatedEntries.updatedEntries) {
+ writeBatchProtector.onUpdate(updatedEntry.key());
+ }
+
+ db.write(writeOptions, batch);
+ }
+ }
+
+ /**
+ * Writes the batch to the database in the compaction thread.
+ *
+ * <p>No other actions besides accessing {@link #writeBatchProtector} and
calling {@link RocksDB#write(WriteOptions, WriteBatch)}
+ * are performed here. They're both executed while holding {@link
#writeBatchProtector}'s monitor in order to synchronise potential
+ * {@link #writeBatchProtector} modifications.
+ *
+ * <p>Since there's a gap between reading the data and writing batch into
the storage, it is possible that {@code FSM} thread had
+ * concurrently updated the data that we're compacting. Such a race would
mean that there are unaccounted revisions in revisions list,
+ * associated with keys from {@code batchKeys} (see {@link
RocksDbKeyValueStorage#keyRevisionsForOperation(byte[])}.
+ *
+ * <p>For that reason, unlike {@link #writeBatch(WriteBatch)}, here we
don't have a privilege of just writing the batch in case of race.
+ * It would lead to data loss. In order to avoid it, we probabilistically
check if data that we're compacting has been modified
+ * concurrently. If it certainly wasn't, we proceed and return {@code
true}. If we're not sure, we abort this batch and return
+ * {@code false}.
+ *
+ * <p>If we return {@code false}, we also clear the {@link
#writeBatchProtector} to avoid blocking the next batch. It is important to
+ * remember that we'll get false-negative results sometimes if we have
hash collisions. That's fine.
+ *
+ * @param batchKeys Meta-storage keys that have been compacted in this
batch.
+ * @param batch RocksDB's {@link WriteBatch}.
+ * @return {@code true} if writing succeeded, {@code false} if compaction
round must be retried due to concurrent storage update.
+ * @throws RocksDBException If {@link RocksDB#write(WriteOptions,
WriteBatch)} threw an exception.
+ *
+ * @see #writeBatch(WriteBatch)
+ */
+ private boolean writeCompactedBatch(List<byte[]> batchKeys, WriteBatch
batch) throws RocksDBException {
+ synchronized (writeBatchProtector) {
+ for (byte[] key : batchKeys) {
+ if (writeBatchProtector.maybeUpdated(key)) {
+ writeBatchProtector.clear();
+
+ return false;
+ }
+ }
+
+ db.write(writeOptions, batch);
+ }
+
+ return true;
+ }
+
+ private static void refreshIterator(RocksIterator iterator, byte @Nullable
[] key) throws RocksDBException {
iterator.refresh();
if (key == null) {
@@ -1535,8 +1572,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
// Check for errors.
iterator.status();
-
- return iterator.isValid();
}
private boolean isTombstone(byte[] key, long revision) throws
RocksDBException {
@@ -1575,11 +1610,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
protected Value valueForOperation(byte[] key, long revision) {
- Value value = getValueForOperationNullable(key, revision);
-
- assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
-
- return value;
+ return getValueForOperation(key, revision);
}
@Override
@@ -1587,13 +1618,18 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
return recoveryStatus.get() == RecoveryStatus.DONE;
}
- private @Nullable Value getValueForOperationNullable(byte[] key, long
revision) {
+ private Value getValueForOperation(byte[] key, long revision) {
try {
byte[] valueBytes = data.get(keyToRocksKey(revision, key));
- assert valueBytes != null && valueBytes.length != 0 : "key=" +
toUtf8String(key) + ", revision=" + revision;
+ if (valueBytes == null) {
+ assert revision <= compactionRevision
+ : "key=" + toUtf8String(key) + ", revision=" +
revision + ", compactionRevision=" + compactionRevision;
- return ArrayUtils.nullOrEmpty(valueBytes) ? null :
bytesToValue(valueBytes);
+ throw new CompactedException(revision, compactionRevision);
+ }
+
+ return bytesToValue(valueBytes);
} catch (RocksDBException e) {
throw new MetaStorageException(
OP_EXECUTION_ERR,
@@ -1606,11 +1642,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo,
long revUpperBound) {
assert revUpperBound >= 0 : revUpperBound;
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
compactionRevision);
-
- long readOperationId =
readOperationForCompactionTracker.generateReadOperationId();
- readOperationForCompactionTracker.track(readOperationId,
revUpperBound);
-
var readOpts = new ReadOptions();
Slice upperBound = keyTo == null ? null : new Slice(keyTo);
@@ -1621,6 +1652,16 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
iterator.seek(keyFrom);
+ TrackingToken token;
+ try {
+ token = readOperationForCompactionTracker.track(revUpperBound,
this::revision, this::getCompactionRevision);
+ } catch (Throwable e) {
+ // Revision might have been compacted already, in which case we
must free the resources.
+ RocksUtils.closeAll(iterator, upperBound, readOpts);
+
+ throw e;
+ }
+
return new RocksIteratorAdapter<>(iterator) {
/** Cached entry used to filter "empty" values. */
private @Nullable Entry next;
@@ -1672,10 +1713,9 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
long revision = keyRevisions[maxRevisionIndex];
- Value value = getValueForOperationNullable(key, revision);
+ Value value = getValueForOperation(key, revision);
- // Value may be null if the compaction has removed it in
parallel.
- if (value == null || value.tombstone()) {
+ if (value.tombstone()) {
return EntryImpl.empty(key);
}
@@ -1684,7 +1724,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void close() {
- readOperationForCompactionTracker.untrack(readOperationId,
revUpperBound);
+ token.close();
super.close();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
new file mode 100644
index 00000000000..1def45f50fa
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.util.HashUtils;
+
+/**
+ * Simple bloom-filter-based utility class to be used to cooperate compaction
thread with an FSM thread. For a full explanation please
+ * consider reading comments in the {@link
RocksDbKeyValueStorage#compact(long)} implementation.
+ */
+class WriteBatchProtector {
+ /**
+ * Signifies that there are 2^14 bits in this bloom filter, which is
roughly equal to 16 thousand. This is a reasonably large value.
+ */
+ private static final int BLOOM_BITS = 14;
+ private static final int BLOOM_MASK = (1 << BLOOM_BITS) - 1;
+
+ // Constants for least significant bits (LSB) of hash values. These bits
would point to individual bit positions in "long" values.
+ private static final int LSB_BITS =
Integer.numberOfTrailingZeros(Long.SIZE);
+ private static final int LSB_MASK = Long.SIZE - 1;
+
+ /** Bit-set. */
+ private final long[] bloom = new long[1 << BLOOM_BITS >>> LSB_BITS];
+
+ /**
+ * Called when {@code key} is updated in the storage. Immediate
consecutive call of {@code maybeUpdated(key)} will return {@code true}.
+ */
+ public void onUpdate(byte[] key) {
+ int h = hash(key);
+
+ int msb = indexInsideArray(h);
+ int lsb = indexInsideLongValue(h);
+ bloom[msb] |= singleBitInLong(lsb);
+ }
+
+ /**
+ * Checks if the {@code key} was probably updated in the storage. Returns
{@code false} if it was definitely not updated. Returns
+ * {@code true} if the answer is unknown.
+ */
+ public boolean maybeUpdated(byte[] key) {
+ int h = hash(key);
+
+ int msb = indexInsideArray(h);
+ int lsb = indexInsideLongValue(h);
+ return (bloom[msb] & singleBitInLong(lsb)) != 0L;
+ }
+
+ private static int indexInsideArray(int h) {
+ return h >>> LSB_BITS;
+ }
+
+ private static int indexInsideLongValue(int h) {
+ return h & LSB_MASK;
+ }
+
+ private static long singleBitInLong(int lsb) {
+ return 1L << lsb;
+ }
+
+ /**
+ * Clears all the information about previous calls of {@link
#onUpdate(byte[])}. All calls of {@link #maybeUpdated(byte[])} will return
+ * {@code false} after this call.
+ */
+ public void clear() {
+ Arrays.fill(bloom, 0L);
+ }
+
+ private static int hash(byte[] key) {
+ return HashUtils.hash32(key) & BLOOM_MASK;
+ }
+}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index dbb6d8f19ed..374b3254085 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -535,6 +535,7 @@ public abstract class AbstractCompactionKeyValueStorageTest
extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 5);
storage.setCompactionRevision(5);
+ // TODO IGNITE-25212 Consider expecting a tombstone here.
assertThrowsCompactedExceptionForGetSingleValue(BAR_KEY, 5);
assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 6);
@@ -959,12 +960,50 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
}
}
+ @Test
+ void testConcurrentUpdateAndCompaction() {
+ KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
+
+ for (int i = 0; i < 500; i++) {
+ byte[] key = key(i);
+ byte[] value = keyValue(i, i);
+
+ storage.put(key, value, context);
+ long revision = storage.revision();
+ storage.remove(key, context);
+
+ runRace(
+ () -> {
+ storage.setCompactionRevision(revision);
+ storage.compact(revision);
+ },
+ () -> {
+ // We update the same value in order to cause a race
in "writeBatch" method, that would leave already compacted
+ // revisions in a list of revisions associated with
this key.
+ storage.put(key, value, context);
+ }
+ );
+
+ try {
+ Entry entry = storage.get(key, revision);
+
+ assertFalse(entry.empty());
+ assertEquals(revision, entry.revision());
+ assertArrayEquals(value, entry.value());
+ } catch (CompactedException ignore) {
+ // This is expected.
+ }
+
+ storage.remove(key, context);
+ }
+ }
+
@Test
void testConcurrentReadAllAndCompaction() {
KeyValueUpdateContext context = kvContext(hybridTimestamp(10L));
int numberOfKeys = 15;
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 800; i++) {
List<byte[]> keys = new ArrayList<>();
List<byte[]> values = new ArrayList<>();
for (int j = 0; j < numberOfKeys; j++) {
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
index 9476d69d02a..62eb6c5d1aa 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/ReadOperationForCompactionTrackerTest.java
@@ -17,12 +17,19 @@
package org.apache.ignite.internal.metastorage.server;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static
org.apache.ignite.internal.metastorage.MetaStorageManager.LATEST_REVISION;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.Test;
/** For {@link ReadOperationForCompactionTracker} testing. */
@@ -38,62 +45,114 @@ public class ReadOperationForCompactionTrackerTest {
@Test
void testTrackAndUntrack() {
- UUID readOperation0 = UUID.randomUUID();
- UUID readOperation1 = UUID.randomUUID();
+ assertThrows(CompactedException.class, () -> tracker.track(0, () ->
0L, () -> 0L));
- long operationRevision0 = 1;
- long operationRevision1 = 2;
+ TrackingToken token1 = tracker.track(1, () -> 0L, () -> 0L);
- assertDoesNotThrow(() -> tracker.track(readOperation0,
operationRevision0));
- assertDoesNotThrow(() -> tracker.track(readOperation0,
operationRevision1));
- assertDoesNotThrow(() -> tracker.track(readOperation1,
operationRevision0));
- assertDoesNotThrow(() -> tracker.track(readOperation1,
operationRevision1));
-
- assertDoesNotThrow(() -> tracker.untrack(readOperation0,
operationRevision0));
- assertDoesNotThrow(() -> tracker.untrack(readOperation0,
operationRevision1));
- assertDoesNotThrow(() -> tracker.untrack(readOperation1,
operationRevision0));
- assertDoesNotThrow(() -> tracker.untrack(readOperation1,
operationRevision1));
-
- // Let's check that after untrack we can do track again for the
previous arguments.
- assertDoesNotThrow(() -> tracker.track(readOperation0,
operationRevision0));
- assertDoesNotThrow(() -> tracker.untrack(readOperation0,
operationRevision0));
+ token1.close();
}
@Test
void testTrackUntrackAndCollect() {
- UUID readOperation0 = UUID.randomUUID();
- UUID readOperation1 = UUID.randomUUID();
-
- long operationRevision0 = 1;
- long operationRevision1 = 2;
-
- tracker.track(readOperation0, operationRevision0);
- tracker.track(readOperation1, operationRevision0);
+ TrackingToken token1 = tracker.track(1, () -> 0L, () -> 0L);
+ TrackingToken token2 = tracker.track(2, () -> 0L, () -> 0L);
assertTrue(tracker.collect(0).isDone());
CompletableFuture<Void> collectFuture1 = tracker.collect(1);
- assertFalse(collectFuture1.isDone());
+ CompletableFuture<Void> collectFuture2 = tracker.collect(2);
- tracker.untrack(readOperation0, operationRevision0);
assertFalse(collectFuture1.isDone());
-
- tracker.untrack(readOperation1, operationRevision0);
- assertTrue(collectFuture1.isDone());
-
- tracker.track(readOperation0, operationRevision1);
- tracker.track(readOperation1, operationRevision1);
-
- assertTrue(tracker.collect(0).isDone());
- assertTrue(tracker.collect(1).isDone());
-
- CompletableFuture<Void> collectFuture2 = tracker.collect(2);
assertFalse(collectFuture2.isDone());
- tracker.untrack(readOperation1, operationRevision1);
+ token1.close();
+ assertTrue(collectFuture1.isDone());
assertFalse(collectFuture2.isDone());
- tracker.untrack(readOperation0, operationRevision1);
+ token2.close();
+ assertTrue(collectFuture1.isDone());
assertTrue(collectFuture2.isDone());
}
+
+ /**
+ * Tests that concurrent update and compaction doesn't break reading the
"latest" revision of data.
+ */
+ @Test
+ void testTrackLatestRevision() {
+ for (int i = 0; i < 10_000; i++) {
+ AtomicLong latestRevision = new AtomicLong(2);
+ AtomicLong compactedRevision = new AtomicLong(1);
+
+ AtomicReference<TrackingToken> token = new AtomicReference<>();
+
+ IgniteTestUtils.runRace(
+ () -> token.set(tracker.track(LATEST_REVISION,
latestRevision::get, compactedRevision::get)),
+ () -> {
+ latestRevision.set(3);
+ compactedRevision.set(2);
+
+ // This one is tricky. If we identify that there are
no tracked futures after the moment of setting compaction
+ // revision to "2", it should mean that concurrent
tracking should end up creating the future for revision "3".
+ // Here we validate that there are no data races for
such a scenario, by re-checking the list of tracked futures.
+ // If something has been added there by a concurrent
thread, it should soon be completed before a retry.
+ if (tracker.collect(2).isDone()) {
+ assertThat(tracker.collect(2), willSucceedFast());
+ }
+ }
+ );
+
+ assertFalse(tracker.collect(3).isDone());
+
+ token.get().close();
+
+ assertTrue(tracker.collect(3).isDone());
+ }
+ }
+
+ /**
+ * Tests that concurrent compaction either leads to {@link
CompactedException}, or just works. There should be no leaks or races.
+ */
+ @Test
+ void testConcurrentCompact() {
+ for (int i = 0; i < 10_000; i++) {
+ AtomicLong latestRevision = new AtomicLong(2);
+ AtomicLong compactedRevision = new AtomicLong(1);
+
+ AtomicReference<TrackingToken> token = new AtomicReference<>();
+
+ IgniteTestUtils.runRace(
+ () -> {
+ try {
+ token.set(tracker.track(2, latestRevision::get,
compactedRevision::get));
+ } catch (CompactedException ignore) {
+ // No-op.
+ }
+ },
+ () -> {
+ latestRevision.set(3);
+ compactedRevision.set(2);
+
+ // This one is tricky. If we identify that there are
no tracked futures after the moment of setting compaction
+ // revision to "2", it should mean that concurrent
tracking should end up throwing CompactedException.
+ // Here we validate that there are no data races for
such a scenario, by re-checking the list of tracked futures.
+ // If something has been added there by a concurrent
thread, it should soon be completed before failing.
+ if (tracker.collect(2).isDone()) {
+ assertThat(tracker.collect(2), willSucceedFast());
+ }
+ }
+ );
+
+ if (token.get() == null) {
+ // CompactedException case.
+ assertTrue(tracker.collect(2).isDone());
+ } else {
+ // Successful tracking case.
+ assertFalse(tracker.collect(2).isDone());
+
+ token.get().close();
+
+ assertTrue(tracker.collect(2).isDone());
+ }
+ }
+ }
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
new file mode 100644
index 00000000000..fa41e2ef7b6
--- /dev/null
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/persistence/WriteBatchProtectorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class WriteBatchProtectorTest {
+ private WriteBatchProtector protector;
+
+ @BeforeEach
+ void setUp() {
+ protector = new WriteBatchProtector();
+ }
+
+ @Test
+ void testUpdateKey() {
+ byte[] key = "testKey".getBytes();
+
+ protector.onUpdate(key);
+
+ assertTrue(protector.maybeUpdated(key), "Key should be present in the
bloom filter");
+ }
+
+ @Test
+ void testNoUpdates() {
+ byte[] key = "nonExistentKey".getBytes();
+
+ assertFalse(protector.maybeUpdated(key), "Key should not be present in
the bloom filter");
+ }
+
+ @Test
+ void testUpdateAndClear() {
+ byte[] key = "testKey".getBytes();
+
+ protector.onUpdate(key);
+ protector.clear();
+
+ assertFalse(protector.maybeUpdated(key), "Key should not be present
after clearing the bloom filter");
+ }
+
+ @Test
+ void testUpdateMultipleKeys() {
+ byte[] key1 = "key1".getBytes();
+ byte[] key2 = "key2".getBytes();
+
+ protector.onUpdate(key1);
+ protector.onUpdate(key2);
+
+ assertTrue(protector.maybeUpdated(key1), "Key1 should be present in
the bloom filter");
+ assertTrue(protector.maybeUpdated(key2), "Key2 should be present in
the bloom filter");
+ }
+
+ @Test
+ void testNoCollision() {
+ byte[] key1 = "key1".getBytes();
+ byte[] key2 = "key2".getBytes();
+
+ protector.onUpdate(key1);
+
+ // No collision.
+ assertFalse(protector.maybeUpdated(key2), "Key2 should not be present
unless explicitly added");
+ }
+}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 00b19ec8648..804b07d3705 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -50,6 +50,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.CommandId;
@@ -60,6 +62,7 @@ import
org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker.TrackingToken;
import org.apache.ignite.internal.raft.IndexWithTerm;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
@@ -70,6 +73,8 @@ import org.jetbrains.annotations.Nullable;
* Simple in-memory key/value storage for tests.
*/
public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
/**
* Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified.
*
@@ -200,6 +205,61 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
}
+ @Override
+ public Entry get(byte[] key) {
+ rwLock.readLock().lock();
+
+ try {
+ return super.get(key);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Entry get(byte[] key, long revUpperBound) {
+ rwLock.readLock().lock();
+
+ try {
+ return super.get(key, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<Entry> getAll(List<byte[]> keys) {
+ rwLock.readLock().lock();
+
+ try {
+ return super.getAll(keys);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+ rwLock.readLock().lock();
+
+ try {
+ return super.getAll(keys, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public long revision() {
+ rwLock.readLock().lock();
+
+ try {
+ return rev;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
@Override
public void put(byte[] key, byte[] value, KeyValueUpdateContext context) {
rwLock.writeLock().lock();
@@ -795,6 +855,17 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
updateRevision(curRev, context);
}
+ @Override
+ public void saveCompactionRevision(long revision, KeyValueUpdateContext
context) {
+ rwLock.writeLock().lock();
+
+ try {
+ super.saveCompactionRevision(revision, context);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
@Override
public void saveCompactionRevision(long revision, KeyValueUpdateContext
context, boolean advanceSafeTime) {
savedCompactionRevision = revision;
@@ -806,6 +877,28 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
}
}
+ @Override
+ public void setCompactionRevision(long revision) {
+ rwLock.writeLock().lock();
+
+ try {
+ super.setCompactionRevision(revision);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void updateCompactionRevision(long compactionRevision,
KeyValueUpdateContext context) {
+ rwLock.writeLock().lock();
+
+ try {
+ super.updateCompactionRevision(compactionRevision, context);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
@Override
public long checksum(long revision) {
throw new UnsupportedOperationException();
@@ -927,13 +1020,13 @@ public class SimpleInMemoryKeyValueStorage extends
AbstractKeyValueStorage {
Iterator<Entry> iterator = entries.iterator();
- long readOperationId =
readOperationForCompactionTracker.generateReadOperationId();
- readOperationForCompactionTracker.track(readOperationId,
revUpperBound);
+ //noinspection resource
+ TrackingToken token =
readOperationForCompactionTracker.track(revUpperBound, this::revision,
this::getCompactionRevision);
return new Cursor<>() {
@Override
public void close() {
- readOperationForCompactionTracker.untrack(readOperationId,
revUpperBound);
+ token.close();
}
@Override